0x00摘要0x01综述功能0x02示例代码0x03定义3.1定义3.2Queue0x04Init4.1处理调用4.1.1queues4.1.2channel4.1.3on_message4.2建立联系4.2.1channel与queue4.2.2channel与exchange4.2.3ExchangeBinding4.2.3.1Channelbinding4.2.3.2使用0x05完善联系5.1遍历Queue5.2consumeinQueue5.3consumeinChannel0x06消费消息6.1drain_eventsinConnection6.2drain_eventsinTransport6.3getinMultiChannelPoller6.3.1_register_BRPOPinMultiChannelPoller6.3.2registerin_poll6.3.3poll(timeout)inMultiChannelPoller6.3.4注册到redis驱动,负载均衡6.3.4handle_eventinMultiChannelPoller6.3.5on_readableinMultiChannelPoller6.3.6_brpop_readinChannel6.3.7从redis读取6.3.8回到_brpop_read6.3.9_deliverinTransport6.3.10basic_consumeinChannel6.3.11_receive_callbackinConsumer0xFF参考
0x00摘要本系列我们介绍消息队列Kombu。Kombu的定位是一个兼容AMQP协议的消息队列抽象。通过本文,大家可以了解Kombu中的Consumer概念。
0x01综述功能Consumer的作用主要如下:
Exchange:MQ路由,消息发送者将消息发至Exchange,Exchange负责将消息分发至队列。
Queue:对应的队列抽象,存储着即将被应用消费掉的消息,Exchange负责将消息分发Queue,消费者从Queue接收消息;
Consumers是接受消息的抽象类,consumer需要声明一个queue,并将queue与指定的exchange绑定,然后从queue里面接收消息。就是说,从用户角度,知道了一个exchange,就可以从中读取消息,具体这个消息就是从queue中读取的。
在具体的实现中,Consumer把queue与channel联系起来。queue里面有一个channel,用来访问redis。Queue也有Exchange,知道访问具体redis哪个key(就是queue对应的那个key)。即Consumer消费消息是通过Queue来消费,然后Queue又转嫁给Channel。
所以服务端的逻辑大致为:
建立连接;
创建Exchange;
创建Queue,并将Exchange与Queue绑定,Queue的名称为routing_key;
创建Consumer对Queue监听;
0x02示例代码下面使用如下代码来进行说明。
本示例来自