2026/2/13 22:40:51
网站建设
项目流程
帮忙找人做网站,云端建站快车,深圳专门做写字楼的网站,wordpress 在线演示Disruptor 是一个开源的高性能内存队列#xff0c;由英国外汇交易公司 LMAX 开发的#xff0c;获得了 2011 年的 Oracle 官方的 Dukes Choice Awards(Duke 选择大奖)。
Disruptor 提供的功能类似于 Kafka 、RocketMQ 这类分布式队列#xff0c;不过#xff0c;其作为范围…Disruptor 是一个开源的高性能内存队列由英国外汇交易公司 LMAX 开发的获得了 2011 年的 Oracle 官方的 Dukes Choice Awards(Duke 选择大奖)。Disruptor 提供的功能类似于 Kafka 、RocketMQ 这类分布式队列不过其作为范围是 JVM(内存)。Disruptor 解决了 JDK 内置线程安全队列的性能和内存安全问题。JDK 中常见的线程安全的队列如下队列名字锁是否有界ArrayBlockingQueue加锁ReentrantLock有界LinkedBlockingQueue加锁ReentrantLock有界LinkedTransferQueue无锁CAS无界ConcurrentLinkedQueue无锁CAS无界从上表中可以看出这些队列要不就是加锁有界要不就是无锁无界。而加锁的的队列势必会影响性能无界的队列又存在内存溢出的风险。因此一般情况下我们都是不建议使用 JDK 内置线程安全队列。Disruptor 就不一样了它在无锁的情况下还能保证队列有界并且还是线程安全的。1.广播场景广播场景在我们的开发工作中并不少见比如系统收到上游系统的一个请求消息然后把这个消息发送给多个下游系统来处理。Disruptor 支持广播模式。比如消费者生产的消息由三个消费者来消费public class Broadcast { public static void main(String[] args) throws InterruptedException { int bufferSize 1024; DisruptorLongEvent disruptor new Disruptor(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); EventHandlerLongEvent consumer1 new LongEventHandler(consumer1); EventHandlerLongEvent consumer2 new LongEventHandler(consumer2); EventHandlerLongEvent consumer3 new LongEventHandler(consumer3); disruptor.handleEventsWith(consumer1, consumer2, consumer3); disruptor.start(); RingBufferLongEvent ringBuffer disruptor.getRingBuffer(); ByteBuffer bb ByteBuffer.allocate(8); for (long l 0; true; l) { bb.putLong(0, l); ringBuffer.publishEvent((event, sequence, buffer) - event.set(buffer.getLong(0)), bb); Thread.sleep(1000); } } }2.日志收集再来看一个日志收集的例子。这里我们假设一个场景业务系统集群有 3 个节点每个节点打印的业务日志发送到 DisruptorDisruptor 下游有 3 个消费者负责日志收集。这里我们需要重新定义一个日志收集处理类代码如下public class LogCollectHandler implements WorkHandlerLongEvent { public LogCollectHandler(String consumer) { this.consumer consumer; } private String consumer; Override public void onEvent(LongEvent event) { System.out.println(consumer: consumer ,Event: event); } }下面这个代码是绑定消费者的代码public static void main(String[] args) throws InterruptedException { int bufferSize 1024; DisruptorLongEvent disruptor new Disruptor(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); WorkHandlerLongEvent consumer1 new LogCollectHandler(consumer1); WorkHandlerLongEvent consumer2 new LogCollectHandler(consumer2); WorkHandlerLongEvent consumer3 new LogCollectHandler(consumer3); disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3); disruptor.start(); }需要注意的是上面使用的是 Disruptor 的handleEventsWithWorkerPool方法使用的消费者不是EventHandler而是WorkHandler。消费者组里面的消费者如果是WorkHandler那消费者之间就是有竞争的比如一个 Event 已经被 consumer1 消费过那就不再会被其他消费者消费了。消费者组里面的消费者如果是EventHandler那消费者之间是没有竞争的所有消息都会消费。3.责任链责任链这种设计模式我们都比较熟悉了同一个对象的处理有多个不同的逻辑每个逻辑作为一个节点组成责任链比如收到一条告警消息处理节点分为给开发人员发送邮件、给运维人员发送短信、给业务人员发送 OA 消息。Disruptor 支持链式处理消息看下面的示例代码public static void main(String[] args) throws InterruptedException { int bufferSize 1024; DisruptorLongEvent disruptor new Disruptor(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); EventHandlerLongEvent consumer1 new LongEventHandler(consumer1); EventHandlerLongEvent consumer2 new LongEventHandler(consumer2); EventHandlerLongEvent consumer3 new LongEventHandler(consumer3); disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3); disruptor.start(); }Disruptor 也支持多个并行责任链下图是 2 条责任链的场景图片这里给出一个示例代码public static void main(String[] args) throws InterruptedException { int bufferSize 1024; DisruptorLongEvent disruptor new Disruptor(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); EventHandlerLongEvent consumer1 new LongEventHandler(consumer1); EventHandlerLongEvent consumer2 new LongEventHandler(consumer2); EventHandlerLongEvent consumer3 new LongEventHandler(consumer3); EventHandlerLongEvent consumer4 new LongEventHandler(consumer4); EventHandlerLongEvent consumer5 new LongEventHandler(consumer5); EventHandlerLongEvent consumer6 new LongEventHandler(consumer6); disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3); disruptor.handleEventsWith(consumer4).then(consumer5).then(consumer6); disruptor.start(); }4.多任务协作一个经典的例子我们在泡咖啡之前需要烧水、洗被子、磨咖啡粉这三个步骤可以并行但是需要等着三步都完成之后才可以泡咖啡。当然这个例子可以用 Java 中的 CompletableFuture 来实现代码如下public static void main(String[] args){ ExecutorService executor ...; CompletableFuture future1 CompletableFuture.runAsync(() - { try { washCup(); } catch (InterruptedException e) { e.printStackTrace(); } }, executor); CompletableFuture future2 CompletableFuture.runAsync(() - { try { hotWater(); } catch (InterruptedException e) { e.printStackTrace(); } }, executor); CompletableFuture future3 CompletableFuture.runAsync(() - { try { grindCoffee(); } catch (InterruptedException e) { e.printStackTrace(); } }, executor); CompletableFuture.allOf(future1, future2, future3).thenAccept( r - { System.out.println(泡咖啡); } ); System.out.println(我是主线程); }同样使用 Disruptor 也可以实现这个场景看下面代码public static void main(String[] args) throws InterruptedException { int bufferSize 1024; DisruptorLongEvent disruptor new Disruptor(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); EventHandlerLongEvent consumer1 new LongEventHandler(consumer1); EventHandlerLongEvent consumer2 new LongEventHandler(consumer2); EventHandlerLongEvent consumer3 new LongEventHandler(consumer3); EventHandlerLongEvent consumer4 new LongEventHandler(consumer4); disruptor.handleEventsWith(consumer1, consumer2, consumer3).then(consumer4); disruptor.start(); }5.多消费者组类比主流消息队列的场景Disruptor 也可以实现多消费者组的场景组间并行消费互不影响组内消费者竞争消息如下图示例代码如下public static void main(String[] args) throws InterruptedException { int bufferSize 1024; DisruptorLongEvent disruptor new Disruptor(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); WorkHandlerLongEvent consumer1 new LogWorkHandler(consumer1); WorkHandlerLongEvent consumer2 new LogWorkHandler(consumer2); WorkHandlerLongEvent consumer3 new LogWorkHandler(consumer3); WorkHandlerLongEvent consumer4 new LogWorkHandler(consumer4); WorkHandlerLongEvent consumer5 new LogWorkHandler(consumer5); WorkHandlerLongEvent consumer6 new LogWorkHandler(consumer6); disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3); disruptor.handleEventsWithWorkerPool(consumer4, consumer5, consumer6); disruptor.start(); }6.总结通过消费者的灵活组合Disruptor 的使用场景非常丰富。本文介绍了 Disruptor 的 5 个典型使用场景。在选型的时候除了使用场景更多地要考虑到 Disruptor 作为高性能内存队列的这个特点。