关键词:
一、简介
解释RabbitMQ,就不得不提到AMQP(Advanced Message Queuing Protocol)协议。 AMQP协议是一种基于网络的消息传输协议,它能够在应用或组织之间提供可靠的消息传输。RabbitMQ是该AMQP协议的一种实现,利用它,可以将消息安全可靠的从发 送方传输到接收方。简单的说,就是消息发送方利用RabbitMQ将信息安全的传递给接收方。
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
下面是rabbitmq 图示
二、安装
rabbitmq for windows
1 erlang:http://www.erlang.org/download/otp_win64_17.3.exe 2 3
下载地址:http://www.rabbitmq.com/download.html
安装API
1 pip install pika 2 or 3 easy_install pika 4 or 5 源码 6 7 https://pypi.python.org/pypi/pika
rabbitmq for linux
1 安装配置epel源 2 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 3 4 安装erlang 5 $ yum -y install erlang 6 7 安装RabbitMQ 8 $ yum -y install rabbitmq-server
安装API
1 pip install pika 2 or 3 easy_install pika 4 or 5 源码 6 7 https://pypi.python.org/pypi/pika
for mac:
http://my.oschina.net/u/998693/blog/547873
三、简单队列模型
基于Queue实现生产者消费者模型
#!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
基于rabbitmq的简单队列模型:在于交换机不工作的情况下
#/usr/bin/env python import pika # ######################### 生产者 ######################### #封装socket逻辑部分 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) #拿到操作句柄 channel = connection.channel() #创建队列,并且给队列取一个名字 channel.queue_declare(queue='hello') #这里面三个参数,第一个是交换机,第二个参数指定队列,第三个参数是传递的数据 #一个队列的时候exchange交换机设置为空,routing_key指定把body(内容)放到那个队列里面 channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
channel.queue_declare(queue='hello') 这里创建队列的时候可能会出现durable为true的提示。
durable=Ture:就是创建持续化的队列
durable=False: 是创建非持续化的队列
如果创建channel.queue_declare(queue='hello',durable=Ture)就是创建了持续化队列。如果创建了持续化队列,就不要创建非持续化的队列
生产者:
首先导入pika模块
1、 封装socket逻辑部分
2、 拿到操作句柄,创建队列并且给这个队列取一个名字
3、 进行操作把生产的内容放入到队列中,这里有三个参数
a) 第一个残害苏是交换机,如果只有一个队列交换机默认为空,
b) 第三个参数是需要传递的内容
c) 第二个队列是设置吧内容传递给那个队列
1 #/usr/bin/env python 2 3 import pika 4 5 # ########################## 消费者 ########################## 6 #封装socket逻辑部分 7 connection = pika.BlockingConnection(pika.ConnectionParameters( 8 host='localhost')) 9 #生成句柄 10 channel = connection.channel() 11 #创建队列,内部做了判断,如果没有这个队列会创建,如果已经有了队列就不会创建 12 channel.queue_declare(queue='hello') 13 14 def callback(ch, method, properties, body): 15 print(" [x] Received %r" % body) 16 17 #回调函数,首先去指定队列中获取数据,放到body中,然后执行回调函数 18 #no_ack:等于True是无应答,如果为false就是有应答 19 #有应答是用户如果连接的时候突然断了,这个时候在队列中这个数据还保存,用户连接的时候还有 20 #这个队列会一直等这个用户。这个是比较安全的,但是效率不会太高 21 channel.basic_consume(callback, 22 queue='hello', 23 no_ack=True) 24 25 print(' [*] Waiting for messages. To exit press CTRL+C') 26 channel.start_consuming()
1、 首先导入pika模块
2、 封装socket逻辑部分,并且声称句柄
3、 创建队列(内部会做一个判断,如果这个队列没有那么就会创建这个队列,如果有就不会创建)
4、 执行回调函数来获取内容
a) 第一个参数是回调函数
b) 第二个参数指定队列,把获取的内容放入到回调函数中
c) 第三个参数应答参数用来设置是否有应答(消息丢不丢失)
四、重要参数
1、 acknowledgment消息丢不丢失(这个参数是用户如果崩溃异常消息丢不丢失)
要设置下面两部分内容:首先设置no_ack=False,然后设置下面黄色部分
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='10.211.55.4'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='hello',
no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
第二个参数:
durable 消息不丢失:这个是rabbitmq如果崩溃异常,数据会不会丢失
1、 在生产者中把下面黄色部分设置 :这样重启的时候数据会恢复原因是吧数据从内存放入到了磁盘
a) 创建队列的时候durable=True
b) 把数据放入到队列中的时候把delivery_mode=2
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
# make message persistent
channel.queue_declare(queue='hello', durable=True)
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(" [x] Sent 'Hello World!'")
connection.close()
消费者
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
# make message persistent
channel.queue_declare(queue='hello', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='hello',
no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消费获取顺序
在rabbitmq中默认的消息队列中的数据是按照顺序被消费者拿走的。假如有三个消费者,默认按照顺序在队列中获取数据,但是由于机器配置等原因,有些会拿的慢,有些拿的快,默认情况下,拿的快的机器会等待拿的慢的机器获取完毕后才去获取新数据,这样就造成了机器闲置状态。如果想打破这种默认模式,而是谁获取完毕之后谁就去获取新数据,这样效率就会提高。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照顺序排列获取数据
这条数据要放到消费者里面。如下:黄色部分
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
# make message persistent
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='hello',
no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
五、exchange工作模型(fanout,direct,topic)
1、 发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中
exchange type = fanout
发布者:让数据发送到交换机中,然后由交换机决定让数据放入到那个队列中
发布者
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
#exchange=”logs”表示给交换机创建一个名字,创建了一个交换机,
channel.exchange_declare(exchange='logs',
type='fanout')
#把数据
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
#把数据放入到交换机中,这里不需要指定队列,所以routing_key为空,第三个参数是吧数据放入message
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
订阅者 #!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #1、创建交换机 channel.exchange_declare(exchange='logs', type='fanout') #2、随机创建一个队列 result = channel.queue_declare(exclusive=True) #3、给创建的队列随机命名 queue_name = result.method.queue #4、创建的队列进行绑定交换机(拿到队列的名字,交换机的名字进行绑定) channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') #7、获取了数据然后执行 def callback(ch, method, properties, body): print(" [x] %r" % body) #6、这里获取生产者的数据,然后执行callback函数 channel.basic_consume(callback, queue=queue_name, no_ack=True) #5、这里进行一个阻塞,直到生产者放出一个数据的时候就会执行上面的方法 channel.start_consuming()
生产者启动的时候会创建交换机,然后把数据放入到交换机中,然后消费者启动的时候首先创建队列,并且检查有没有交换机,如果有交换机就进行绑定,如果没有就需要自己创建一个交换机,然后进行阻塞等待生产者把数据放入到交换机中,如果有数据,那么消费者就会从交换机中获取数据通过队列获取
---------------------------------------------------------------------------------------------------------------------
2、关键字发送
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
exchange type = direct
详细解释:生产者创建交换器,消费者创建队列绑定队列并且绑定交换器。这个时候队列还需要绑定关键字,如果绑定的有关键字,那么生产者发送消息到交换器,交换器根据哪个队列绑定的有关键字,然后把数据发送到那个队列,然后那个消费者绑定这个交换器,那么那个消费者就能获取到这个数据
注意:一个队列可以绑定多个字符串
消费者
绑定交换器和队列的时候绑定关键字
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
#这里绑定的时候除了绑定交换机,和队列名字,还要绑定关键字,上面进行了for循环,所以这里是绑定了多个关键字
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
生产者:
1、创建交换器,并且让参数为direct
2、设置一个或者多个关键字
3、让关键字放入到交换器中
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
--------------------------------------------------------------------------------------------------------------------------
3、模糊匹配
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
- # 表示可以匹配 0 个 或 多个 单词
- * 表示只能匹配 一个 单词
发送者路由值 队列中
old.boy.python old.* -- 不匹配
old.boy.python old.# -- 匹配
模糊匹配和关键词匹配除了参数不一样其他的都是一样的
生产者
1、 创建交换器,并且加入模糊匹配的参数
2、 把关键字放入到交换器
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
消费者
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
六、rabbitmq的应用
http://www.rabbitmq.com/getstarted.html 这是rabbitmq教程
处理大并发设计模式,用户访问并且操作web服务器的时候,web服务器只是把用户访问操作等数据 提交到队列中,并且同时在数据库中给数据库给这个请求标记状态。
根据队列的的属性,后台服务器按照顺序处理这些请求,处理之后在数据库中返回其状态,这样用户刷新访问数据库就能看到是否处理了用户的请求。这样可以减轻web服务的并发。例子(活动抢票)
队列的应用之rpc
通过rabbitmq实现远程调用模式
用户想要调用远程多个服务器,这个时候可以产生两个队列,其中A这个端口远程服务器一直连接等待用户输入内容,其中一个队列B通过主机端口等来给远程服务器发送指令,然后服务器接收到指令之后通过用户产生的另一个队列B给用户返回消息,如果用户不想收到消息,直接把创建的另一个队列B删除掉就可以了
rabbitmq消息中间件
RabbitMQ消息中间件RabbitMQ简介windows下安装RabbitMQRabbitMQ基本概念RabbitMQ简单模式RabbitMQ工作队列模式RabbitMQ发布订阅模式RabbitMQ路由模式RabbitMQ主题模式RabbitMQRPC模式RabbitMQ发布确认模式 查看详情
rabbitmq学习系列:rabbitmq安装与配置
上一篇,简单介绍了RabbitMQ的情况还有一些相关的概念,这一篇,会讲讲 RabbitMQ安装与配置。 1.安装 RabbitMQ是建立在强大的ErlangOTP平台上,因此安装RabbitMQ之前要先安装Erlang。 erlang:http://www.erlang.org/downl... 查看详情
rabbitmq消息队列系列教程认识rabbitmq
摘要RabbitMQ是最为流行的消息中间件,是处理高并发业务的利器。本系列教程,将跟大家一起学习RabbitMQ。目录RabbitMQ是什么?RabbitMQ的特点是什么?一、RabbitMQ是什么?RabbitMQ是基于Erlang开发的目前最流行的开源消息中间件,类似... 查看详情
rabbitmq常用的命令
rabbitMQ常用的命令启动监控管理器:rabbitmq-plugins enable rabbitmq_management关闭监控管理器:rabbitmq-pluginsdisablerabbitmq_management启动rabbitmq:rabbitmq-service start关闭rabbitmq:rabbitmq-service stop查 查看详情
rabbitmq使用(代码片段)
1.rabbitmq使用 -目的:可以下载镜像,使用rabbitmq-操作流程:RabbitMQ介绍和使用1,RabbitMQ介绍消息队列是消息在传输的过程中保存消息的容器。现在主流消息队列有:RabbitMQ、ActiveMQ、Kafka等等。RabbitMQ和ActiveMQ比较:系统吞吐量:Ra... 查看详情
菜鸟刷面试题(rabbitmq篇)
目录:rabbitmq的使用场景有哪些?rabbitmq有哪些重要的角色?rabbitmq有哪些重要的组件?rabbitmq中vhost的作用是什么?rabbitmq的消息是怎么发送的?rabbitmq怎么保证消息的稳定性?rabbitmq怎么避免消息丢失?要保证消息持久化成功的... 查看详情
【rabbitmq】rabbitmq集群节点重新加入集群相关操作
参考技术ArabbitMQ架构为 rabbitMQ+keepalived 镜像模式。rabbitmq01 192.168.1.101rabbitmq01 192.168.1.102vip192.168.1.110 在rabbitmq02上现在rabbitmq01故障,起不来,队列数据同步rabbbit02有问题。解决方法是将rabbitmq01这个故障节点... 查看详情
rabbitmq一文读懂(代码片段)
目录1、RabbitMQ介绍应用场景其他消息队列选择RabbitMQ原因2、AMQP消息队列其他相关知识什么是AMQP?什么是JMS?3、RabbitMQ快速入门RabbitMQ的工作原理RabbitMQ消息发送和接受流程梳理RabbitMQ消息发送RabbitMQ消息接受RabbitMQ安装Rabbi... 查看详情
rabbitmq初识rabbitmq(代码片段)
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。哪些大厂在用RabbitMQ,为什么滴滴、美团、头条开源、性能优秀、稳... 查看详情
yum安装rabbitmq(代码片段)
安装RabbitMQ http://www.rabbitmq.com/releases/rabbitmq-server/1.下载RabbitMQ官方下载地址http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-generic-unix-3.6.1.tar.xz2.安装 RabbitMQ3. 查看详情
rabbitmq:docker环境下搭建rabbitmq集群
RabbitMQ作为专业级消息队列;如何在微服务框架下搭建 使用组件文档:https://github.com/bijukunjummen/docker-rabbitmq-cluster下载镜像:gitclonehttps://github.com/bijukunjummen/docker-rabbitmq-cluster.git cddocker-rabbitmq 查看详情
rabbitmq核心功能介绍(代码片段)
RabbitMQ核心功能一.MQ的概念与功能介绍二.RabbitMQ的介绍和入门案例三.RabbitMQ的工作队列四.RabbitMQ的工作模式五.RabbitMQ的发布确认六.RabbitMQ的死性队列七.RabbitMQ的延迟队列本文对RabbitMQ核心功能的介绍,没有介绍RabbitMQ的安装与... 查看详情
rabbitmq核心功能介绍(代码片段)
RabbitMQ核心功能一.MQ的概念与功能介绍二.RabbitMQ的介绍和入门案例三.RabbitMQ的工作队列四.RabbitMQ的工作模式五.RabbitMQ的发布确认六.RabbitMQ的死性队列七.RabbitMQ的延迟队列本文对RabbitMQ核心功能的介绍,没有介绍RabbitMQ的安装与... 查看详情
rabbitmq:dockercompose部署rabbitmq(代码片段)
创建目录,用于存放DockerCompose部署RabbitMQ的yaml文件:mkdir-p/root/composefile/rabbitmq写入该yaml文件:vim/root/composefile/rabbitmq/rabbitmq.yaml内容如下所示:version:'3'services:rabbitmq:image:rabbitmq:managementcontainer_name:rabbitmqrestar... 查看详情
rabbitmq
一、RabbitMQ安装#Centos7安装#注意/etc/hosts文件ip和主机名对应wgethttps://github.com/rabbitmq/rabbitmq-server/releases/download/rabbitmq_v3_6_10/rabbitmq-server-3.6.10-1.el7.noarch.rpmyuminstallepel-release-yyuminstal 查看详情
rabbitmq环境搭建(代码片段)
RabbitMQ环境搭建基本概念Windows环境准备安装软件ErlangRabbitMQ环境变量配置ErlangRabbitMQ安装RabbitMQ插件rabbitmq_managementRabbitMQWeb端管理rabbitmq_management用户Linux安装依赖文件安装Erlang安装RabbitMQ启动RabbitMQ添加RabbitMQ用户权限添加RabbitMQ用... 查看详情
rabbitmq环境搭建(代码片段)
RabbitMQ环境搭建基本概念Windows环境准备安装软件ErlangRabbitMQ环境变量配置ErlangRabbitMQ安装RabbitMQ插件rabbitmq_managementRabbitMQWeb端管理rabbitmq_management用户Linux安装依赖文件安装Erlang安装RabbitMQ启动RabbitMQ添加RabbitMQ用户权限添加RabbitMQ用... 查看详情
rabbitmq第五篇:spring集成rabbitmq
前面几篇讲解了如何使用rabbitMq,这一篇主要讲解spring集成rabbitmq。 首先引入配置文件org.springframework.amqp,如下<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring 查看详情