rocketmq生产者示例程序

XIAO的博客 XIAO的博客     2022-08-10     126

关键词:

  转载请注明出处:http://www.cnblogs.com/xiaodf/

  本示例展示了一个RocketMQ producer的简单实现,通过解析文本文件获取输入数据,将数据经过Avro序列化后发送到RocketMQ。

  程序通过stdin.xml配置文件获取主要参数值,stdin.xml文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<operator>
	<parameters>
		<parameter>
			<key>rocketmq.nameserver.list</key>
			<value>172.16.8.106:9876</value>
		</parameter>
		<parameter>
			<key>rocketmq.group.id</key>
			<value>test006</value>
		</parameter>
		<parameter>
			<key>rocketmq.topic</key>
			<value>TopicTest2</value>
		</parameter>
		<parameter>
			<key>rocketmq.tags</key>
			<value>*</value>
		</parameter>
		<parameter>
			<key>rocketmq.message.key</key>
			<value>OrderID0034</value>
		</parameter>
		<parameter>
			<key>schemaStr</key>
			<value>col1:string,col2:double</value>
		</parameter>
		<parameter>
			<key>filePath</key>
			<value>/home/test/rocketmq/input.txt</value>
		</parameter>
	</parameters>
</operator>

  

生产者示例程序如下:

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.scistor.datavision.operator.common.AvroUtils;
import com.scistor.datavision.operator.common.OperatorConfiguration;
import org.apache.avro.Schema;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.schema.HCatSchema;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


public class RocketProducer {

    // parameters
    private String nameserver;
    private String rocketmqTopic;
    private String tags;
    private String key;
    private String schemaStr;
    private String filePath;

    public RocketProducer configure(OperatorConfiguration conf) {
        this.nameserver = conf.get("rocketmq.nameserver.list");
        this.rocketmqTopic = conf.get("rocketmq.topic");
        this.tags = conf.get("rocketmq.tags");
        this.key = conf.get("rocketmq.message.key");
        this.schemaStr = conf.get("schemaStr");
        this.filePath = conf.get("filePath");
        return this;
    }

    public int run() {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr(nameserver);
        producer.setInstanceName("RocketProducer");
        /**
         * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
         * 注意:切记不可以在每次发送消息时,都调用start方法
         */
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        HCatSchema hcatSchema = null;
        Schema schema = null;
        SchemaUtil schemaUtil = new SchemaUtil();
        try {
            hcatSchema = schemaUtil.createHCatSchema(schemaStr);
            schema = schemaUtil.createSchema("com.scistor.rocketmq.producer", rocketmqTopic, hcatSchema);
        } catch (HCatException e) {
            e.printStackTrace();
        }

        List<String> content = RocketProducer.readFileByLines(filePath);

        /**
         * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
         * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
         * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
         * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
         */
        for (int i = 0; i < content.size(); i++) {
            try {
                {
                    String[] fields = content.get(i).split(",");
                    Object[] record = AvroUtils.convert(schema, fields);
                    byte[] bytes = AvroUtils.serialize(schema, record);
                    Message msg = new Message(rocketmqTopic,// topic
                            tags,// tag
                            key,// key
                            bytes);// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            //TimeUnit.MILLISECONDS.sleep(10);
        }

        /**
         * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
         * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
         */
        producer.shutdown();
        return 0;
    }

    public static List<String> readFileByLines(String fileName) {
        List<String> list = new ArrayList<String>();
        File file = new File(fileName);
        BufferedReader reader = null;
        try {
            System.out.println("以行为单位读取文件内容,一次读一整行:");
            reader = new BufferedReader(new FileReader(file));
            String tempString = null;
            int line = 1;
            // 一次读入一行,直到读入null为文件结束
            while ((tempString = reader.readLine()) != null) {
                // 显示行号
                list.add(tempString);
                System.out.println("line " + line + ": " + tempString);
                line++;
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e1) {
                }
            }
        }
        return list;
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("需要: 参数配置文件<stdin.xml>所在的hdfs目录");
            System.exit(-1);
        }
        OperatorConfiguration conf = new OperatorConfiguration(args[0]);
        RocketProducer trainer = new RocketProducer();
        System.exit(trainer.configure(conf).run());
    }
}

  

程序运行输出打印到控制台:

[root@m106 rocketmq]# ./produce.sh 

以行为单位读取文件内容,一次读一整行: line 1: hdfs:///user/xdf/streaming/file-web/file/1.html,1 line 2: hdfs:///user/xdf/streaming/file-web/file/2.html,2 line 3: hdfs:///user/xdf/streaming/file-web/file/3.html,3 line 4: hdfs:///user/xdf/streaming/file-web/file/4.html,4 line 5: hdfs:///user/xdf/streaming/file-web/file,1 line 6: /home/xdf/workflow/file-web/file/1.html,1 line 7: /home/xdf/workflow/file-web/file/2.html,2 line 8: /home/xdf/workflow/file-web/file/3.html,3 line 9: /home/xdf/workflow/file-web/file/4.html,4 line 10: /home/xdf/workflow/file-web/file,1 SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00A36, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=18710] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00AED, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1], queueOffset=18700] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00BA4, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=2], queueOffset=18668] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00C5B, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=3], queueOffset=18663] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197504, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=0], queueOffset=18649] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E1975B4, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=1], queueOffset=18633] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197663, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=2], queueOffset=18629] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197712, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=3], queueOffset=18626] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00D12, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=18711] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00DC1, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1], queueOffset=18701]

  

rocketmq使用延迟消息(代码片段)

...费2、延迟消息主要通过对Message设置延迟级别实现,生产者和消费者按照正常逻辑进行生产和消费。生产端@TestpublicvoidsendMessage()throwsExceptionDefaultMQProducerde 查看详情

rocketmq之消息的生产与消费(代码片段)

基本示例中提供了以下两个功能RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。RocketMQ可以用来消费消息。1添加依赖maven:<depen... 查看详情

rocketmq之消息的生产与消费(代码片段)

基本示例中提供了以下两个功能RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。RocketMQ可以用来消费消息。1添加依赖maven:<depen... 查看详情

mq任意延时消息基于服务端实现(代码片段)

目录启动RocketMQ公共代码延时等级转换工具类生产者生产者示例消费者消费者示例启动测试优点启动RocketMQ启动nameserver修改broker配置参数,新增messageDelayLevel=1s2s4s8s16s32s64s128s256s512s1024s2048s4096s8192s16384s32768s65536s131072sorg.apache.rocketmq.... 查看详情

rocketmq生产者和消费者

1、生产者:packagecom.ebways.mq.test.mq;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer 查看详情

rocketmq的生产者生产消息

packagecom.bfxy.rocketmq.model;importorg.apache.rocketmq.client.exception.MQClientException;importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.client.producer.SendResult;importorg.apache.rocketmq.common.message.Message;importcom.bfxy.rocketmq.constants.Const;public... 查看详情

rocketmq原理学习--索引

... 4、RocketMQ原理学习--NameServer 5、RocketMQ原理学习---生产者普通消息发送 6、RocketMQ原理学习---生产者事物消息发送 7、 查看详情

rocketmq之消息的生产与消费(代码片段)

基本示例中提供了以下两个功能RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。RocketMQ可以用来消费消息。1添加依赖maven:<depen... 查看详情

rocketmq(12)——生产者介绍(代码片段)

生产者介绍RocketMQ的消费者有基于拉模式的DefaultMQPullConsumer和基于推模式的DefaultMQPushConsumer。而对于生产者而言基本上只有一个DefaultMQProducer,和一个支持事务的TransactionMQProducer。TransactionMQProducer继承自DefaultMQProducer,所以DefaultMQ... 查看详情

rocketmq(12)——生产者介绍(代码片段)

生产者介绍RocketMQ的消费者有基于拉模式的DefaultMQPullConsumer和基于推模式的DefaultMQPushConsumer。而对于生产者而言基本上只有一个DefaultMQProducer,和一个支持事务的TransactionMQProducer。TransactionMQProducer继承自DefaultMQProducer,所以DefaultMQ... 查看详情

rocketmq的使用(代码片段)

1在resources目录下创建config目录,新建文件rocketmq.properties文件#指定namesrv地址suning.rocketmq.namesrvAddr=localhost:9876#生产者group名称suning.rocketmq.producerGroupName=user_group#事务生产者group名称suning.rocketmq.transactionP 查看详情

rocketmq使用延迟消息(代码片段)

...费2、延迟消息主要通过对Message设置延迟级别实现,生产者和消费者按照正常逻辑进行生产和消费。生产端@TestpublicvoidsendMessage()throwsExceptionDefaultMQProducerdefaultMQProducer=RocketMqUtil.getDefaultMQProducer();Messagemessage=newMessage(Roc... 查看详情

rocketmq基本概念

...个MessageQueue中。ConsumerGroup由多个Consumer实例构成。##2消息生产者(Producer)负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送... 查看详情

rocketmq01_概述及背景主题标签队列生产者消费者注册中心工作流程

...tMQ-概述、背景②.消息、主题、标签、队列、唯一标识③.生产者、消费者、NameServer、Broker④.RocketMq-工作流程⑤.Topic的创建模式、读写队列①.RocketMQ-概述、背景①.RocketMQ是⼀款阿⾥巴巴开源的消息中间件。2016年11⽉28⽇,阿⾥巴巴... 查看详情

rocketmq01_概述及背景主题标签队列生产者消费者注册中心工作流程

...tMQ-概述、背景②.消息、主题、标签、队列、唯一标识③.生产者、消费者、NameServer、Broker④.RocketMq-工作流程⑤.Topic的创建模式、读写队列①.RocketMQ-概述、背景①.RocketMQ是⼀款阿⾥巴巴开源的消息中间件。2016年11⽉28⽇,阿⾥巴巴... 查看详情

springboot整合rocketmq实现入门案例

...更加简单的使用!文章目录1创建maven项目2配置文件3生产者4消费者5测试1创建maven项目创建一个maven项目。引入sp 查看详情

rocketmq生产者部署的机器注意事项

报错:org.springframework.beans.factory.BeanCreationException:Errorcreatingbeanwithname‘warningProducer‘:Invocationofinitmethodfailed;nestedexceptionisjava.lang.ExceptionInInitializerErroratorg.springfra 查看详情

rocketmq—生产者客户端详解(代码片段)

前言MQ的生产者必备的特性有:消息的同步发送,异步发送,消息的ACK与重试机制,消息的顺序生产,批量发送等。RocketMQ在具备这些特性的同时,有自己独有的特性。下面我们对RocketMQ的生产者开展讲解。一、... 查看详情