1.5 Java 9中的响应式编程
到了Java 9,JDK开始支持响应式编程。从Java 9的JDK中可以找到java.util. concurrent.Flow类,其中所包含的接口和定义的静态方法就是用来支持Flow控制编程的,并且主要基于里面的Publisher、Subscriber、Subscription等接口来支持响应式编程。
本节会分4部分介绍。第1部分是响应式编程接口的介绍。第2部分是一个Java 9响应式编程入门的简单Demo。第3部分是对JDK的SubmissionPublisher<T>类的源码解读。最后一部分是使用Java 9的响应式编程整合Spring的实战案例,以便让大家深入理解和运用Java 9提供的API,并能够快速运用到自己开发的项目中。
1.5.1 响应式编程接口


我们可以看到,Flow.Publisher<T>接口是一个函数式接口(其上有注解@Functional Interface),它只有一个抽象方法public void subscribe(Subscriber<? super T> subscriber);。
● void onSubscribe(Subscription subscription) :在给定的Subscription想要使用Subscriber其他方法的前提下,必须先调用这个方法。
● void onError(Throwable throwable):当Publisher或者Subscription遇到了不可恢复的错误时,调用此方法,然后Subscription就不能再调用Subscriber的其他方法了。
● void onNext(T item):获取Subscription的下一个元素。
● void onComplete:在调用这个方法后,Subscription就不能再调用Subscriber的其他方法了。
● void cancel:调用这个方法造成的直接后果是Subscription会停止接收信息。
● void request(long n):Subscription调用这个方法添加 n个元素。如果 n小于0,Subscriber将收到一个onError信号。如果n等于0,那么调用complete方法,否则调用onNext(T item)方法。
1.5.2 Java 9响应式编程入门Demo
public class DockerXDemoSubscriber<T> implements Flow.Subscriber<T>{ private String name; private Flow.Subscription subscription; final long bufferSize; long count; public String getName() { return name; } public Flow.Subscription getSubscription() { return subscription; } public DockerXDemoSubscriber(long bufferSize,String name) { this.bufferSize = bufferSize; this.name = name; } public void onSubscribe(Flow.Subscription subscription) { //count = bufferSize - bufferSize / 2; //在消费一半的时候重新请求 (this.subscription = subscription).request(bufferSize); System.out.println("开始onSubscribe订阅"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } public void onNext(T item) { //if (--count <= 0) subscription.request(count = bufferSize - //bufferSize / 2); System.out.println(" ###### " + Thread.currentThread().getName()+ " name: " + name + " item: " + item + " ######"); System.out.println(name + " received: " + item); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } public void onError(Throwable throwable) { throwable.printStackTrace(); } public void onComplete() { System.out.println("Completed"); } }
public class DockerXDemoPublisher<T> implements Flow.Publisher<T>,
AutoCloseable {
private final ExecutorService executor; // daemon-based
private CopyOnWriteArrayList<DockerXDemoSubscription> list = new
public void submit(T item) {
System.out.println("********* 开始发布元素item: " + item + "
list.forEach(e -> {
e.future=executor.submit(() -> {
public DockerXDemoPublisher(ExecutorService executor) {
this.executor = executor;
public void close() {
list.forEach(e -> {
executor.submit(() -> { e.subscriber.onComplete();});
public void subscribe(Flow.Subscriber<? super T> subscriber) {
subscriber.onSubscribe(new ockerXDemoSubscription(subscriber,
list.add(new DockerXDemoSubscription(subscriber,executor));
static class DockerXDemoSubscription<T> implements Flow.Subscription
private final Flow.Subscriber<? super T> subscriber;
private final ExecutorService executor;
private Future<?> future;
private T item;
private boolean completed;
public DockerXDemoSubscription(Flow.Subscriber<? super T>
subscriber,ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
public void request(long n) {
if (n != 0 && !completed) {
if (n < 0) {
IllegalArgumentException ex = new
executor.execute(() -> subscriber.onError(ex));
} else {
future = executor.submit(() -> {
} else {
public void cancel() {
completed = true;
if (future != null && !future.isCancelled()) {
一个需要注意的细节是,SubmissionPublisher<T>类有一个submit(T item)方法。通过查阅Javadoc可知,该方法就是通过异步调用每个订阅它的subscriber的onNext方法将发布的给定元素传送过去的,而当针对subscriber的资源不可用时,阻塞不会中断。这样SubmissionPublisher<T>会提交元素给当前的订阅者(subscriber),直到它关闭为止。本例对其进行了简单的实现,后面会具体讲解。
private static void demoSubscribe(DockerXDemoPublisher<Integer> publisher, String subscriberName){ DockerXDemoSubscriber<Integer> subscriber = new DockerXDemoSubscriber<>(4L,subscriberName); publisher.subscribe(subscriber); }
ExecutorService execService = ForkJoinPool.commonPool(); //Executors.newFixedThreadPool(3); try (DockerXDemoPublisher<Integer> publisher = new DockerXDemoPublisher<>(execService)) { demoSubscribe(publisher,"One"); demoSubscribe(publisher,"Two"); demoSubscribe(publisher,"Three"); IntStream.range(1,5).forEach(publisher::submit); } finally { ... }
finally { try { execService.shutdown(); int shutdownDelaySec = 1; System.out.println("………………等待" + shutdownDelaySec + " 秒后结束服 务………"); execService.awaitTermination(shutdownDelaySec,TimeUnit.SECONDS); } catch (Exception ex) { System.out.println("捕获到execService.awaitTermination()方法的异常:" + ex.getClass().getName()); } finally { System.out.println("调用execService.shutdownNow()结束服务..."); List<Runnable> l = execService.shutdownNow(); System.out.println("还剩"+l.size() + " 个任务等待执行,服务已关闭"); } }
开始onSubscribe订阅 ###### ForkJoinPool.commonPool-worker-9 name: One item: null ###### One received: null 开始onSubscribe订阅 ###### ForkJoinPool.commonPool-worker-9 name: Two item: null ###### Two received: null 开始onSubscribe订阅 ###### ForkJoinPool.commonPool-worker-9 name: Three item: null ###### Three received: null ***************** 开始发布元素item: 1 ***************** ***************** 开始发布元素item: 2 ***************** ***************** 开始发布元素item: 3 ***************** ***************** 开始发布元素item: 4 ***************** ###### ForkJoinPool.commonPool-worker-9 name: One item: 1 ###### One received: 1 ###### ForkJoinPool.commonPool-worker-2 name: Two item: 1 ###### Two received: 1 ###### ForkJoinPool.commonPool-worker-4 name: One item: 2 ###### One received: 2 ###### ForkJoinPool.commonPool-worker-11 name: Three item: 1 ###### Three received: 1 ###### ForkJoinPool.commonPool-worker-13 name: Two item: 2 ###### Two received: 2 ###### ForkJoinPool.commonPool-worker-15 name: Three item: 2 ###### Three received: 2 ###### ForkJoinPool.commonPool-worker-6 name: One item: 3 ###### One received: 3 ………………等待1 秒后结束服务……………… ###### main name: Two item: 3 ###### Two received: 3 ###### ForkJoinPool.commonPool-worker-9 name: Three item: 3 ###### Three received: 3 ###### ForkJoinPool.commonPool-worker-13 name: One item: 4 ###### One received: 4 ###### ForkJoinPool.commonPool-worker-4 name: Two item: 4 ###### Two received: 4 ###### ForkJoinPool.commonPool-worker-15 name: Three item: 4 ###### Three received: 4 Completed Completed Completed 调用execService.shutdownNow()结束服务... 还剩0 个任务等待执行,服务已关闭
1.5.3 SubmissionPublisher类的源码解读
我们经常会通过Executors.newFixedThreadPool(int nThreads)和ForkJoinPool.commonPool来获得一个线程池。其中Executors.newFixedThreadPool(int nThreads)用于创建一个指定最大线程数量的线程池,池中的每一个线程除非明确指定要关闭,否则会一直存在。
ForkJoinPool.commonPool是SubmissionPublisher内置的默认Executor,ForkJoinPool. commonPool内部调用了new ForkJoinPool((byte)0);,传入的参数0没什么用,其会通过System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");获取并发线程数。如果并未设置java.util.concurrent.ForkJoinPool.common.parallelism属性,将使用Runtime.getRuntime().availableProcessors() -1,即本机CPU核数-1。如果CPU核支持超线程技术,则核数为CPU的线程数量。现在,大家应该可以理解Demo中的这段代码了。
@SuppressWarnings("serial") @jdk.internal.vm.annotation.Contended private static final class BufferedSubscription<T> implements Flow.Subscription,ForkJoinPool.ManagedBlocker { // Order-sensitive field declarations long timeout; // > 0 if timed wait volatile long demand; // # unfilled requests int maxCapacity; // reduced on OOME int putStat; // offer result for ManagedBlocker volatile int ctl; // atomic run state flags volatile int head; // next position to take int tail; // next position to put Object[] array; // buffer: null if disabled //这里包含了我们要传入的订阅者的信息 Flow.Subscriber<? super T> subscriber; // null if disabled Executor executor; // null if disabled BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> onNextHandler; volatile Throwable pendingError; // holds until onError issued volatile Thread waiter; // blocked producer thread T putItem; // for offer within ManagedBlocker //这里通过next来构造Publisher的执行链,也就是一堆订阅者在此做一个编排 BufferedSubscription<T> next; // used only by publisher //这里将发送失败需要重试的信息放到一起 BufferedSubscription<T> nextRetry; // used only by publisher // ctl values static final int ACTIVE = 0x01; // consumer task active static final int CONSUME = 0x02; // keep-alive for consumer task static final int DISABLED = 0x04; // final state static final int ERROR = 0x08; // signal onError then disable static final int SUBSCRIBE = 0x10; // signal onSubscribe static final int COMPLETE = 0x20; // signal onComplete when done static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel /** * 当maxBufferCapacity大于这个值时,使用默认的初始大小, * maxBufferCapacity的大小必须为2的n次方 **/ static final int DEFAULT_INITIAL_CAP = 32;
public void close() { if (!closed) { BufferedSubscription<T> b; synchronized (this) { //no need to re-check closed here b = clients; clients = null; closed = true; } while (b != null) { BufferedSubscription<T> next = b.next; b.next = null; b.onComplete(); b = next; } } }
我们来观察里面的subscribe(Flow.Subscriber<? super T> subscriber)方法。在调用这个方法后,会生成一个BufferedSubscription实例,其中包装了subscriber。然后会调用subscription.onSubscribe方法,在这个方法内会调用startOrDisable方法。
在这里,我们可以看到e.execute(new ConsumerTask<T>(this))。其中的ConsumerTask继承自抽象类ForkJoinTask<Void>,并实现了Runnable接口和CompletableFuture.Asynchronous-CompletionTask接口。其构造函数传入的参数是一个BufferedSubscription实例,这样ConsumerTask的run方法其实是调用BufferedSubscription实例的consume方法。而在consume方法里,可以看到我们传入的subscriber实例在此出现,同时里面还调用了checkControl(s,c)方法。这个方法很关键,在此通过s.onSubscribe(this)将BufferedSubscription实例作为参数传入subscriber的onSubscribe中。
public void subscribe(Flow.Subscriber<? super T> subscriber) { if (subscriber == null) throw new NullPointerException(); BufferedSubscription<T> subscription = new BufferedSubscription<T>(subscriber,executor, onNextHandler,maxBufferCapacity); synchronized (this) { for (BufferedSubscription<T> b = clients,pred = null;;) { if (b == null) { Throwable ex; subscription.onSubscribe(); if ((ex = closedException) != null) subscription.onError(ex); else if (closed) subscription.onComplete(); else if (pred == null) clients = subscription; else pred.next = subscription; break; } BufferedSubscription<T> next = b.next; if (b.isDisabled()) { // remove b.next = null; // detach if (pred == null) clients = next; else pred.next = next; } else if (subscriber.equals(b.subscriber)) { b.onError(new IllegalStateException("Duplicate subscribe")); break; } else pred = b; b = next; } } } /** * Responds to control events in consume(). */ private boolean checkControl(Flow.Subscriber<? super T> s,int c) { boolean stat = true; if ((c & SUBSCRIBE) != 0) { if (CTL.compareAndSet(this,c,c & ~SUBSCRIBE)) { try { if (s != null) s.onSubscribe(this); } catch (Throwable ex) { onError(ex); } } } else if ((c & ERROR) != 0) { Throwable ex = pendingError; ctl = DISABLED; //no need for CAS if (ex != null) { //null if errorless cancel try { if (s != null) s.onError(ex); } catch (Throwable ignore) { } } } else { detach(); stat = false; } return stat; }
● offer:该方法用于将元素发布给subscriber,subscriber可以异步无阻塞地调用它的onNext方法。同时,这个方法可以在超时的时候放弃一些元素,我们可以指定超时时间。在这里,我们还可以指定放弃处理的规则(其实就是一个BiPredicate条件表达式)。
● submit:该方法可以帮助我们以一个简单的方式来将元素发布给subscriber。从synchronized(this)代码块中的while语句可得知,该方法会阻塞调用,直到资源分配给了当前所有的subscriber。若资源进行了分配但subscriber没拿到,则会重新给,直至所有subscriber都拿到资源。该方法与offer方法的区别是后者有超时机制。
● consume:该方法可以定义请求到的元素要消费的动作(在SubmissionPublisher类定义中有Subscriber接口的内部类实现),接下来我们通过下面的这个例子来清晰明了地进行解释。
public class ConsumeSubmissionPublisher { public static void main(String[] args) throws InterruptedException, ExecutionException { publish(); } public static void publish() throws InterruptedException,Execution Exception { CompletableFuture future = null; try (SubmissionPublisher publisher = new SubmissionPublisher <Long>()) { System.out.println("Subscriber Buffer Size: " + publisher. getMaxBufferCapacity()); future=publisher.consume(System.out::println); LongStream.range(1,10).forEach(publisher::submit); } finally { future.get(); } } }
public CompletableFuture<Void> consume(Consumer<? super T> consumer) { if (consumer == null) throw new NullPointerException(); CompletableFuture<Void> status = new CompletableFuture<>(); subscribe(new ConsumerSubscriber<T>(status,consumer)); return status; }
当调用publisher.consume时,其实就是内部生成一个订阅者对象,并通过subscribe(new ConsumerSubscriber<T>(status,consumer));进行订阅。ConsumerSubscriber是一个通过装饰模式得到的增强类,通过consume方法,我们可以得到一个CompletableFuture实例。这样,就可以通过CompletableFuture实例提供的get方法来做到让应用程序一直运行,直到所有的元素都处理完毕。请看前面例子的运行结果:
Subscriber Buffer Size: 256 1 2 3 4 5 6 7 8 9
1.5.4 响应式编程整合Spring实战案例
@Component public class Stock { private final Map<Product,StockItem> stockItemMap = new ConcurrentHashMap<>(); private StockItem getItem(Product product){ //如果没有此商品,添加一个key,返回null值即可 stockItemMap.putIfAbsent(product,new StockItem()); return stockItemMap.get(product); } public void store(Product product,long amount){ getItem(product).store(amount); } public void remove(Product product,long amount) throws ProductIs OutOfStock { if (getItem(product).remove(amount) != amount) throw new ProductIsOutOfStock(product); } }
public class StockItem { private final AtomicLong amountItemStock = new AtomicLong(0); public void store(long n) { amountItemStock.accumulateAndGet(n,(pre,mount) -> pre + mount); } //下单时,所需商品数量没超过库存数量的话,就用库存数量减去所需商品数量,返回此次 //从库存移除商品的具体数量;超过的话,不对库存做任何操作,返回此次所移除库存商品 //的数量,即为0。 public long remove(long n) { class RemoveData { long remove; } RemoveData removeData = new RemoveData(); amountItemStock.accumulateAndGet(n, (pre,mount) -> pre >= n ? pre - (removeData.remove = mount) : pre - (removeData.remove = 0L)); return removeData.remove; } }
public class StockMaintain implements Flow.Subscriber<Order> {
private static final Logger log = LoggerFactory.
private Stock stock;
private Flow.Subscription subscription = null;
private ExecutorService execService = ForkJoinPool.commonPool();
public StockMaintain(@Autowired Stock stock) {
this.stock = stock;
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
通过查阅submit的源码可知,这个动作会被封装成一个RunnableFuture<V> extends Runnable,Future<V>并返回。这样方便我们获取这个动作在子线程上执行的信息,同时方便操作其行为。而execute(ftask)最后其实就是通过new Thread(r).start来执行的。用一个现实中的场景来讲,就是快递员将快递物品送到你手里,你不会立马使用快递包裹里的东西而让快递员一直等你签收。关于任务的处理过程已经清晰明了地展现在我们面前了,交给系统自己处理吧,而我们要做的就是通过onNext方法获取下一个元素:
@Override public void onNext(Order order) { execService.submit(() -> { log.info("Thread {}",Thread.currentThread().getName()); order.getItems().forEach(item -> { try { stock.remove(item.getProduct(),item.getAmount()); log.info("有{} 件商品从库存消耗",item.getAmount()); } catch (ProductIsOutOfStock productIsOutOfStock) { log.error("商品库存不足"); } }); subscription.request(1); }); }
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task,null); execute(ftask); return ftask; }
//default Executor setup; nearly the same as CompletableFuture /** * Default executor -- ForkJoinPool.commonPool() unless it cannot * support parallelism. */ private static final Executor ASYNC_POOL = (ForkJoinPool.getCommonPoolParallelism() > 1) ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ private static final class ThreadPerTaskExecutor implements Executor { ThreadPerTaskExecutor(){} //prevent access constructor creation public void execute(Runnable r) { new Thread(r).start(); } }
@Test public void teststockRemoval() throws InterruptedException { Stock stock = new Stock(); SubmissionPublisher<Order> p = new SubmissionPublisher<>(); ... }
Product product = new Product(); stock.store(product,40); OrderItem item = new OrderItem(); item.setProduct(product); item.setAmount(10); Order order = new Order(); List<OrderItem> items = new LinkedList<>(); items.add(item); order.setItems(items);
我们将订单提交给publisher 10次,也就是下了10个相同的订单,这样也能测试代码的所有功能,包括超过库存数量的拒绝修改的反馈:
for (int i = 0; i < 10; i++)
for (int j = 0; j < 10; j++) {
log.info("Sleeping a bit...");
17-12-24 01:22:43,161 INFO StockMaintain:33- ******调用onSubscr ibe****** 17-12-24 01:22:43,169 INFO TestStockMaintain:39- 所有订单已经提交完毕 17-12-24 01:22:43,179 INFO TestStockMaintain:41- Sleeping a bit... 17-12-24 01:22:43,187 INFO StockMaintain:41- Thread ForkJoinPool.com monPool-worker-9 17-12-24 01:22:43,187 INFO StockMaintain:41- Thread ForkJoin Pool. commonPool-worker-11 17-12-24 01:22:43,187 INFO StockMaintain:41- Thread ForkJoinPool.com monPool-worker-2 17-12-24 01:22:43,190 INFO TestStockMaintain:41- Sleeping a bit... 17-12-24 01:22:43,202 INFO StockMaintain:45- 有10 件商品从库存消耗 17-12-24 01:22:43,202 INFO StockMaintain:45- 有10 件商品从库存消耗 17-12-24 01:22:43,206 INFO StockMaintain:41- Thread ForkJoinPool.comm onPool-worker-2 17-12-24 01:22:43,207 INFO StockMaintain:45- 有10 件商品从库存消耗 17-12-24 01:22:43,202 INFO StockMaintain:45- 有10 件商品从库存消耗 17-12-24 01:22:43,209 INFO StockMaintain:41- Thread ForkJoinPool.com monPool-worker-2 17-12-24 01:22:43,207 INFO TestStockMaintain:41- Sleeping a bit... 17-12-24 01:22:43,207 INFO StockMaintain:41- Thread ForkJoinPool.comm onPool-worker-4 17-12-24 01:22:43,212 ERROR StockMaintain:47- 商品库存不足 17-12-24 01:22:43,222 INFO StockMaintain:41- Thread ForkJoinPool.com monPool-worker-2 17-12-24 01:22:43,224 ERROR StockMaintain:47- 商品库存不足 17-12-24 01:22:43,225 INFO StockMaintain:41- Thread ForkJoinPool.com monPool-worker-2 17-12-24 01:22:43,226 ERROR StockMaintain:47- 商品库存不足 17-12-24 01:22:43,228 INFO StockMaintain:41- Thread ForkJoinPool.com monPool-worker-13 17-12-24 01:22:43,210 INFO StockMaintain:41- Thread ForkJoinPool.com monPool-worker-9 17-12-24 01:22:43,229 ERROR StockMaintain:47- 商品库存不足 17-12-24 01:22:43,227 INFO TestStockMaintain:41- Sleeping a bit... 17-12-24 01:22:43,214 ERROR StockMaintain:47- 商品库存不足 17-12-24 01:22:43,231 ERROR StockMaintain:47- 商品库存不足 17-12-24 01:22:43,244 INFO TestStockMaintain:41- Sleeping a bit... 17-12-24 01:22:43,256 INFO TestStockMaintain:41- Sleeping a bit... 17-12-24 01:22:43,268 INFO TestStockMaintain:41- Sleeping a bit... 17-12-24 01:22:43,279 INFO TestStockMaintain:41- Sleeping a bit... 17-12-24 01:22:43,290 INFO TestStockMaintain:41- Sleeping a bit... 17-12-24 01:22:43,301 INFO TestStockMaintain:41- Sleeping a bit... 17-12-24 01:22:43,312 INFO TestStockMaintain:45- Publisher已关闭 17-12-24 01:22:43,312 INFO StockMaintain:61- 调用onComplete