RabbitMq学习笔记(六)—— RPC调用

RpcServer

            //生成Rpc队列

            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);  

            channel.basicQos(1);  

            //创建Rpc队列消费者

            QueueingConsumer consumer = new QueueingConsumer(channel);  

            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);  

            System.out.println("RPCServer [x] Awaiting RPC requests");  

            while (true) {  

                String response = "";      //返回RpcClient回调结果

                //获取队列中消息,如果没有则会一直阻塞

                QueueingConsumer.Delivery delivery = consumer.nextDelivery();    

                BasicProperties props = delivery.getProperties();  

                //开启Rpc

                BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();  

               try {  

                    //获取消息

                    String message = new String(delivery.getBody(), "UTF-8");  

                    int n = Integer.parseInt(message);  

                    System.out.println("RPCServer [.] fib(" + message + ")");  

                    response = "" + fib(n);  

                } catch (Exception e) {  

                    System.out.println(" [.] " + e.toString());  

                    response = "";  

                } finally {  

                    //完成

                    channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));  

                    //传输失败,回退

                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

                }  


RpcClient

    private Connection connection;  

    private Channel channel;  

    private String requestQueueName = "rpc_queue";  

    private String replyQueueName;  

    private QueueingConsumer consumer;  

    

    public RpcClient() throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException{

         ConnectionFactory factory = new ConnectionFactory();  

         factory.setUri("amqp://admin:admin@172.16.41.31:5672");  

         connection = factory.newConnection();  

         channel = connection.createChannel();  

         replyQueueName = channel.queueDeclare().getQueue();  

         consumer = new QueueingConsumer(channel);  

         channel.basicConsume(replyQueueName, true, consumer);  

    }

    

    //计算结果,返回给RpcServer

    public String call(String message) throws Exception {  

    String response = null;  

        String corrId = UUID.randomUUID().toString();  

        BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();  

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));  

        while (true) {  

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

            if (delivery.getProperties().getCorrelationId().equals(corrId)) {  

                response = new String(delivery.getBody(), "UTF-8");  

                break;  

            }  

        }  

        return response;  

    }  

    

    public void close() throws Exception {  

        connection.close();  

    }  




远程方法调用的注意事项:

        RPC在软件开发中非常常见,也经常被批评。当一个程序员对代码不熟悉的时候,跟踪RPC的性能问题是出在本地还是远程服务器就非常麻烦,对于RPC的使用,有几点需要特别说明:

  • 使用远程调用时的本地函数最好独立出来

  • 保证代码组件之间的依赖关系清晰明了,并用日志记录不同的执行过程和时间

  • 发生客户端运行缓慢或者假死时,先确认RPC服务器是否还活着!

  • 尽量使用异步队列来处理RPC请求,尽量不要用同步阻塞的方式运行RPC请求


消息属性

传输一条消息,AMQP协议预定义了14个属性,下面几个是使用比较频繁的几个属性:

  • deliveryMode:配置一个消息是否持久化。(2表示持久化)这个在第二章中有说明。

  • contentType :用来描述编码的MIME类型。与html的MIME类型类似,例如,经常使用JSON编码是将此属性设置为一个很好的做法:application/json。

  • replyTo : 回调队列的名称。

  • correlationId:RPC响应请求的相关编号。

关联编号  Correlation Id

        如果一个客户端有很多的计算任务,按照上面的代码,我们会为每个任务创建一个请求,然后等待返回的结果,这种方法貌似很耗时,如果把所有的任务都放到同一个连接中,那么我们又没法分辨出返回的结果是那个任务的?为了解决这个问题,RabbitMQ提供了一个correlationid属性来解决这个问题。RabbitMQ为每个请求提供唯一的编号,然后在返回队列里如果看到了这个编号,就知道我们的任务处理完成了,如果收到的编号不认识,就可以安全的忽略。

        你可能会疑问,如果忽略了,那么想知道这个返回结果的客户端是不是就收不到这个结果了?这个基本上不会出现,但是,理论上也可能发生,例如一个RPC服务器,在发送确认消息前挂了,你收到的消息可能就是不完整的。这种情况,RabbitMQ会重新发送任务处理请求。这也是为什么客户端必须处理这些重复请求以及RPC启用幂次模式。

总结:


RPC工作方式:

  1. 当客户端启动时,会创建一个匿名的回调队列

  2. 在RPC请求中,定义了两个属性:replyTo,表示回调队列的名称; correlationId,表示请求任务的唯一编号,用来区分不同请求的返回结果。

  3. 将请求发送到rpc_queue队列中

  4. RPC服务器等待rpc_queue队列的请求,如果有消息,就处理,它将计算结果发送到请求中的回调队列里。

  5. 客户端监听回调队列中的消息,如果有返回消息,它根据回调消息中的correlationid进行匹配计算结果。


以上转载

http://blog.csdn.net/chwshuang/article/details/50518570


参考RabbitMQ中文文档

http://rabbitmq.mr-ping.com/tutorials_with_python/[5]Topics.html

http://redis.cn/


评论

© dzxlovelar | Powered by LOFTER