activemq-初识mq--主题topic(代码片段)

闲言博客 闲言博客     2022-12-05     420

关键词:

1.导入依赖

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>

2.编写生产者代码

编写生产者步骤

  1. 获取连接工厂
  2. 获取连接 connection
  3. 获取session
  4. 创建目的地 queue
  5. 创建创建生产者
  6. 创建消息
  7. 发送消息
  8. 释放资源
public class JmsProduce_topic 
    private static String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    private static String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws Exception
        //1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码。
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通过连接工厂,获得连接 connection 并启动访问。
        Connection connection = factory.createConnection();
        connection.start();
        // 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4 创建目的地:topic
        Topic topic = session.createTopic(TOPIC_NAME);
        // 5 创建消息的生产者
        MessageProducer producer = session.createProducer(topic);
        // 6 通过messageProducer 生产 3 条 消息发送到消息队列中
        TextMessage textMessage = session.createTextMessage();
        for (int i = 0; i < 5; i++) 
            // 7  创建消息
            textMessage.setText("这是"+i+"条消息");
            // 8  通过messageProducer发送给mq
            producer.send(textMessage);
        

        // 9 关闭资源
        producer.close();
        session.close();
        connection.close();
    

注意:生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者

3.编写消费者代码(while阻塞方式)

编写生产者步骤

  1. 获取连接工厂
  2. 获取连接 connection
  3. 获取session
  4. 创建目的地 queue
  5. 创建创建消费者
  6. 消费消息
  7. 释放资源【处于阻塞状态的话不会释放】
public class JmsConsumer_topic 
    private static String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    private static String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws JMSException 
        //获取工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //获取连接
        Connection connection = factory.createConnection();
        connection.start();
        //获取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4 创建目的地:主题
        Topic topic = session.createTopic(TOPIC_NAME);
        //创建消费者
        MessageConsumer consumer = session.createConsumer(topic);
        /* 通过监听的方式来消费消息,是异步非阻塞的方式消费消息。
         */
        while (true)
            // reveive() 一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式 。和socket的accept方法类似的。
            // reveive(Long time) : 等待n毫秒之后还没有收到消息,就是结束阻塞。
            // 因为消息发送者是 TextMessage,所以消息接受者也要是TextMessage
            TextMessage receive = (TextMessage)consumer.receive();
            if (receive != null)
                String msg = receive.getText();
                System.out.println(msg);
            else 
                break;
            
        
        consumer.close();
        session.close();
        connection.close();
    


3.编写消费者代码(监听器方式)

public class JmsConsumerTopicAsyn 
    private static String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    private static String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws JMSException, IOException 
        //获取工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //获取连接
        Connection connection = factory.createConnection();
        connection.start();
        //获取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4 创建目的地 (两种 : 队列/主题   这里用主题)
        Topic topic = session.createTopic(TOPIC_NAME);
        //创建消费者
        MessageConsumer consumer = session.createConsumer(topic);
        /* 通过监听的方式来消费消息,是异步非阻塞的方式消费消息。
           通过messageConsumer 的setMessageListener 注册一个监听器,当有消息发送来时,系统自动调用MessageListener 的 onMessage 方法处理消息
         */
        consumer.setMessageListener(new JmsConsumerListener());
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    


public class JmsConsumerListener implements MessageListener 

    //有消息时会触发该方法
    @Override
    public void onMessage(Message message) 
        //监听
        TextMessage textMessage = (TextMessage) message;
        if (null != textMessage)
            try 
                System.out.println(textMessage.getText());
             catch (JMSException e) 
                e.printStackTrace();
            
        
    

4.查看后台

启动消费者前


启动消费者后

启动消费者后,再启动生产者,这样生产的消息才会被消费到


activemq-初识mq--主题topic(代码片段)

1.导入依赖<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.9</version></dependency>2.编写生产者代码编写生产者步骤获取连接工厂获取连接connection获取session创建目的地q... 查看详情

activemq-初识mq--队列queue(代码片段)

1.导入依赖<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.9</version></dependency>2.编写生产者代码编写生产者步骤获取连接工 查看详情

activemq-初识mq--队列queue(代码片段)

1.导入依赖<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.9</version></dependency>2.编写生产者代码编写生产者步骤获取连接工 查看详情

activemq之topic主题模式(代码片段)

开发环境我们使用的是ActiveMQ5.11.1Release的Windows版,官网最新版是ActiveMQ5.12.0Release,大家可以自行下载,下载地址。需要注意的是,开发时候,要将apache-activemq-5.11.1-bin.zip解压缩后里面的activemq-all-5.11.1.jar包加入到classpath下面,这... 查看详情

消息中间件activemq学习笔记[java编码mq,消费者生产者基本模型](代码片段)

近期计划学习一下消息队列;找到的学习视频地址:尚硅谷ActiveMQ教程快速入门文章目录1.Java编码MQ,模拟基础生产者消费者自定义消息生产者自定义同步阻塞式的消息消费者异步监听方式的消费者关于3种常见的消费者问题队列案例... 查看详情

activemq-初识mq--队列queue(代码片段)

1.导入依赖<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.9</version></dependency>2.编写生产者代码编写生产者步骤获取连接工厂获取连接connection获取session创建目的地q... 查看详情

activemq学习--002--topic消息例子程序(代码片段)

一、非持久的Topic消息示例注意此种方式消费者只能接收到消费者启动之后,发送者发送的消息。发送者packagecom.lhy.mq.helloworld;importjava.util.concurrent.TimeUnit;importjavax.jms.Connection;importjavax.jms.ConnectionFactory;importjavax.jms.DeliveryM 查看详情

activemq(代码片段)

activemq前言使用步骤1.安装2.java操作activemq2.1队列模式2.1主题模式2.3topic和queue的对比总结3.jms4.发布订阅5.事务6.签收7.broker8.springboot整合activemq8.1activemq-produce(生产者)8.2activemq-consumer(消费者)8.3topic模式9.传输协议9.1nio协议(重点)9.2auto 查看详情

activemq初步学习

今天开始学习activeMq。什么是MQ,MQ是消息中间件。消息模型:点对点模型和发布订阅模型。点对点模型:生产者生产消息,放到MQ中,消费者消费消息。每个消息如果不过期,则会被一个消费者消费。消费者接收到消息会返回成... 查看详情

activemq之队列和主题发布订阅实例(代码片段)

...据流 增加maven依赖<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.10.0</version></dependen 查看详情

01-消息中间件概述和activemq入门(代码片段)

1.mq解决的问题系统异步处理应用解耦流量削峰日志处理消息通信2.消息中间件的2中模型2.1Point-to-Point(P2P)/点对点/类比:送快递特点:+一个消费生产者必须有一个消息消费者。一对一的关系+一个消息发送到queue中,如果mqserver重启... 查看详情

activemq初识-2(代码片段)

activeMq简单实例:packagecom.gordon;importorg.apache.activemq.ActiveMQConnectionFactory;importjavax.jms.*;/***生产者*Createdbygordonon2018/9/8.*/publicclassSenderpublicstaticfinalStringuserName="admin";publi 查看详情

jms-mq-发布/订阅

1,Tomcat配置  <Resourcename="topic/connectionFactory"auth="Container"type="org.apache.activemq.ActiveMQConnectionFactory"description="JMSConnectionFactory"factory="org.apache.activemq.jndi.JNDIRefere 查看详情

01-初识消息队列mq&&rabbit相关概念介绍

...2.1、流量消峰2.2、引用解耦2.3、异步处理二、MQ的分类1.ActiveMQ2.Kafka3.RocketMQ4.RabbitMQ三、RabbitMQ1、RabbitMQ的基本概念2、RabbitMQ四大核心概念2.1、生产者2.2、交换机2.3、队列2.4、消费者3、RabbitMQ工作的原理&# 查看详情

activemq获取topic数据(代码片段)

<?php//error_reporting(0);error_reporting(E_ERROR|E_WARNING|E_PARSE);set_time_limit(0);$broker=‘tcp://localhost:61613‘;$queue=‘/topic/test‘;$stomp=newStomp($broker);//php需安装stomp扩展,引入stomp类$stomp-& 查看详情

springboot整合activemq之topic,不懂也得懂了吧

前言?今天和大家分享springboot整合activeMq之topic(主题)--发布/订阅模式,类似微信公众号,我们关注公共就可以收到消息,topic需要消费者先订阅才能收到消息,如果没有消费者订阅,生产者产生的消息就是废消息(发布/订阅模式,... 查看详情

消息队列activemq

1ActiveMq JMS-->javaMessage 2ActiveMQ的消息形式  有2种类型:    2.1 一种是点对点的,一个生产者和一个消费者一一对应      多个生产者----Queue----多个消费者    2.2另一种是发布、订阅模式,一个生产... 查看详情

kafka初识

参考来自Kafka入门实战pdf1.kafka的基本概念1.主题:  Kafka将一组消息抽象归纳为一个主题(Topic),也就是说,一个主题就是对消息的一个分类。生产者将消息发送到特定主题,消费者订阅主题或主题的某些分区进行消费。2.消... 查看详情