rabbitmq

pyrene pyrene     2022-08-29     208

关键词:

一、简介

解释RabbitMQ,就不得不提到AMQP(Advanced Message Queuing Protocol)协议。 AMQP协议是一种基于网络的消息传输协议,它能够在应用或组织之间提供可靠的消息传输。RabbitMQ是该AMQP协议的一种实现,利用它,可以将消息安全可靠的从发 送方传输到接收方。简单的说,就是消息发送方利用RabbitMQ将信息安全的传递给接收方。

     RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

下面是rabbitmq 图示

二、安装

rabbitmq for windows

1 erlang:http://www.erlang.org/download/otp_win64_17.3.exe 2 3

下载地址:http://www.rabbitmq.com/download.html 

安装API

1 pip install pika
2 or
3 easy_install pika
4 or
5 源码
6  
7 https://pypi.python.org/pypi/pika

 

rabbitmq for linux

1 安装配置epel源
2    $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
3  
4 安装erlang
5    $ yum -y install erlang
6  
7 安装RabbitMQ
8    $ yum -y install rabbitmq-server

 

安装API

1 pip install pika
2 or
3 easy_install pika
4 or
5 源码
6  
7 https://pypi.python.org/pypi/pika

 

for mac:

http://my.oschina.net/u/998693/blog/547873

三、简单队列模型

基于Queue实现生产者消费者模型
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading


message = Queue.Queue(10)


def producer(i):
    while True:
        message.put(i)


def consumer(i):
    while True:
        msg = message.get()


for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()
View Code

 

  对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

基于rabbitmq的简单队列模型:在于交换机不工作的情况下

#/usr/bin/env python

import pika

# ######################### 生产者 #########################
#封装socket逻辑部分
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
#拿到操作句柄
channel = connection.channel()
#创建队列,并且给队列取一个名字
channel.queue_declare(queue='hello')
#这里面三个参数,第一个是交换机,第二个参数指定队列,第三个参数是传递的数据
#一个队列的时候exchange交换机设置为空,routing_key指定把body(内容)放到那个队列里面
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
View Code

 

channel.queue_declare(queue='hello')  这里创建队列的时候可能会出现durable为true的提示。

durable=Ture:就是创建持续化的队列

durable=False: 是创建非持续化的队列

如果创建channel.queue_declare(queue='hello'durable=Ture)就是创建了持续化队列。如果创建了持续化队列,就不要创建非持续化的队列

 

生产者:

首先导入pika模块

1、  封装socket逻辑部分

2、  拿到操作句柄,创建队列并且给这个队列取一个名字

3、  进行操作把生产的内容放入到队列中,这里有三个参数

a)         第一个残害苏是交换机,如果只有一个队列交换机默认为空,

b)         第三个参数是需要传递的内容

c)         第二个队列是设置吧内容传递给那个队列

 1 #/usr/bin/env python
 2 
 3 import pika
 4 
 5 # ########################## 消费者 ##########################
 6 #封装socket逻辑部分
 7 connection = pika.BlockingConnection(pika.ConnectionParameters(
 8         host='localhost'))
 9 #生成句柄
10 channel = connection.channel()
11 #创建队列,内部做了判断,如果没有这个队列会创建,如果已经有了队列就不会创建
12 channel.queue_declare(queue='hello')
13 
14 def callback(ch, method, properties, body):
15     print(" [x] Received %r" % body)
16 
17 #回调函数,首先去指定队列中获取数据,放到body中,然后执行回调函数
18 #no_ack:等于True是无应答,如果为false就是有应答
19 #有应答是用户如果连接的时候突然断了,这个时候在队列中这个数据还保存,用户连接的时候还有
20 #这个队列会一直等这个用户。这个是比较安全的,但是效率不会太高
21 channel.basic_consume(callback,
22                       queue='hello',
23                       no_ack=True)
24 
25 print(' [*] Waiting for messages. To exit press CTRL+C')
26 channel.start_consuming()
消费者

1、  首先导入pika模块

2、  封装socket逻辑部分,并且声称句柄

3、  创建队列(内部会做一个判断,如果这个队列没有那么就会创建这个队列,如果有就不会创建)

4、  执行回调函数来获取内容

a)         第一个参数是回调函数

b)         第二个参数指定队列,把获取的内容放入到回调函数中

c)         第三个参数应答参数用来设置是否有应答(消息丢不丢失)

四、重要参数

1、  acknowledgment消息丢不丢失(这个参数是用户如果崩溃异常消息丢不丢失)

要设置下面两部分内容:首先设置no_ack=False,然后设置下面黄色部分

import pika

 

connection = pika.BlockingConnection(pika.ConnectionParameters(

        host='10.211.55.4'))

channel = connection.channel()

 

channel.queue_declare(queue='hello')

 

def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    import time

    time.sleep(10)

    print 'ok'

    ch.basic_ack(delivery_tag = method.delivery_tag)

 

channel.basic_consume(callback,

                      queue='hello',

                      no_ack=False)

 

print(' [*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()

第二个参数:

durable 消息不丢失:这个是rabbitmq如果崩溃异常,数据会不会丢失

1、  在生产者中把下面黄色部分设置 :这样重启的时候数据会恢复原因是吧数据从内存放入到了磁盘

a)         创建队列的时候durable=True

b)         把数据放入到队列中的时候把delivery_mode=2

#!/usr/bin/env python

import pika

 

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))

channel = connection.channel()

 

# make message persistent

channel.queue_declare(queue='hello', durable=True)

 

channel.basic_publish(exchange='',

                      routing_key='hello',

                      body='Hello World!',

                      properties=pika.BasicProperties(

                          delivery_mode=2, # make message persistent

                      ))

print(" [x] Sent 'Hello World!'")

connection.close()

消费者

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import pika

 

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))

channel = connection.channel()

 

# make message persistent

channel.queue_declare(queue='hello', durable=True)

 

 

def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    import time

    time.sleep(10)

    print 'ok'

    ch.basic_ack(delivery_tag = method.delivery_tag)

 

channel.basic_consume(callback,

                      queue='hello',

                      no_ack=False)

 

print(' [*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()

消费获取顺序

在rabbitmq中默认的消息队列中的数据是按照顺序被消费者拿走的。假如有三个消费者,默认按照顺序在队列中获取数据,但是由于机器配置等原因,有些会拿的慢,有些拿的快,默认情况下,拿的快的机器会等待拿的慢的机器获取完毕后才去获取新数据,这样就造成了机器闲置状态。如果想打破这种默认模式,而是谁获取完毕之后谁就去获取新数据,这样效率就会提高。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照顺序排列获取数据

这条数据要放到消费者里面。如下:黄色部分

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import pika

 

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))

channel = connection.channel()

 

# make message persistent

channel.queue_declare(queue='hello')

 

 

def callback(ch, method, properties, body):

    print(" [x] Received %r" % body)

    import time

    time.sleep(10)

    print 'ok'

    ch.basic_ack(delivery_tag = method.delivery_tag)

 

channel.basic_qos(prefetch_count=1)

 

channel.basic_consume(callback,

                      queue='hello',

                      no_ack=False)

 

print(' [*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()

五、exchange工作模型(fanout,direct,topic)

1、 发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中

exchange type = fanout

 

发布者:让数据发送到交换机中,然后由交换机决定让数据放入到那个队列中

发布者

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
#exchange=”logs”表示给交换机创建一个名字,创建了一个交换机,
channel.exchange_declare(exchange='logs',
                         type='fanout')
#把数据
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
#把数据放入到交换机中,这里不需要指定队列,所以routing_key为空,第三个参数是吧数据放入message
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
订阅者

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
#1、创建交换机
channel.exchange_declare(exchange='logs',
                         type='fanout')
#2、随机创建一个队列
result = channel.queue_declare(exclusive=True)
#3、给创建的队列随机命名
queue_name = result.method.queue

#4、创建的队列进行绑定交换机(拿到队列的名字,交换机的名字进行绑定)
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')
#7、获取了数据然后执行
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
#6、这里获取生产者的数据,然后执行callback函数
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

#5、这里进行一个阻塞,直到生产者放出一个数据的时候就会执行上面的方法
channel.start_consuming()
View Code

 

生产者启动的时候会创建交换机,然后把数据放入到交换机中,然后消费者启动的时候首先创建队列,并且检查有没有交换机,如果有交换机就进行绑定,如果没有就需要自己创建一个交换机,然后进行阻塞等待生产者把数据放入到交换机中,如果有数据,那么消费者就会从交换机中获取数据通过队列获取

---------------------------------------------------------------------------------------------------------------------

2关键字发送

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

exchange type = direct

详细解释:生产者创建交换器,消费者创建队列绑定队列并且绑定交换器。这个时候队列还需要绑定关键字,如果绑定的有关键字,那么生产者发送消息到交换器,交换器根据哪个队列绑定的有关键字,然后把数据发送到那个队列,然后那个消费者绑定这个交换器,那么那个消费者就能获取到这个数据

注意:一个队列可以绑定多个字符串

 

消费者

绑定交换器和队列的时候绑定关键字

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
 
for severity in severities:
   #这里绑定的时候除了绑定交换机,和队列名字,还要绑定关键字,上面进行了for循环,所以这里是绑定了多个关键字
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

 

生产者:

1、创建交换器,并且让参数为direct

2、设置一个或者多个关键字

3、让关键字放入到交换器中

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

--------------------------------------------------------------------------------------------------------------------------

3、模糊匹配

 

exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • * 表示只能匹配 一个 单词

 

发送者路由值              队列中
old.boy.python          old.*  -- 不匹配
old.boy.python          old.#  -- 匹配

 

模糊匹配和关键词匹配除了参数不一样其他的都是一样的

生产者

1、  创建交换器,并且加入模糊匹配的参数

2、  把关键字放入到交换器

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

消费者

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
 
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

六、rabbitmq的应用

http://www.rabbitmq.com/getstarted.html  这是rabbitmq教程

 

处理大并发设计模式,用户访问并且操作web服务器的时候,web服务器只是把用户访问操作等数据 提交到队列中,并且同时在数据库中给数据库给这个请求标记状态。

根据队列的的属性,后台服务器按照顺序处理这些请求,处理之后在数据库中返回其状态,这样用户刷新访问数据库就能看到是否处理了用户的请求。这样可以减轻web服务的并发。例子(活动抢票)

队列的应用之rpc

通过rabbitmq实现远程调用模式

用户想要调用远程多个服务器,这个时候可以产生两个队列,其中A这个端口远程服务器一直连接等待用户输入内容,其中一个队列B通过主机端口等来给远程服务器发送指令,然后服务器接收到指令之后通过用户产生的另一个队列B给用户返回消息,如果用户不想收到消息,直接把创建的另一个队列B删除掉就可以了

 

rabbitmq消息中间件

RabbitMQ消息中间件RabbitMQ简介windows下安装RabbitMQRabbitMQ基本概念RabbitMQ简单模式RabbitMQ工作队列模式RabbitMQ发布订阅模式RabbitMQ路由模式RabbitMQ主题模式RabbitMQRPC模式RabbitMQ发布确认模式 查看详情

rabbitmq学习系列:rabbitmq安装与配置

上一篇,简单介绍了RabbitMQ的情况还有一些相关的概念,这一篇,会讲讲 RabbitMQ安装与配置。  1.安装    RabbitMQ是建立在强大的ErlangOTP平台上,因此安装RabbitMQ之前要先安装Erlang。    erlang:http://www.erlang.org/downl... 查看详情

rabbitmq消息队列系列教程认识rabbitmq

摘要RabbitMQ是最为流行的消息中间件,是处理高并发业务的利器。本系列教程,将跟大家一起学习RabbitMQ。目录RabbitMQ是什么?RabbitMQ的特点是什么?一、RabbitMQ是什么?RabbitMQ是基于Erlang开发的目前最流行的开源消息中间件,类似... 查看详情

rabbitmq常用的命令

rabbitMQ常用的命令启动监控管理器:rabbitmq-plugins enable rabbitmq_management关闭监控管理器:rabbitmq-pluginsdisablerabbitmq_management启动rabbitmq:rabbitmq-service start关闭rabbitmq:rabbitmq-service stop查 查看详情

rabbitmq使用(代码片段)

 1.rabbitmq使用 -目的:可以下载镜像,使用rabbitmq-操作流程:RabbitMQ介绍和使用1,RabbitMQ介绍消息队列是消息在传输的过程中保存消息的容器。现在主流消息队列有:RabbitMQ、ActiveMQ、Kafka等等。RabbitMQ和ActiveMQ比较:系统吞吐量:Ra... 查看详情

菜鸟刷面试题(rabbitmq篇)

目录:rabbitmq的使用场景有哪些?rabbitmq有哪些重要的角色?rabbitmq有哪些重要的组件?rabbitmq中vhost的作用是什么?rabbitmq的消息是怎么发送的?rabbitmq怎么保证消息的稳定性?rabbitmq怎么避免消息丢失?要保证消息持久化成功的... 查看详情

【rabbitmq】rabbitmq集群节点重新加入集群相关操作

参考技术ArabbitMQ架构为  rabbitMQ+keepalived 镜像模式。rabbitmq01 192.168.1.101rabbitmq01 192.168.1.102vip192.168.1.110 在rabbitmq02上现在rabbitmq01故障,起不来,队列数据同步rabbbit02有问题。解决方法是将rabbitmq01这个故障节点... 查看详情

rabbitmq一文读懂(代码片段)

目录1、RabbitMQ介绍应用场景其他消息队列选择RabbitMQ原因2、AMQP消息队列其他相关知识什么是AMQP?什么是JMS?3、RabbitMQ快速入门RabbitMQ的工作原理RabbitMQ消息发送和接受流程梳理RabbitMQ消息发送RabbitMQ消息接受RabbitMQ安装Rabbi... 查看详情

rabbitmq初识rabbitmq(代码片段)

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。哪些大厂在用RabbitMQ,为什么滴滴、美团、头条开源、性能优秀、稳... 查看详情

yum安装rabbitmq(代码片段)

安装RabbitMQ http://www.rabbitmq.com/releases/rabbitmq-server/1.下载RabbitMQ官方下载地址http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-generic-unix-3.6.1.tar.xz2.安装 RabbitMQ3. 查看详情

rabbitmq:docker环境下搭建rabbitmq集群

 RabbitMQ作为专业级消息队列;如何在微服务框架下搭建 使用组件文档:https://github.com/bijukunjummen/docker-rabbitmq-cluster下载镜像:gitclonehttps://github.com/bijukunjummen/docker-rabbitmq-cluster.git cddocker-rabbitmq 查看详情

rabbitmq核心功能介绍(代码片段)

RabbitMQ核心功能一.MQ的概念与功能介绍二.RabbitMQ的介绍和入门案例三.RabbitMQ的工作队列四.RabbitMQ的工作模式五.RabbitMQ的发布确认六.RabbitMQ的死性队列七.RabbitMQ的延迟队列本文对RabbitMQ核心功能的介绍,没有介绍RabbitMQ的安装与... 查看详情

rabbitmq核心功能介绍(代码片段)

RabbitMQ核心功能一.MQ的概念与功能介绍二.RabbitMQ的介绍和入门案例三.RabbitMQ的工作队列四.RabbitMQ的工作模式五.RabbitMQ的发布确认六.RabbitMQ的死性队列七.RabbitMQ的延迟队列本文对RabbitMQ核心功能的介绍,没有介绍RabbitMQ的安装与... 查看详情

rabbitmq:dockercompose部署rabbitmq(代码片段)

创建目录,用于存放DockerCompose部署RabbitMQ的yaml文件:mkdir-p/root/composefile/rabbitmq写入该yaml文件:vim/root/composefile/rabbitmq/rabbitmq.yaml内容如下所示:version:'3'services:rabbitmq:image:rabbitmq:managementcontainer_name:rabbitmqrestar... 查看详情

rabbitmq

一、RabbitMQ安装#Centos7安装#注意/etc/hosts文件ip和主机名对应wgethttps://github.com/rabbitmq/rabbitmq-server/releases/download/rabbitmq_v3_6_10/rabbitmq-server-3.6.10-1.el7.noarch.rpmyuminstallepel-release-yyuminstal 查看详情

rabbitmq环境搭建(代码片段)

RabbitMQ环境搭建基本概念Windows环境准备安装软件ErlangRabbitMQ环境变量配置ErlangRabbitMQ安装RabbitMQ插件rabbitmq_managementRabbitMQWeb端管理rabbitmq_management用户Linux安装依赖文件安装Erlang安装RabbitMQ启动RabbitMQ添加RabbitMQ用户权限添加RabbitMQ用... 查看详情

rabbitmq环境搭建(代码片段)

RabbitMQ环境搭建基本概念Windows环境准备安装软件ErlangRabbitMQ环境变量配置ErlangRabbitMQ安装RabbitMQ插件rabbitmq_managementRabbitMQWeb端管理rabbitmq_management用户Linux安装依赖文件安装Erlang安装RabbitMQ启动RabbitMQ添加RabbitMQ用户权限添加RabbitMQ用... 查看详情

rabbitmq第五篇:spring集成rabbitmq

  前面几篇讲解了如何使用rabbitMq,这一篇主要讲解spring集成rabbitmq。  首先引入配置文件org.springframework.amqp,如下<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring 查看详情