LAMAX Distruptor是一个高性能,低延迟的producer-consumer框架。到底有多吊呢,可以参考Github上给出的性能测试结果:
https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results
其核心就是RingBuffer这个东东, 感兴趣的同学可以参考下面的链接了解更多:
http://mechanitis.blogspot.jp/2011/06/dissecting-disruptor-whats-so-special.html
可以实现多种组合模式,我们这里列举几个比较常用的,在开始之前我们先定义需要在生产者和消费者之间传递的对象,以及产生实例的工厂方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class LongEvent { private long value; public void setValue(long value) { this.value = value; } @Override public String toString(){ return this.value + ""; } public static EventFactory FACTORY = new EventFactory(){ @Override public LongEvent newInstance() { return new LongEvent(); } }; }
|
因为所有的Event对象都是托管给Distruptor由EventFactory创建的,且需要重复利用,所以定义Event的时候必须要提供setter方法。
其次我们需要定义一个处理Event的handler:
1 2 3 4 5 6 7 8 9 10 11
| public class LongEventHandler implements EventHandler { private final String name; public LongEventHandler(String name){ this.name = name; } public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println(this.name + ": " + event); } }
|
然后我们定义一个产生Event的Producer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class LongEventProducer { private final RingBuffer ringBuffer; public LongEventProducer(RingBuffer ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(long val) { long sequence = ringBuffer.next(); try { LongEvent event = ringBuffer.get(sequence); event.setValue(val); } finally { ringBuffer.publish(sequence); } } }
|
Single producer- single consumer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String[] args){ int bufferSize = 1024; Executor executor = Executors.newCachedThreadPool(); Disruptor disruptor = new Disruptor(LongEvent.FACTORY, bufferSize, executor, ProducerType.MULTI, new SleepingWaitStrategy()); LongEventHandler handler = new new LongEventHandler("p1-s1"); disruptor.handleEventsWith(handler); LongEventProducer producer = new LongEventProducer(disruptor.getRingBuffer()); disruptor.start(); for (int i = 0; i < 1000; i++) { producer.onData(i); } }
|
Single producer-pipeline consumers:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public static void main(String[] args){ int bufferSize = 1024; Executor executor = Executors.newCachedThreadPool(); Disruptor disruptor = new Disruptor(LongEvent.FACTORY, bufferSize, executor, ProducerType.MULTI, new SleepingWaitStrategy()); LongEventHandler handler = new LongEventHandler("p1-s1"); LongEventHandler handler1 = new LongEventHandler("p2-s2"); disruptor.handleEventsWith(handler) .handleEventsWith(handler1); LongEventProducer producer = new LongEventProducer(disruptor.getRingBuffer()); disruptor.start(); for (int i = 0; i < 1000; i++) { producer.onData(i); } }
|
可以发现p2-s2的处理都在p1-s2之后完成
Single producer-work pool(多个consumer竞争消费),这种模式是多个handler并发的竞争同一个Event消息,需要实现WorkHandler接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class LongEventHandler2 implements WorkHandler { private final String name; public LongEventHandler2(String name){ this.name = name; } @Override public void onEvent(LongEvent event) throws Exception { System.out.println(this.name + ": " + event); } public static void main(String[] args){ int bufferSize = 1024; Executor executor = Executors.newCachedThreadPool(); Disruptor disruptor = new Disruptor(LongEvent.FACTORY, bufferSize, executor, ProducerType.MULTI, new SleepingWaitStrategy()); LongEventHandler2 handler = new LongEventHandler2("p1-s1"); LongEventHandler2 handler1 = new LongEventHandler2("p2-s2"); disruptor.handleEventsWithWorkerPool(handler, handler1); LongEventProducer producer = new LongEventProducer(disruptor.getRingBuffer()); disruptor.start(); for (int i = 0; i < 1000; i++) { producer.onData(i); } } }
|
这三种基本模式还可以组合到一起实现更为复杂的处理逻辑,具体的使用方面同学们可以自己琢磨,最主要的是对RingBuffer的使用, 详细的可以参考Github上的测试代码
关于等待策略WaitStrategy的选择需要特别注意,默认是选择BlockingWaitStrategy。
- BlockingWaitStrategy:这个策略的内部适用一个锁和条件变量来控制线程的执行和等待(Java基本的同步方法)。BlockingWaitStrategy是最慢的等待策略,但也是CPU使用率最低和最稳定的选项。然而,可以根据不同的部署环境调整选项以提高性能。
- SleepingWaitStrategy: 和BlockingWaitStrategy一样,SpleepingWaitStrategy的CPU使用率也比较低。它的方式是循环等待并且在循环中间调用LockSupport.parkNanos(1)来睡眠,(在Linux系统上面睡眠时间60µs).然而,它的优点在于生产线程只需要计数,而不执行任何指令。并且没有条件变量的消耗。但是,事件对象从生产者到消费者传递的延迟变大了。SleepingWaitStrategy最好用在不需要低延迟,而且事件发布对于生产者的影响比较小的情况下。比如异步日志功能
- YieldingWaitStrategy:YieldingWaitStrategy是可以被用在低延迟系统中的两个策略之一,这种策略在减低系统延迟的同时也会增加CPU运算量。YieldingWaitStrategy策略会循环等待sequence增加到合适的值。循环中调用Thread.yield()允许其他准备好的线程执行。如果需要高性能而且事件消费者线程比逻辑内核少的时候,推荐使用YieldingWaitStrategy策略。例如:在开启超线程的时候
- BusySpinWaitStrategy:BusySpinWaitStrategy是性能最高的等待策略,同时也是对部署环境要求最高的策略。这个性能最好用在事件处理线程比物理内核数目还要小的时候。例如:在禁用超线程技术的时候。
使用的时候需要根据自己的实际需求选择。
使用队列来实现生产者消费者模式的问题:
- 只适应单个生产者,单个消费者的情况
- 队列头和尾,会被频繁修改
- 因为生产者和消费者的速度不一致,队列常常是空的,或者是满的,而且这种情况经常发生。
disruptor获取一个游标后,必须要publish,放到finally代码块中,否则可能会阻塞其他producer对RingBuffer的Commit(Publish)
http://ifeve.com/disruptor/