activemq(13):activemq的集群

author author     2022-08-29     356

关键词:

一、简介

1.1 消费者集群(Queue consumer clusters)

ActiveMQ支持Consumer对消息高可靠性的负载平衡消费,如果一个Consumer死掉,该消息会转发到其它的Consumer消费的Queue上。

如果一个Consumer获得消息比其它Consumer快,那么他将获得更多的消息。

因此推荐ActiveMQ的Broker和Client使用failover://transport的方式来配置链接。


1.2 Broker clusters

大部情况下是使用一系列的Broker和Client链接到一起。如果一个Broker死掉了,Client可以自动链接到其它Broker上。

实现以上行为需要用failover协议作为Client。


如果启动了多个Broker,Client可以使用static discover或者 Dynamic discovery容易的从一个broker到另一个broker直接链接。


这样当一个broker上没有Consumer的话,那么它的消息不会被消费的,然而该broker会通过存储和转发的策略来把该消息发到其它broker上。


特别注意:

  ActiveMQ默认的两个broker,static链接后是单方向的,broker-A可以访问消费broker-B的消息,如果要支持双向通信,需要在

  netWorkConnector配置的时候,设置duplex=true 就可以了。


操作,服务端搭建静态网络连接与消息回流

消息者1:

public void test1() throws Exception {
    ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.175.13:61676");
    Connection connection = cf.createConnection();
    connection.start();
		
    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("my-queue");
    for (int i = 0; i < 1; i++) {
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
	    @Override
	    public void onMessage(Message message) {
	        TextMessage m = (TextMessage) message;
	        try {
	            System.out.println("===收到11111111:" + m.getText());
		    session.commit();
	        } catch (JMSException e) {
	            e.printStackTrace();
	        }    
	    }
	});
    }
}

消息者2:

public void test1() throws Exception {
    ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.175.13:61616");
    Connection connection = cf.createConnection();
    connection.start();
		
    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("my-queue");
    for (int i = 0; i < 1; i++) {
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
	    @Override
	    public void onMessage(Message message) {
	        TextMessage m = (TextMessage) message;
	        try {
	            System.out.println("===收到222222222:" + m.getText());
		    session.commit();
	        } catch (JMSException e) {
	            e.printStackTrace();
	        }    
	    }
	});
    }
}

生产者:

public void test1() throws Exception {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");
    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("my-queue");
    MessageProducer producer = session.createProducer(destination);
    for (int i = 0; i < 30; i++) {
        TextMessage message = session.createTextMessage("message--" + i);
	Thread.sleep(1000);
	producer.send(message);
    }
    session.commit();
    session.close();
    connection.close();
}

效果:

    技术分享


二、主从节点的集群(Master Slave)

在5.9的版本里面,废除了Pure Master Slave的方式,目前支持:

  1:Shared File System Master Slave:

    基于共享储存的Master-Slave:多个broker实例使用一个存储文件,谁拿到文件锁就是master,其他处于待启动状态,如果master挂掉了,

    某个抢到文件锁的slave变成master

  2:JDBC Master Slave:基于JDBC的Master-Slave:使用同一个数据库,拿到LOCK表的写锁的broker成为master

  3:Replicated LevelDB Store:基于ZooKeeper复制LevelDB存储的Master-Slave机制,这个是5.9新加的


具体的可以到官方察看: http://activemq.apache.org/masterslave.html 


注意:这里可以不要静态连接与回流了


2.1 JDBC Master Slave的方式

2.1.1 简介

利用数据库作为数据源,采用Master/Slave模式,其中在启动的时候Master首先获得独有锁,其它Slaves Broker则等待获取独有锁。

推荐客户端使用Failover来链接Brokers。


具体如下图所示:

  技术分享

Master失败

  如果Master失败,则它释放独有锁,其他Slaver则获取独有锁,其它Slaver立即获得独有锁后此时它将变成Master,并且启动所有的传输链接。

  同时,Client将停止链接之前的Master和将会轮询链接到其他可以利用的Broker即新Master。如上中图所示

Master重启

  任何时候去启动新的Broker,即作为新的Slave来加入集群,如上右图所示


2.1.2 JDBC Master Slave的配置

使用<jdbcPersistenceAdapter/>来配置消息的持久化,自动就会使用JDBC Master Slave的方式。

参考:ActiveMQ消息存储持久化 里的jdbc

去掉静态连接:参考ActiveMQ的静态网络链接

去掉回流:参考集群下的消息回流功能


注意:在配置JDBC时,注意配置useDatabaseLock="true",如下

    <jdbcPersistenceAdapter dataSource="#mysql-ds" useDatabaseLock="true" />

必需设置,不然在保存数据时会报数据主键重复异常


2.1.3 测试

生产30个消息:

public void test1() throws Exception {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");
    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("my-queue");
    MessageProducer producer = session.createProducer(destination);
    for (int i = 0; i < 30; i++) {
        TextMessage message = session.createTextMessage("message--" + i);
	Thread.sleep(1000);
	producer.send(message);
    }
    session.commit();
    session.close();
    connection.close();
}

  技术分享


消费3个消息:

public void test1() throws Exception {
    ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");
    Connection connection = cf.createConnection();
    connection.start();
		
    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("my-queue");
    MessageConsumer consumer = session.createConsumer(destination);
    int i=0;
    while(i<3) {
        i++;
	TextMessage message = (TextMessage) consumer.receive();
	session.commit();
	System.out.println("收到消 息:" + message.getText());
    }
    session.close();
    connection.close();
}

  技术分享


然后关闭61616这台mq,进入到61676界面:

   技术分享



本文出自 “我爱大金子” 博客,请务必保留此出处http://1754966750.blog.51cto.com/7455444/1919608

activemq学习系列activemq集群(代码片段)

activemq集群activemq是可以通过 networkConnectors来实现集群的。首页准备多台activemq 1:端口号:8161,服务端口号:616162:端口号:8162,服务端口号:616173:端口号:8163,服务端口号:61618然后在任意一台的/conf/activemq.xml的&nbs... 查看详情

activemq高可用集群方案

...稳定性要求极高的系统中,高可用的是必不可少的,当然ActiveMQ也有自己的集群方案。从ActiveMQ5.9开始,ActiveMQ的集群实现方式取消了传统的Master-Slave方式,增加了基于ZooKeeper+LevelDB的Master-Slave实现方式。相关文章:范例项目:&nbs... 查看详情

activemq实战-集群

原文:http://blog.csdn.net/lifetragedy/article/details/51869032 ActiveMQ的集群 内嵌代理所引发的问题:消息过载管理混乱如何解决这些问题——集群的两种方式:Masterslave BrokerclustersActiveMQ的集群有两种方式:MASTER/SLAVE模式Cluster模... 查看详情

activemq高可用+集群

原文地址:http://blog.csdn.net/lifetragedy/article/details/51869032ActiveMQ的集群 内嵌代理所引发的问题:消息过载管理混乱如何解决这些问题——集群的两种方式:Masterslave BrokerclustersActiveMQ的集群有两种方式:MASTER/SLAVE模式Cluster模... 查看详情

activemq集群配置高可用

自从activemq5.9.0开始,activemq的集群实现方式取消了传统的PureMasterSlave方式,增加了基于zookeeper+leveldb的实现方式,其他两种方式:目录共享和数据库共享依然存在。 1、Master-Slave部署方式 1)、SharedFilesystemMaster-Slave方式 ... 查看详情

activemq_伪集群和主从高可用使用

...sp;   介绍如何在同一台虚拟机上搭建高可用的Activemq服务,集群数量包含3个Activemq,当Activemq可用数>=2时,整个集群可用。    本文Activemq的集群数量为3个,分别命名为mq1,mq2,mq3 二、概念介绍1、... 查看详情

activemq+zookeper高可用集群方案配置

...稳定性要求极高的系统中,高可用的是必不可少的,当然ActiveMQ也有自己的集群方案。从ActiveMQ5.9开始,ActiveMQ的集群实现方式取消了传统的Master-Slave方式,增加了基于ZooKeeper+LevelDB的Master-Slave实现方式。相关文章:范例项目:&nbs... 查看详情

47.activemq集群

...Sec原创)  使用ZooKeeper实现的Master-Slave实现方式,是对ActiveMQ进行高可用的一种有效的解决方案,高可用的原理:使用ZooKeeper(集群)注册所有的ActiveMQBroker。只有其中的一个Broker可以对外提供服务(也就是Master节点),其他... 查看详情

activemq的集群方案对比及部署

转载:http://blog.csdn.net/lifetragedy/article/details/51869032ActiveMQ的集群 内嵌代理所引发的问题:消息过载管理混乱如何解决这些问题——集群的两种方式:Masterslave BrokerclustersActiveMQ的集群有两种方式:MASTER/SLAVE模式Cluster模式&nb... 查看详情

基于docker搭建activemq的高可用集群

  最近刚开始玩Docker和ActiveMQ刚好学习到ActiveMQ集群的搭建,就将其记录了下来给有需要的人,也可以跟大家交流交流。  这里先感谢慕课网和http://blog.csdn.net/lifetragedy/article/details/51869032,在学习ActiveMQ有很大的帮助... 查看详情

activemq集群的搭建

一、环境准备  1、上传 apache-activemq-5.11.1-bin.tar和 zookeeper-3.4.5.tar.gzLinux服务器(/usr/local/install目录下)      zookeeper-3.4.5.tar.gz上传方式相同。  二、Zookeeper方案主机IP消息端口通信端口节点目录/usr/local/software... 查看详情

activemq集群搭建(代码片段)

一、Activemq主备搭建  SharedFilesystemMaster-Slave方式  sharedfilesystemMaster-Slave部署方式主要是通过共享存储目录来实现master和slave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为mast... 查看详情

activemq集群搭建

集群搭建一:静态网络集群1.简介?当ActiveMQ面对大量消息存储和大量Client交互时,性能消耗将会达到单个broker极限,此时我们需要对ActiveMQ进行水平扩展。ActiveMQ提供了“network”机制,可以把多个broker实例“串联”一起,形成“Fo... 查看详情

activemq消息队列集群的搭建

1.准备activemqapache-activemq-5.12.0-bin.tar2.解压文件3.并将文件cp一份命名为activemq1  进入conf文件进行修改  修改属性为brokerNamet他的名字可以随意修改,但是需要保证唯一 这里我们配置了动态组播 将暂时不需要的属性进行... 查看详情

jms之——activemq高可用+负载均衡集群

一、高可用集群从ActiveMQ5.9开始,ActiveMQ的集群实现方式取消了传统的Master-Slave方式,增加了基于ZooKeeper+LevelDB的Master-Slave实现方式,其他两种方式目录共享和数据库共享方式依然存在.1、文件共享(KahaDB) [html] viewplain&nbs... 查看详情

带有 ActiveMQ 集群的 Apache Camel

】带有ActiveMQ集群的ApacheCamel【英文标题】:ApacheCamelwithActiveMQclustering【发布时间】:2011-01-1414:36:29【问题描述】:我正在尝试确定集群ServiceMix3.3.1/Camel2.1/AMQ5.3应用程序的选项。我正在执行大量消息处理,我需要集群以实现高可... 查看详情

zookeeper+activemq集群整合配置文档

一:使用ZooKeeper实现的MasterSlave实现方式,是对ActiveMQ进行高可用的一种有效的解决方案,高可用的原理:使用ZooKeeper(集群)注册所有的ActiveMQBroker。只有其中的一个Broker可以对外提供服务(也就是Master节点),其他的Broker处于... 查看详情

activemq的安装和使用(代码片段)

一、安装环境  jdk1.8  centos764系统  ActiveMQ5.13.0版本二、安装  解压mq文件即可。  三、启动和停止  启动命令:apache-activemq-5.13.0/bin/activemqstart    查看启动状态:apache-activemq-5.13.0/bin/activemqstatus    停止... 查看详情