rabbitmqrabbitmq高级特性(代码片段)

niugang0920 niugang0920     2022-12-15     618

关键词:

消息如何保障100%的投递成功

什么是生产端的可靠性投递

  • 保障消息的成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Borker)确认应答
  • 完善的消息进行补偿机制

生产端-可靠性投递(一)

  • 消息落库,对消息状态进行打标
  • 消息的延迟投递,做二次确认,回调检查

生产端-可靠性投递(二)

消息落库,对消息状态进行打标
技术图片

消息落库,对消息进行打标(对消息设置状态,发送中,broker收到,)

定时器轮训,检测未发送的消息,进行二次投递,最大努力尝试(设置最大次数)

  • step1 消息落库(唯一的消息id) ,一定是数据库入库成功以后在进行发送消息
  • step2 发送消息 到MQ Broker
  • step3 Broker Confirm (发送消息确认)
  • step4 生产者ConfirmListener (异步监听,Broker回送的响应)
  • step5 成功,通过messageId更新消息状态

补偿

分布式定时任务,抓取数据(超过第一时长),尝试重发,重试次数限制

生产端-可靠性投递(三)

消息的延迟投递,做二次确认,回调检查 (最大限度的减少消息落库)
技术图片
方案一在高并发场景下,每次消息落库,影响性能(IO操作)

step1: 业务消息落库 ,一定是数据库入库成功以后在进行发送消息

step2:第一次消息的发送

step3:延迟消息的检测

step4:监听,处理完,生成一条新消息

step5:通过队列发送,确认 不是之前的ack

幂等性概念

幂等性是什么

借鉴数据库的乐观锁机制

执行一条更新SQL

 update t_reps set count=count-1,version=version+1  where verison=1

消费端幂等性保障

在业务高峰期,如何避免消息的重复消费问题

消费端实现幂等性,就意味着,我们的消息永远不会被消费多次,即时收到多条一样的消息。

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错
https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/how-to-ensure-that-messages-are-not-repeatedly-consumed.md

业界主流的幂等性操作

  • 唯一ID+指纹码机制,利用数据库主键去重
  • 利用Redis的原子性实现

方案一

唯一ID+指纹码机制

  • 唯一ID+指纹码机制,利用数据库主键去重
  • select count(1) from t_order where id=唯一id+指纹码
  • 好处:实现简单
  • 坏处:高并发下有数据库写入的性能瓶颈
  • 解决方案:跟进ID进行分库分表进行路由算法

方案二:利用Redis的原子性实现

Confirm确认消息

理解Confirm消息确认机制:

  • 消息的去人,是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答
  • 生产者进行接收应该,用来确定这条消息是否正常的发送到broker,这种方式也是消息的可靠性投递的核心保障

confirm确认消息流程解析
技术图片
confirm确认消息实现

  • 第一步:在channel上开启确认模式: channel.sconfirmSelect()
  • 第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理。

示例

生产者

/**
 * @author niugang
 */
public class Producer 
	public static void main(String[] args) throws Exception 		
		//1 创建ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		//2 获取C	onnection
		Connection connection = connectionFactory.newConnection();
		
		//3 通过Connection创建一个新的Channel
		Channel channel = connection.createChannel();		
		//4 指定我们的消息投递模式: 消息的确认模式 
		channel.confirmSelect();
		
		String exchangeName = "test_confirm_exchange";
		String routingKey = "confirm.save";
		
		//5 发送一条消息
		String msg = "Hello RabbitMQ Send confirm message!";
		channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
		
		//6 添加一个确认监听
		channel.addConfirmListener(new ConfirmListener() 
			//失败
			// deliveryTag 消息唯一标签
			@Override
			public void handleNack(long deliveryTag, boolean multiple) throws IOException 
				System.err.println("-------no ack!-----------");
			

			//成功
			@Override
			public void handleAck(long deliveryTag, boolean multiple) throws IOException 
				System.err.println("-------ack!-----------");
			
		);		
	


消费者

/**
 * @author niugang
 */
public class Consumer 

	
	public static void main(String[] args) throws Exception 
		
		
		//1 创建ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		//2 获取C	onnection
		Connection connection = connectionFactory.newConnection();
		
		//3 通过Connection创建一个新的Channel
		Channel channel = connection.createChannel();
		
		String exchangeName = "test_confirm_exchange";
		String routingKey = "confirm.#";
		String queueName = "test_confirm_queue";
		
		//4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
		channel.exchangeDeclare(exchangeName, "topic", true);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//5 创建消费者 
		QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, queueingConsumer);
		
		while(true)
			Delivery delivery = queueingConsumer.nextDelivery();
			String msg = new String(delivery.getBody());
			
			System.err.println("消费端: " + msg);
		
		
		
	


Return消息机制

  • Return Listener 用于处理一些不可路由的消息。
  • 我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中取,然后我们的消费者监听队列,进行消费处理操作。
  • 但是在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener

配置

在基础API上有一个关键的配置项

Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息。

流程
技术图片
生产者

/**
 * @author niugang
 */
public class Producer 

	public static void main(String[] args) throws Exception 
		
		
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		String exchange = "test_return_exchange";
		String routingKey = "return.save";
		String routingKeyError = "abc.save";
		
		String msg = "Hello RabbitMQ Return Message";
		
		
		channel.addReturnListener(new ReturnListener() 
			@Override
			public void handleReturn(int replyCode, String replyText, String exchange,
					String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException 
				
				System.err.println("---------handle  return----------");
				//响应码 312
				System.err.println("replyCode: " + replyCode);
				//NO_ROUTE
				System.err.println("replyText: " + replyText);
				System.err.println("exchange: " + exchange);
				System.err.println("routingKey: " + routingKey);
				System.err.println("properties: " + properties);
				System.err.println("body: " + new String(body));
			
		);
		
		
		channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
		
		//channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());	
	


消费者

/**
 * @author niugang
 */
public class Consumer 
	public static void main(String[] args) throws Exception 
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		String exchangeName = "test_return_exchange";
		String routingKey = "return.#";
		String queueName = "test_return_queue";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);	
		QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, queueingConsumer);
		
		while(true)
			Delivery delivery = queueingConsumer.nextDelivery();
			String msg = new String(delivery.getBody());
			System.err.println("消费者: " + msg);
			
	


消费者自定义监听

  • 我们一般就是在代码中编写while循环,进行consumer.nextDelivery方法获取下一条消息,然后进行消费处理!
  • 但是我们使用自定义的Consumer更加的方便,解耦性更加的强,也是在实际工作中最常用的使用方式!

实现方式

自定义类,继承 DefaultConsumer
技术图片
生产者

/**
 * @author niugang
 */
public class Producer 

	
	public static void main(String[] args) throws Exception 
		
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		String exchange = "test_consumer_exchange";
		String routingKey = "consumer.save";
		
		String msg = "Hello RabbitMQ Consumer Message";
		
		for(int i =0; i<5; i ++)
			channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
		
		
	

自定义消费者

/**
 * 自定义消费者
 * @author niugang
 */
public class MyConsumer extends DefaultConsumer 


	public MyConsumer(Channel channel) 
		super(channel);
	

	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
		System.err.println("-----------consume message----------");
		//消费标签
		System.err.println("consumerTag: " + consumerTag);
		System.err.println("envelope: " + envelope);
		System.err.println("properties: " + properties);
		System.err.println("body: " + new String(body));
	



消费者

/**
 * @author niugang
 */
public class Consumer 

	
	public static void main(String[] args) throws Exception 
		
		
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		
		String exchangeName = "test_consumer_exchange";
		String routingKey = "consumer.#";
		String queueName = "test_consumer_queue";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		channel.basicConsume(queueName, true, new MyConsumer(channel));
		
	
	

消费端限流

什么是消费端的限流

  • 假设一个场景,首先,我们RabbitMQ服务器上有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况:
  • 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据;
  • RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息(autoAck为false)的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
  • void BasicQos(uint prefetchSize,ushort prefetchCount,bool global);
  • prefetchSize:0 #这里为0表示不限制
  • prefetchCount: 会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack; (prefetchCount等于1即可)
  • global:truefalse 是否将上面设置应用于channel
  • 简单来说,就是上面限制是channel级别的还是consumer级别;

生产者

/**
 * @author niugang
 */
public class Producer 

	
	public static void main(String[] args) throws Exception 
		
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhosy");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		String exchange = "test_qos_exchange";
		String routingKey = "qos.save";
		
		String msg = "Hello RabbitMQ QOS Message";
		
		for(int i =0; i<5; i ++)
			channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
		
		
	


自定义消费者

public class MyConsumer extends DefaultConsumer 


	private Channel channel ;
	
	public MyConsumer(Channel channel) 
		super(channel);
		this.channel = channel;
	

	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
		System.err.println("-----------consume message----------");
		System.err.println("consumerTag: " + consumerTag);
		System.err.println("envelope: " + envelope);
		System.err.println("properties: " + properties);
		System.err.println("body: " + new String(body));
		//ack 注释掉后 控制台只会接收到一条消息
		channel.basicAck(envelope.getDeliveryTag(), false);
		
	


消费者

/**
 * @author niugang
 */
public class Consumer 
	public static void main(String[] args) throws Exception 
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		String exchangeName = "test_qos_exchange";
		String queueName = "test_qos_queue";
		String routingKey = "qos.#";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//1 限流方式  第一件事就是 autoAck设置为 false
		//prefetchCount broker 给 消费者 最大推送消息数量
		channel.basicQos(0, 1, false);
		//手工签收
		channel.basicConsume(queueName, false, new MyConsumer(channel));	
	

消息ACK与重回队列

消费端的手工ACK和NACK

  • 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿
  • 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功

消费端的重回队列

  • 消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker (requeue属性设置)

  • 一般我们在实际应用中,都会关闭重回队列,也就是设置为false;

生产者

/**
 * ack 测试生产者
 * @author niugang
 */
public class Producer 
	public static void main(String[] args) throws Exception 
		
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("localhost");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		String exchange = "test_ack_exchange";
		String routingKey = "ack.save";
		for(int i =0; i<5; i ++)
			
			Map<String, Object> headers = new HashMap<String, Object>();
			headers.put("num", i);
			//设置消息属性
			AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
					.deliveryMode(2).expiration("1000")
					.contentEncoding("UTF-8")
					.headers(headers)
					.build();
			String msg = "Hello RabbitMQ ACK Message " + i;
			channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
		
		
	


自定义消费者

/**
 * 自定义消费者
 * @author niugang
 */
public class MyConsumer extends DefaultConsumer 


	private Channel channel ;
	
	public MyConsumer(Channel channel) 
		super(channel);
		this.channel = channel;
	

	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
		System.err.println("-----------consume message----------");
		System.err.println("body: " + new String(body));
		try 
			Thread.sleep(2000);
		 catch (InterruptedException e) 
			e.printStackTrace();
		
		if((Integer)properties.getHeaders().get("num") == 0) 
			//multiple 是否是批量
			//requeue 重新添加到队列尾部
			channel.basicNack(envelope.getDeliveryTag(), false, true);
		 else 
			channel.basicAck(envelope.getDeliveryTag(), false);
		
		
	

消费者

/**
 * 消费者
 * @author niugang
 */
public class Consumer 
    public static void main(String[] args) throws Exception 
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "ack.#";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        // 手工签收 必须要关闭 autoAck = false
        channel.basicConsume(queueName, false, new MyConsumer(channel));

    


TTL队列/消息

TTL

  • TTL是Time To Live的缩写,也就是生存时间
  • RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。
  • RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的删除。

在控制台创建队列
技术图片
在控制台创建exchange 并添加binding 然后发送消息,然后在队列页面可以看到消息自动被队列剔除
技术图片
原生API设置TTL

	AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
					.expiration("1000").build();

Spring AMQP设置TTL

MessageProperties messageProperties = new MessageProperties();
 //消息过期时间
messageProperties.setExpiration("1000");
Message stringMessage = new Message("Hello Springboot RabbitMQ".getBytes(), messageProperties);

死信队列

死信队列:DLX ,Dead-Letter-Exchange

  • 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX;

消息变成死信有以下几种情况

  • 消息被拒绝(basic.reject/basic.nack) 并且requeue=false;
  • 消息TTL过期
  • 队列达到最大长度

死信队列详细解释

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能;

死信队列具体设置

step1:首先需要设置死信队列的exchange和queue,然后进行绑定

例如定义如下exchange和queue

  • Exchange:dlx.echange
  • Queue:dlx.queue
  • RoutingKey:#

step2:

然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可:arguments.put(“x-dead-letter-exchange”,“dlx.exchange”);

这样消息在过期、requeue、队列在达到最大长度时,消息就可以直接路由到死信队列。

技术图片
生产者

/**
 * 私信队列 生产端
 *
 * @author niugang
 */
public class Producer 
    public static void main(String[] args) throws Exception 
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //自定义普通的exchange
        String exchange = "test_dlx_exchange";
        String routingKey = "dlx.save";
        String msg = "Hello RabbitMQ DLX Message";
        for (int i = 0; i < 1; i++) 
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    //过期时间为10s
                    .expiration("10000")
                    .build();
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        

    

自定义消息消费

/**
 * 自定义消息消费
 *
 * @author niugang
 */
public class MyConsumer extends DefaultConsumer 
    public MyConsumer(Channel channel) 
        super(channel);
    

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    

生产者

/**
 * 死信队列消费端
 *
 * @author niugang
 */
public class Consumer 
    public static void main(String[] args) throws Exception 
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // 这就是一个普通的交换机 和 队列 以及路由
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.#";
        String queueName = "test_dlx_queue";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);

        Map<String, Object> agruments = new HashMap<String, Object>(16);
        //设置死信队列exchange  这些具体的参数可以通过rabbitmq控制台查看
        agruments.put("x-dead-letter-exchange", "dlx.exchange");
        //这个agruments属性,要设置到声明队列上
        channel.queueDeclare(queueName, true, false, false, agruments);
        channel.queueBind(queueName, exchangeName, routingKey);

        //要进行死信队列的声明:
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare("dlx.queue", true, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#");

        channel.basicConsume(queueName, true, new MyConsumer(channel));
    

技术图片


















23.flink-高级特性-新特性-streamingfliesink介绍代码演示flink-高级特性-新特性-flinksql整合hive添加依赖和jar包和配置(代码片段)

23.Flink-高级特性-新特性-StreamingFlieSink23.1.介绍23.2.代码演示24.Flink-高级特性-新特性-FlinkSQL整合Hive24.1.介绍24.2.版本24.3.添加依赖和jar包和配置24.4.FlinkSQL整合Hive-CLI命令行整合24.5.FlinkSQL整合Hive-代码整合23.Flink-高级特性-新特性-Stream... 查看详情

#22.flink-高级特性-新特性-异步io原理(代码片段)

22.Flink-高级特性-新特性-异步IO-了解22.1.原理22.1.1.异步IO操作的需求https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.htmlAsyncI/O是阿里巴巴贡献给社区的一个呼声非常高的特性,于1.12版本引入。主要目的是为... 查看详情

高级oop特性(代码片段)

PHP不支持的高级OPP特性 PHP不支持通过函数重载实现多态PHP不支持多重继承PHP不支持根据所修改数据类型为操作符赋予新的含义对象克隆克隆实例 在对象前面添加clone关键字来克隆对象,对象的属性值都被继承,克隆的对... 查看详情

20.flink高级特性--新特性--双流joinjoin的分类api代码演示-windowjoin代码演示-intervaljoin(代码片段)

20.Flink高级特性–新特性–双流Join20.1.join的分类20.2.API20.3.代码演示-WindowJoin20.4.代码演示-IntervalJoin20.Flink高级特性–新特性–双流Join20.1.join的分类双流Join是Flink面试的高频问题。一般情况下说明以下几点就可以hold了:Join大... 查看详情

23.flink-高级特性-新特性-streamingfliesink介绍代码演示flink-高级特性-新特性-flinksql整合hive添加依赖和jar包和配置(代码片段)

23.Flink-高级特性-新特性-StreamingFlieSink23.1.介绍23.2.代码演示24.Flink-高级特性-新特性-FlinkSQL整合Hive24.1.介绍24.2.版本24.3.添加依赖和jar包和配置24.4.FlinkSQL整合Hive-CLI命令行整合24.5.FlinkSQL整合Hive-代码整合23.Flink-高级特性-新特性-Stream... 查看详情

23.flink-高级特性-新特性-streamingfliesink介绍代码演示flink-高级特性-新特性-flinksql整合hive添加依赖和jar包和配置(代码片段)

23.Flink-高级特性-新特性-StreamingFlieSink23.1.介绍23.2.代码演示24.Flink-高级特性-新特性-FlinkSQL整合Hive24.1.介绍24.2.版本24.3.添加依赖和jar包和配置24.4.FlinkSQL整合Hive-CLI命令行整合24.5.FlinkSQL整合Hive-代码整合23.Flink-高级特性-新特性-Stream... 查看详情

python高级特性(代码片段)

切片(tuple)有了切片操作,很多地方循环就不再需要了。Python的切片非常灵活,一行代码就可以实现很多行循环才能完成的操作。操作对象:list、tuple、str;替代了其他语言的截取函数,如substring()应用场景:对经常取指定索... 查看详情

python高级特性(代码片段)

...、迭代器此文章参考廖雪峰大神的官网,地址:高级特性-廖雪峰的官方网站(liaoxuefeng.com)一、切片在python的使用中,对于列表、元组的元素取值是非常常见的,例如:注意:切片是顾头不顾尾的>> 查看详情

rabbitmq--高级特性(代码片段)

在上一篇文章讲解MQ消息可靠性投递和幂等性中有提到confirm机制的重要性,现在更相信的说明一下一、Confirm机制  Confirm就是消息确认,当Producer发送消息,如果Broker收到消息,会回复一个应答,我们可以以此来确认消息是否... 查看详情

021css高级特性(代码片段)

一:元素的显示与影藏1.比较常见的单词  dispaly,visibility,overflow  2.display案例  如果影藏了,这个元素就看不见了,然后也不保留位置1<!DOCTYPEhtml>2<htmllang="en">3<head>4<metacharset="UTF-8">5<title>Document</... 查看详情

jvm高级特性与实践(十三):线程实现与java线程调度(代码片段)

JVM高级特性与实践(一):Java内存区域与内存溢出异常JVM高级特性与实践(二):对象存活判定算法(引用)与回收JVM高级特性与实践(三):垃圾收集算法与垃圾收集器实现JVM高级特... 查看详情

ts高级特性api(代码片段)

PartialRequiredReadonlyPick<T,KextendskeyofT>Record<Kextendskeyofany,T>Exclude<T,U>Extract<T,U>Omit<T,Kextendskeyofany>PartialPartial将属性变为可选属性。举个栗子,iUser这个接口name和a 查看详情

ts高级特性api(代码片段)

PartialRequiredReadonlyPick<T,KextendskeyofT>Record<Kextendskeyofany,T>Exclude<T,U>Extract<T,U>Omit<T,Kextendskeyofany>PartialPartial将属性变为可选属性。举个栗子,iUser这个接口name和a 查看详情

ts高级特性api(代码片段)

PartialRequiredReadonlyPick<T,KextendskeyofT>Record<Kextendskeyofany,T>Exclude<T,U>Extract<T,U>Omit<T,Kextendskeyofany>PartialPartial将属性变为可选属性。举个栗子,iUser这个接口name和a 查看详情

python的高级特性(代码片段)

...、迭代器此文章参考廖雪峰大神的官网,地址:高级特性-廖雪峰的官方网站(liaoxuefeng.com)一、切片在python的使用中,对于列表、元组的元素取值是非常常见的,例如:注意:切片是顾头不顾尾的>> 查看详情

消息队列rabbitmq高级特性(代码片段)

一.消息的可靠投递在使用RabbitMq的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败的场景。RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性rabbitMQ整个消息投递过程为: producer-> rabbitMQbroker->exchange-... 查看详情

rabbitmqrabbitmq入门程序——helloworld-转(代码片段)

   首先说一下,MQ全称为MessageQueue消息队列是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消... 查看详情

springboot高级特性-缓存(代码片段)

缓存:将相应数据存储起来以避免数据的重复创建、处理和传输,可有效提高性能springboot中使用缓存可以缓存方法的返回值等等避免多次查询数据库springboot的缓存有以下层级关系CachingProvider缓存提供者—>管理和控制... 查看详情