企业做网站电话约见客户的对话对于网站反爬虫如何做
2026/4/4 7:31:11 网站建设 项目流程
企业做网站电话约见客户的对话,对于网站反爬虫如何做,惠州制作公司网站,做网站用什么语言简单1.1#xff1a;MQ如何保证消息不丢失 1.1.1#xff1a;哪些环节会有丢消息的可能#xff1f; 其中#xff0c;1#xff08;发送消息的时候#xff09;#xff0c;2#xff08;消息到达服务端持久化的时候#xff09;#xff0c;4#xff08;消费组消费消息的时候MQ如何保证消息不丢失1.1.1哪些环节会有丢消息的可能其中1发送消息的时候2消息到达服务端持久化的时候4消费组消费消息的时候三个场景都是跨网络的而跨网络就肯定会有丢消息的可能。然后关于3这个环节通常MQ存盘时都会先写入操作系统的缓存page cache中然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差就可能会造成消息丢失。如果服务挂了缓存中还没有来得及写入硬盘的消息就会丢失。1.1.2生产者发送消息如何保证消息不丢失生产者确认生成者发送消息后服务端给生产者一个确认的通知告知生产者这个消息在broker是否写入完成了。1rocketmqrocketmq中提供了三种不同的发送消息的方式//异步发送这种不需要broker确认会有丢失消息的可能 myProducer.sendOneway(msg); System.out.println(异步发送成功sendOneway); //同步发送需要等待broker的确认消息最安全但是效率低 sendResult myProducer.send(msg, 20 * 1000); System.out.println(发送成功同步send sendResult); // 这种为异步发生方式,不会阻塞会返回结果收到broker确认后会调用回调函回。在效率与安全之间比较均衡但不是这种就是最好的。因为使用这种方式那么主线程就不能立即杀掉也就是说不能立马调用myProducer.shutdown();因为主线程一旦杀掉那么子线程获取返回结果的也就不能获取到结果了对性能消耗比较大会增加客服端的负担。 myProducer.send(msg, new SendCallback() { Override public void onSuccess(SendResult sendResult) { System.out.println(发送成功异步 sendResult); } Override public void onException(Throwable e) { System.out.println(发送失败异步 e.getMessage()); } });2kafkakafka同样也提供了这种异步和同步的发送方式//直接send发送消息返回的是一个Future。这就相当于异步调用 FutureRecordMetadata future producer.send(record); //调用future的get方法才会实际获取到发送的结果生产者收到这个结果后就可以指定消息是否成功发送到broker了这个过程就变成了一个同步的过程。 RecordMetadata recordData producer.send(record).get();3RabbitMQRabbitMQ则是提供了一个Publisher Confirms生产者确认机制Publisher收到Broker响应后再触发对应的回调方法。//获取Channel Channel channel ? //添加两个回调一个处理ack响应一个nack响应 channel.addConfirmListener(ConfirmCallback ackCallBack, ConfirmCallback nackCallBack);这些不同的处理方法的背后都是一个思路那就是给生产者响让生产者知道消息有没有发送成功如果没有也可以由生产者自行补救重发也可以抛出异常反正都是让生产者自行处理。4RocketMq的事务消息机制RocketMQ的事务消息机制就是为了保证零丢失来设计的并且经过阿里的验证肯定是非常靠谱的。详情见本章节5.81为什么要发送个half消息有什么用这个half消息是在订单系统进行下单操作前发送并且对下游服务的消费者是不可见的。那这个消息的作用更多的体现在确认RocketMQ的服务端是否正常。相当于嗅探下RocketMQ服务是否正常并且通知RocketMQ我马上就要发一个很重要的消息了你做好准备。2half消息如果写入失败了怎么办如果没有half消息这个流程那我们通常是会在订单系统中先完成下单再发送消息给MQ。这时候写入消息到MQ如果失败就会非常尴尬了。而half消息如果写入失败我们就可以认为MQ的服务是有问题的这时就不能通知下游服务了。我们可以在下单时给订单一个状态标记然后等待MQ服务正常后再进行补偿操作等MQ服务正常后重新下单通知下游服务。3订单系统写数据库失败了怎么办如果没有使用事务消息我们只能判断下单失败抛出了异常那就不往MQ发消息了这样至少保证不会对下游服务进行错误的通知。但是这样的话如果过一段时间数据库恢复过来了这个消息就无法再次发送了。当然也可以设计另外的补偿机制例如将订单数据缓存起来再启动一个线程定时尝试往数据库写。而如果使用事务消息机制就可以有一种更优雅的方案。如果下单时写数据库失败(可能是数据库崩了需要等一段时间才能恢复)。那我们可以另外找个地方把订单消息先缓存起来(Redis、文本或者其他方式)然后给RocketMQ返回一个UNKNOWN状态。这样RocketMQ就会过一段时间来回查事务状态。我们就可以在回查事务状态时再尝试把订单数据写入数据库如果数据库这时候已经恢复了那就能完整正常的下单再继续后面的业务。这样这个订单的消息就不会因为数据库临时崩了而丢失。4half消息写入成功后RocketMQ挂了怎么办在事务消息的处理机制中未知状态的事务状态回查是由RocketMQ的Broker主动发起的。也就是说如果出现了这种情况那RocketMQ就不会回调到事务消息中回查事务状态的服务。这时我们就可以将订单一直标记为新下单的状态。而等RocketMQ恢复后只要存储的消息没有丢失RocketMQ就会再次继续状态回查的流程。5下单成功后如何优雅的等待支付成功最简单的方式是启动一个定时任务每隔一段时间扫描订单表比对未支付的订单的下单时间将超过时间的订单回收。这种方式显然是有很大问题的需要定时扫描很庞大的一个订单信息这对系统是个不小的压力。其次是可以使用RocketMQ提供的延迟消息机制。往MQ发一个延迟1分钟的消息消费到这个消息后去检查订单的支付状态如果订单已经支付就往下游发送下单的通知。而如果没有支付就再发一个延迟1分钟的消息。最终在第十个消息时把订单回收。这个方案就不用对全部的订单表进行扫描而只需要每次处理一个单独的订单消息。如果使用上了事务消息。我们就可以用事务消息的状态回查机制来替代定时的任务。在下单时给Broker返回一个UNKNOWN的未知状态。而在状态回查的方法中去查询订单的支付状态。这样整个业务逻辑就会简单很多。我们只需要配置RocketMQ中的事务消息回查次数(默认15次)和事务回查间隔时间(messageDelayLevel)就可以更优雅的完成这个支付状态检查的需求。6事务消息机制的作用整体来说在订单这个场景下消息不丢失的问题实际上就还是转化成了下单这个业务与下游服务的业务的分布式事务一致性问题。而事务一致性问题一直以来都是一个非常复杂的问题。而RocketMQ的事务消息机制实际上只保证了整个事务消息的一半他保证的是订单系统下单和发消息这两个事件的事务一致性而对下游服务的事务并没有保证。但是即便如此也是分布式事务的一个很好的降级方案。1.1.3Broker刷盘写入数据如何保证不丢失1RocketMq保证消息在broker端不丢失就涉及到一个概念就是PageCache缓存。数据会先写入缓存再刷到磁盘这个过程中如果服务器发生了异常比如断电等等缓存中的数据还没写入到磁盘数据就会丢失。所以我们就可以采用同步刷盘的方式可以调用操作系统的提供的sync系统调用申请一次刷盘操作主动的将PageCache中的数据写入到磁盘RocketMq的broker提供了配置项flushDiskType有两个可选项分别是SYNC_FLUSH同步刷盘和ASYNC_FLUSH异步刷盘SYNC_FLUSH同步刷盘Broker每往日志文件中写入一条数据就会申请一次刷盘操作。ASYNC_FLUSH异步刷盘Broker每隔固定的时间可以配置默认200ms才会去调用一次刷盘操作。异步刷盘性能更加文档但是会有丢失消息的可能。同步刷盘安全性更高但是操作系统的压力会更大。在RocketMq中就算是同步刷盘其实也不是写一次消息就刷一次盘其同步刷盘方式实现方式其实是以10ms的间隔去调用的刷盘操作从理论上来说还是会有丢失消息的可能但是这一套同步刷盘机制已经很不错了可以满足绝大部分业务场景。2Kafka在kafka中并没有同步刷盘和异步刷盘的区别不过可以使用一些参数来管理刷盘的频率flsh.ms:多久进行一次强制刷盘 log.flsh.interval.message:表示的那个同一个Partiton的消息数量达到这个数量时就会申请一次刷盘操作默认是Long.MAX log.flsh.interval.ms:当一个消息在内存中保留的时间达到这个值的时候就会申请一次刷盘操作默认值是空的如果这个为空那么下一个参数就会生效 log.flsh.scheduler.interval.ms:检查是否有日志文件需要进行刷盘的评率默认值也是Long.MAX3RabbitMq对于Classic经典队列即便声明了持久化RabbitMQ服务端也不会实时调用fsync因此无法保证服务端消息不丢失对于Stream流式队列则更加直接不会调用fsync进行刷盘而是交由操作系统自行刷盘。1.1.4Broker主从同步如何保证消息不丢失对于RocketMq的broker来说通常slave的作用就是做数据备份的那个master节点失效宕机甚至是磁盘坏了后就可以从slave子节点获取信息但是如果主从同步的时候失败了那么在broker中这一层保证就会失效因此主从同步也有可能会造成数据丢失。这里我们就可以用Dleger高可用集群。1RocketMQ的消息持久化机制CommitLog:消息真正的存储文件,所有的消息都存在CommitLog文件中。RoktMO默认会将数据先存储在内存中俗一个缓存区中每当缓冲区中的数据积累到一定的数量或者一定的时间后就会将缓存区中的消品批量的写入到磁盘上的CommitLog文件中。消息写入CommitLog文件中后就可以被消费者消费了。Commitlog文件的大小固定1G写满之后生成新的文件并且采用的是顺序写的方式。ConsumeQueue:消息消费的逻辑队列类似数据库的索引文件。RoketMQ 中每个主题下的每个消息队列都会对应一个 ConsumeQueueConsumeQueu存储了消息的ofise以及该offset对应的消息在CommitLog文件中的位置信息便于消费者速定位并消费消息。每个ConsumeQueue文件固定由30万个固定大小2obyte的数据块组成。内容包括msgPhyOffset(8byte消息在文件中的其实位置)msgSize4byte消息在文件中占用的长度msgTagCode8byte消息tag的hash值。IndexFile:消息索引文件主要存储消息Key与offset的对应关系提升消息检索速度。如果生产者在发送消息时设置了消息Key那么RocketMQ会将消息Key值和消息的物理偏移量(offset)存储在IndexFile文件中这样当消费者需要根据消息Key查消息时就可以直接在IndexFile文件中查找对应的offset然后通过ConsumeQueue文件快速定位并消费消息。2三个角色构成的消息储存结构如下3消息存储过程4Dleger高可用集群dleger文件同步过程在使用Dledger技术搭建的RocketMQ集群中Dledger会通过两阶段提交的方式保证文件在主从之间成功同步。简单来说数据同步会通过两个阶段一个是uncommitted阶段一个是commited阶段。Leader Broker上的Dledger收到一条数据后会标记为uncommitted状态然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。接着Follower Broker的DledgerServer收到uncommitted消息之后必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后就会把消息标记为committed状态。再接下来 Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer让他们把消息也标记为committed状态。这样就基于Raft协议完成了两阶段的数据同步。1.1.5消费者如何保证消息不丢失消费组在消息处理完成后需要给broker一个响应表示消息被正常处理了。如果broker端没有收到这个响应不管是consumer没有拿到这个消息还是处理完成了没有给出响应broker端都会认为没有处理成功就会重新投递这些消息。rockermq和kafka是依据offset机制来重新投递的而Rabbitmq消息重新入队来处理的。正常情况下消费者端都是需要先处理本地事务然后再给MQ一个ACK响应这时MQ就会修改Offset将消息标记为已消费从而不再往其他消费者推送消息。所以在Broker的这种重新推送机制下消息是不会在传输过程中丢失的。但是如果消费者端启动新线程来处理业务逻辑然后主线程中给broker响应CONSUM_SUCCESS结果处理业务逻辑的线程执行失败了也是会造成消息丢失的所以不建议这么做或者要控制好线程并发的异常情况。1.1.6MQ服务全挂了如何保证不丢失设计一个降级方案来处理这个问题了。例如在订单系统中如果多次尝试发送RocketMQ不成功那就只能另外找给地方(Redis、文件或者内存等)把订单消息缓存下来然后起一个线程定时的扫描这些失败的订单消息尝试往RocketMQ发送。这样等RocketMQ的服务恢复过来后就能第一时间把这些消息重新发送出去。1.1.7MQ消息零丢失方案总结生产者使用事务消息机制。Broker配置同步刷盘Dledger主从架构消费者不要使用异步消费。整个MQ挂了之后准备降级方案1.2MQ如何保证消息顺序代码见博客5.5章节MQ的顺序问题分为全局有序和局部有序。全局有序整个MQ系统的所有消息严格按照队列先入先出顺序进行消费。局部有序只保证一部分关键消息的消费顺序。其实在大部分的MQ业务场景我们只需要能够保证局部有序就可以了。例如我们用QQ聊天只需要保证一个聊天窗口里的消息有序就可以了。而对于电商订单场景也只要保证一个订单的所有消息是有序的就可以了。至于全局消息的顺序并不会太关心。而通常意义下全局有序都可以压缩成局部有序的问题。例如以前我们常用的聊天室就是个典型的需要保证消息全局有序的场景。但是这种场景通常可以压缩成只有一个聊天窗口的QQ来理解。即整个系统只有一个聊天通道这样就可以用QQ那种保证一个聊天窗口消息有序的方式来保证整个系统的全局消息有序。会通过MessageQueue轮询的方式保证消息尽量均匀的分布到所有的MessageQueue上而消费者也就同样需要从多个MessageQueue上消费消息。而MessageQueue是RocketMQ存储消息的最小单元他们之间的消息都是互相隔离的在这种情况下是无法保证消息全局有序的。而对于局部有序的要求只需要将有序的一组消息都存入同一个MessageQueue里这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。RocketMQ中可以在发送者发送消息时指定一个MessageSelector对象让这个对象来决定消息发入哪一个MessageQueue。这样就可以保证一组有序的消息能够发到同一个MessageQueue里。1.2.1生产者端发送消息使用MessageQueueSelector编写有序消息生产者有序消息生产者会按照一定的规则将消息发送到同一个队列中从而保证同一个队列中的消息是有序的。RocketMQ并不保证整个主题内所有队列的消息都是按照发送顺序排列的。public class Producer { public static void main(String[] args) throws MQClientException { try { DefaultMQProducer producer new DefaultMQProducer(OrderlyMessageTest); producer.setNamesrvAddr(localhost:9876); producer.start(); String[] tags new String[] {TagA, TagB, TagC, TagD, TagE}; for (int i 0; i 20; i) { int orderId i % 10; Message msg new Message(OrderlyTopic, tags[i % tags.length], KEY i, (Hello RocketMQ i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult producer.send(msg, new MessageQueueSelector() { Override public MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) { Integer id (Integer) arg; int index id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf(%s%n, sendResult); } producer.shutdown(); } catch (Exception e) { e.printStackTrace(); throw new MQClientException(e.getMessage(), null); } } }1.2.2消费者端消费消息1push模式使用MessageListenerOrderly进行顺序消费与之对应的MessageListenerConcurrently并行消费这种不能保证消费的顺序。MessageListenerOrderly是RocketMq专门提供的一种顺序消费的接口他可以让消费组按照消息发送的顺序一个一个的处理。通过加队列锁的方式实现(有超时机制)一个队列同时只有一个消费者;并且存在一个定时任务每隔一段时间就会延长锁的时间直到整个消息队列全部消费结束。public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer new DefaultMQPushConsumer(OrderlyMessageTest); consumer.setNamesrvAddr(localhost:9876); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe(OrderlyTopic, TagA || TagC || TagD); consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes new AtomicLong(0); Override public ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msgs); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 2) 0) { return ConsumeOrderlyStatus.SUCCESS; } else if ((this.consumeTimes.get() % 5) 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf(Consumer Started.%n); } }这个时候如果消费失败了如果选中的消费模式为MessageListenerConcurrently那么就会返回RECONSUME_LATER将失败的这个消息转发到重试队列中然后接着消费后面的消息这就无法保证消费的顺序了。相反如果选择的是MessageListenerOrderly那么就会返回SUSPEND_CURRENT_QUEUE_A_MOMENT。意思就是阻塞一段时间当前的队列然后继续从失败的哪里开始消费。2pull模式消费者端自己保证消费的顺序消费组并发消费时保证消费线程数为1。RocketMq的消费者可以开启多个消费线程同时消费一个队列中的消息如果要保证消费的顺序需要加消费线程数设置为1这样在同一个队列中每个消息只会被单个消线程消费从而保证消息的顺序性。另外通常所谓的保证Topic全局消息有序的方式就是将Topic配置成只有一个MessageQueue队列(默认是4个)。这样天生就能保证消息全局有序了。这个说法其实就是我们将聊天室场景压缩成只有一个聊天窗口的QQ一样的理解方式。而这种方式对整个Topic的消息吞吐影响是非常大的如果这样用基本上就没有用MQ的必要了。这个接口支持按照消息的重试次数进行顺序消费、订单ID等作为消息键来实现顺序消费、批量消费等操作。1.3MQ如何保证消息的幂等性1.4MQ如何快速处理积压的消息

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询