(1). RingBuffer组装依赖

通过代码:RingBuffer来组装EventHandler.

(2). 组装代码

package help.lixin.disruptor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;

import help.lixin.disruptor.consumer.UserEventHandler1;
import help.lixin.disruptor.consumer.UserEventHandler2;
import help.lixin.disruptor.event.UserEvent;
import help.lixin.disruptor.factory.UserEventFactory;
import help.lixin.disruptor.pojo.User;
import help.lixin.disruptor.producer.UserEventProducer;

public class UserEventTest3 {
	public static void main(String[] args) throws Exception {
		/**
		 * SequenceBarrier sequenceBarrier1 = ringBuffer.newBarrier(); <br/>
		 * SequenceBarrier sequenceBarrier2 = ringBuffer.newBarrier(eventProcessorH1.getSequence()); <br/>
		 * 最终无非不过就是两个EvenHandler串行处理,如下图:
		 * -> h1 -> h2 <br/>
		 */
		
		// 1. 初始化RingBuffer的大小,必须是2的指数
		int bufferSize = 1024;

		// 2.Event处理器(消费者)
		UserEventHandler1 h1 = new UserEventHandler1();
		UserEventHandler2 h2 = new UserEventHandler2();

		// 3. 创建RingBuffer
		RingBuffer<UserEvent> ringBuffer = RingBuffer.createMultiProducer( //
				new UserEventFactory(), //
				bufferSize, //
				new BlockingWaitStrategy());
		// 
		SequenceBarrier sequenceBarrier1 = ringBuffer.newBarrier();
		BatchEventProcessor<UserEvent> eventProcessorH1 = new BatchEventProcessor<UserEvent>(ringBuffer,
				sequenceBarrier1, h1);
		eventProcessorH1.setExceptionHandler(new ExceptionHandler<UserEvent>() {
			@Override
			public void handleEventException(Throwable ex, long sequence, UserEvent event) {
			}

			@Override
			public void handleOnStartException(Throwable ex) {
			}

			@Override
			public void handleOnShutdownException(Throwable ex) {
			}
		});

		SequenceBarrier sequenceBarrier2 = ringBuffer.newBarrier(eventProcessorH1.getSequence());
		BatchEventProcessor<UserEvent> eventProcessorH2 = new BatchEventProcessor<UserEvent>(ringBuffer,
				sequenceBarrier2, h2);
		eventProcessorH2.setExceptionHandler(new ExceptionHandler<UserEvent>() {
			@Override
			public void handleEventException(Throwable ex, long sequence, UserEvent event) {
			}

			@Override
			public void handleOnStartException(Throwable ex) {
			}

			@Override
			public void handleOnShutdownException(Throwable ex) {
			}
		});
		ringBuffer.addGatingSequences(eventProcessorH2.getSequence());
		
		// 5. 创建线程池
		ExecutorService pool = Executors.newCachedThreadPool();
		// 把消息处理器提交给线程池
		pool.execute(eventProcessorH1);
		pool.execute(eventProcessorH2);

		UserEventProducer producer = new UserEventProducer(ringBuffer);
		ExecutorService executorService = Executors.newCachedThreadPool();
		// 只定义一个,方便查看结果
		int count = 1;
		for (int i = 0; i < count; i++) {
			final int id = i;
			executorService.submit(() -> {
				User user = new User();
				user.setId(id);
				user.setName("lixin " + id);
				user.setAge(id + 10);
				user.setAddress("深圳 " + id);
				producer.publish(user);
			});
		}
		TimeUnit.SECONDS.sleep(1000000);
	}
}

(3). 输出结果

# 1处理完之后,交给2处理.
1开始1606320078710 [pool-1-thread-1 ] UserEvent->UserEvent [id=0, name=lixin 0, age=10, address=深圳 0]
1结束1606320079715 [pool-1-thread-1 ] UserEvent->UserEvent [id=0, name=lixin 0, age=10, address=深圳 0  Handler1]

2开始1606320079715 [pool-1-thread-2 ] UserEvent->UserEvent [id=0, name=lixin 0, age=10, address=深圳 0  Handler1]
2结束1606320079715 [pool-1-thread-2 ] UserEvent->UserEvent [id=0, name=lixin 0, age=10, address=深圳 0  Handler1  Handler2]

(4). 总结

多个Barrier之间,若有依赖,则属于串行运行.