RabbitMq学习笔记(二)—— 工作队列(WorkQueue)

WorkQueue:有序的处理消息(etc:流水线作业)


消费者消费数据

boolean durable = true;//持久化,保证数据不丢失

channel.queueDeclare("queue_task", durable, false, false, null); 

        //避免负载不均衡,保证一个消费者处理一个消息

        int prefetchCount = 1;

        channel.basicQos(prefetchCount);

        

        //消费者

        Consumer consumer = new DefaultConsumer(channel){

        @Override

        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,

        byte[] body) throws IOException {

        

        String message = new String(body, "UTF-8");


                System.out.println("Worker 接收 '" + message + "'");

                try {

                    doWork(message);

                } finally {

                    System.out.println("Worker 处理完成");

                    //消息处理完成确认

                    //如果发生异常,未处理完的消息,将会重新放回队列

                    channel.basicAck(envelope.getDeliveryTag(), false);

                }

        }

        };

        //消息消费完成确认

        channel.basicConsume("queue_task", false, consumer);



工作队列

主要场景是消费者需要根据消息中的内容进行业务逻辑处理,这种消息可以看成是一个任务指令,处理起来比较耗时,通过多个消费者来处理这些消息,来提高数据的吞吐能力。工作队列(即任务队列)的主要思想是不用一直等待资源密集型的任务处理完成,这就像一个生产线,将半成品放到生产线中,然后在生产线后面安排多个工人同时对半成品进行处理,这样比一个生产线对应一个工人的吞吐量大几个数量级。


消息确认

如果处理一条消息需要几秒钟的时间,你可能会想,如果在处理消息的过程中,消费者服务器、网络、网卡出现故障挂了,那可能这条正在处理的消息或者任务就没有完成,就会失去这个消息和任务。 
为了确保消息或者任务不会丢失,RabbitMQ支持消息确认–ACK。ACK机制是消费者端从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。如果一个消费者在处理消息时挂掉(网络不稳定、服务器异常、网站故障等原因导致频道、连接关闭或者TCP连接丢失等),那么他就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将此消息重新放入队列中。如果有其他消费者同时在线,RabbitMQ会立即将这个消息推送给这个在线的消费者。这种机制保证了在消费者服务器故障的时候,能不丢失任何消息和任务。 
如果RabbitMQ向消费者发送消息时,消费者服务器挂了,消息也不会有超时;即使一个消息需要非常长的时间处理,也不会导致消息超时。这样消息永远不会从RabbitMQ服务器中删除。只有当消费者正确的发送ACK确认反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。 


消息的ACK确认机制默认是打开的。


忘记确认 


忘记通过basicAck返回确认信息是常见的错误。这个错误非常严重,将导致消费者客户端退出或者关闭后,消息会被退回RabbitMQ服务器,这会使RabbitMQ服务器内存爆满,而且RabbitMQ也不会主动删除这些被退回的消息。 
如果要监控这种错误,可以使用rabbitmqctl messages_unacknowledged命令打印出出相关的信息。


消息持久化

标记为持久化后的消息也不能完全保证不会丢失。虽然已经告诉RabbitMQ消息要保存到磁盘上,但是理论上,RabbitMQ已经接收到生产者的消息,但是还没有来得及保存到磁盘上,服务器就挂了(比如机房断电),那么重启后,RabbitMQ中的这条未及时保存的消息就会丢失。因为RabbitMQ不做实时立即的磁盘同步(fsync)。这种情况下,对于持久化要求不是特别高的简单任务队列来说,还是可以满足的。如果需要更强大的保证,那么你可以考虑使用生产者确认反馈机制。


负载均衡

默认情况下,RabbitMQ将队列消息随机分配给每个消费者,这时可能出现消息调度不均衡的问题。例如有两台消费者服务器,一个服务器可能非常繁忙,消息不断,另外一个却很悠闲,没有什么负载。RabbitMQ不会主动介入这些情况,还是会随机调度消息到每台服务器。 
这是因为RabbitMQ此时只负责调度消息,不会根据ACK的反馈机制来分析那台服务器返回反馈慢,是不是处理不过来啊?

就像下面这个图: 

int prefetchCount = 1;channel.basicQos(prefetchCount);

为了解决这个问题,我们可以使用【prefetchcount = 1】这个设置。这个设置告诉RabbitMQ,不要一次将多个消息发送给一个消费者。这样做的好处是只有当消费者处理完成当前消息并反馈后,才会收到另外一条消息或任务。这样就避免了负载不均衡的事情了。




以上转载

http://blog.csdn.net/chwshuang/article/details/50512057

RabbitMq在线API文档

http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/


评论

© dzxlovelar | Powered by LOFTER