#22.flink-高级特性-新特性-异步io原理(代码片段)

to.to to.to     2022-10-21     268

关键词:

22.Flink-高级特性-新特性-异步IO-了解

22.1.原理

22.1.1.异步IO操作的需求

https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html

Async I/O是阿里巴巴贡献给社区的一个呼声非常高的特性,于1.12版本引入。主要目的是为了解决与外部系统交互时网络延迟成为了系统瓶颈的问题。

流计算系统中经常需要与外部系统进行交互,我们通常的做法如向数据库发送用户a的查询请求,然后等待结果返回,在这之前,我们的程序无法发送用户b的查询请求。这是一种同步访问方式,如下图所示:

22.2.API

https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> 

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception 
        client = new DatabaseClient(host, post, credentials);
    

    @Override
    public void close() throws Exception 
        client.close();
    

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception 

        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        CompletableFuture.supplyAsync(new Supplier<String>() 

            @Override
            public String get() 
                try 
                    return result.get();
                 catch (InterruptedException | ExecutionException e) 
                    // Normally handled explicitly.
                    return null;
                
            
        ).thenAccept( (String dbResult) -> 
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        );
    


// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

注意:如果要使用异步IO,对应Client有一定要求:
1.该Client要支持发送异步请求,如vertx。
2.如果Client不支持可以使用线程池来模拟异步请求。

DROP TABLE IF EXISTS `t_category`;
CREATE TABLE `t_category` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of t_category
-- ----------------------------
INSERT INTO `t_category` VALUES ('1', '手机');
INSERT INTO `t_category` VALUES ('2', '电脑');
INSERT INTO `t_category` VALUES ('3', '服装');
INSERT INTO `t_category` VALUES ('4', '化妆品');
INSERT INTO `t_category` VALUES ('5', '食品');
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.*;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 使用异步io的先决条件
 * 1.数据库(或key/value存储)提供支持异步请求的client。
 * 2.没有异步请求客户端的话也可以将同步客户端丢到线程池中执行作为异步客户端。
 */
public class ASyncIODemo 
    public static void main(String[] args) throws Exception 
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        //数据源中只有id
        //DataStreamSource[1,2,3,4,5]
        DataStreamSource<CategoryInfo> categoryDS = env.addSource(new RichSourceFunction<CategoryInfo>() 
            private Boolean flag = true;
            @Override
            public void run(SourceContext<CategoryInfo> ctx) throws Exception 
                Integer[] ids = 1, 2, 3, 4, 5;
                for (Integer id : ids) 
                    ctx.collect(new CategoryInfo(id, null));
                
            
            @Override
            public void cancel() 
                this.flag = false;
            
        );
        //3.Transformation


        //方式一:Java-vertx中提供的异步client实现异步IO
        //unorderedWait无序等待
        SingleOutputStreamOperator<CategoryInfo> result1 = AsyncDataStream
                .unorderedWait(categoryDS, new ASyncIOFunction1(), 1000, TimeUnit.SECONDS, 10);

        //方式二:MySQL中同步client+线程池模拟异步IO
        //unorderedWait无序等待
        SingleOutputStreamOperator<CategoryInfo> result2 = AsyncDataStream
                .unorderedWait(categoryDS, new ASyncIOFunction2(), 1000, TimeUnit.SECONDS, 10);

        //4.Sink
        result1.print("方式一:Java-vertx中提供的异步client实现异步IO \\n");
        result2.print("方式二:MySQL中同步client+线程池模拟异步IO \\n");

        //5.execute
        env.execute();
    


@Data
@NoArgsConstructor
@AllArgsConstructor
class CategoryInfo 
    private Integer id;
    private String name;


//MySQL本身的客户端-需要把它变成支持异步的客户端:使用vertx或线程池
class MysqlSyncClient 
    private static transient Connection connection;
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    private static final String URL = "jdbc:mysql://localhost:3306/bigdata";
    private static final String USER = "root";
    private static final String PASSWORD = "root";

    static 
        init();
    

    private static void init() 
        try 
            Class.forName(JDBC_DRIVER);
         catch (ClassNotFoundException e) 
            System.out.println("Driver not found!" + e.getMessage());
        
        try 
            connection = DriverManager.getConnection(URL, USER, PASSWORD);
         catch (SQLException e) 
            System.out.println("init connection failed!" + e.getMessage());
        
    

    public void close() 
        try 
            if (connection != null) 
                connection.close();
            
         catch (SQLException e) 
            System.out.println("close connection failed!" + e.getMessage());
        
    

    public CategoryInfo query(CategoryInfo category) 
        try 
            String sql = "select id,name from t_category where id = "+ category.getId();
            Statement statement = connection.createStatement();
            ResultSet rs = statement.executeQuery(sql);
            if (rs != null && rs.next()) 
                category.setName(rs.getString("name"));
            
         catch (SQLException e) 
            System.out.println("query failed!" + e.getMessage());
        
        return category;
    


/**
 * 方式一:Java-vertx中提供的异步client实现异步IO
 */
class ASyncIOFunction1 extends RichAsyncFunction<CategoryInfo, CategoryInfo> 
    private transient SQLClient mySQLClient;

    @Override
    public void open(Configuration parameters) throws Exception 
        JsonObject mySQLClientConfig = new JsonObject();
        mySQLClientConfig
                .put("driver_class", "com.mysql.jdbc.Driver")
                .put("url", "jdbc:mysql://localhost:3306/bigdata")
                .put("user", "root")
                .put("password", "root")
                .put("max_pool_size", 20);

        VertxOptions options = new VertxOptions();
        options.setEventLoopPoolSize(10);
        options.setWorkerPoolSize(20);
        Vertx vertx = Vertx.vertx(options);
        //根据上面的配置参数获取异步请求客户端
        mySQLClient = JDBCClient.createNonShared(vertx, mySQLClientConfig);
    

    //使用异步客户端发送异步请求
    @Override
    public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception 
        mySQLClient.getConnection(new Handler<AsyncResult<SQLConnection>>() 
            @Override
            public void handle(AsyncResult<SQLConnection> sqlConnectionAsyncResult) 
                if (sqlConnectionAsyncResult.failed()) 
                    return;
                
                SQLConnection connection = sqlConnectionAsyncResult.result();
                connection.query("select id,name from t_category where id = " +input.getId(), new Handler<AsyncResult<io.vertx.ext.sql.ResultSet>>() 
                    @Override
                    public void handle(AsyncResult<io.vertx.ext.sql.ResultSet> resultSetAsyncResult) 
                        if (resultSetAsyncResult.succeeded()) 
                            List<JsonObject> rows = resultSetAsyncResult.result().getRows();
                            for (JsonObject jsonObject : rows) 
                                CategoryInfo categoryInfo = new CategoryInfo(jsonObject.getInteger("id"), jsonObject.getString("name"));
                                resultFuture.complete(Collections.singletonList(categoryInfo));
                            
                        
                    
                );
            
        );
    
    @Override
    public void close() throws Exception 
        mySQLClient.close();
    

    @Override
    public void timeout(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception 
        System.out.println("async call time out!");
        input.setName("未知");
        resultFuture.complete(Collections.singleton(input));
    


/**
 * 方式二:同步调用+线程池模拟异步IO
 */
class ASyncIOFunction2 extends RichAsyncFunction<CategoryInfo, CategoryInfo> 
    private transient MysqlSyncClient client;
    private ExecutorService executorService;//线程池

    @Override
    public void open(Configuration parameters) throws Exception 
        super.open(parameters);
        client = new MysqlSyncClient();
        executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    

    //异步发送请求
    @Override
    public void asyncInvoke(CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws 查看详情  

javascript高级之ecmasript78910新特性(代码片段)

第3章ECMASript7新特性3.1.Array.prototype.includesIncludes方法用来检测数组中是否包含某个元素,返回布尔类型值3.2.指数操作符在ES7中引入指数运算符「**」,用来实现幂运算,功能与Math.pow结果相同第4章ECMASript8新特性4.1.async... 查看详情

vue复习全家桶

...,父子组件,生命周期  (Vue生命周期和考点) Vue高级特性一、自定义v-model  (Vue自定义v-model实现Vue的双向数据绑定--Vue高级特性)二、$nextTickrefs  (Vue自定义v-model实现Vue的双向数据绑定--Vue高级特性)三、slot  ... 查看详情

23.flink-高级特性-新特性-streamingfliesink介绍代码演示flink-高级特性-新特性-flinksql整合hive添加依赖和jar包和配置(代码片段)

23.Flink-高级特性-新特性-StreamingFlieSink23.1.介绍23.2.代码演示24.Flink-高级特性-新特性-FlinkSQL整合Hive24.1.介绍24.2.版本24.3.添加依赖和jar包和配置24.4.FlinkSQL整合Hive-CLI命令行整合24.5.FlinkSQL整合Hive-代码整合23.Flink-高级特性-新特性-Stream... 查看详情

ecmascript2018(es9)新特性简介

简介ES9是ECMA协会在2018年6月发行的一个版本,因为是ECMAScript的第九个版本,所以也称为ES9.今天我们讲解一下ES9的新特性。ES9引入了3大特性和2个小的特性,我们接下来一一讲解。异步遍历在ES6中,引入了同步iteration的概念,随... 查看详情

23.flink-高级特性-新特性-streamingfliesink介绍代码演示flink-高级特性-新特性-flinksql整合hive添加依赖和jar包和配置(代码片段)

23.Flink-高级特性-新特性-StreamingFlieSink23.1.介绍23.2.代码演示24.Flink-高级特性-新特性-FlinkSQL整合Hive24.1.介绍24.2.版本24.3.添加依赖和jar包和配置24.4.FlinkSQL整合Hive-CLI命令行整合24.5.FlinkSQL整合Hive-代码整合23.Flink-高级特性-新特性-Stream... 查看详情

23.flink-高级特性-新特性-streamingfliesink介绍代码演示flink-高级特性-新特性-flinksql整合hive添加依赖和jar包和配置(代码片段)

23.Flink-高级特性-新特性-StreamingFlieSink23.1.介绍23.2.代码演示24.Flink-高级特性-新特性-FlinkSQL整合Hive24.1.介绍24.2.版本24.3.添加依赖和jar包和配置24.4.FlinkSQL整合Hive-CLI命令行整合24.5.FlinkSQL整合Hive-代码整合23.Flink-高级特性-新特性-Stream... 查看详情

es9的新特性:异步遍历asynciteration

...行遍历操作呢?今天要给大家讲一讲ES9中的异步遍历的新特性Asynciteration。异步遍历在讲解异步遍历之前,我们先回想一下ES6中的同步遍历。根据ES6的定义,iteration主要由三部分组成:Iterable先看下Iterable的定义:interfaceIterable 查看详情

h5新特性websocket

websocket也是html5的新增加内容之一,号称是下一代客户端/服务器异步通信办法,私以为虽然有点吹牛的成分,但是以后说不定能成为异步通信的半壁江山,至于取代ajax,我觉的应该不会。websocket的一个很有意思的特点就是双向通... 查看详情

es2017新特性

...布一个版本并将年号作为版本号;算了直接看下es2017的新特性:1.异步函数ECMAScript2017功能“异步函数 ”由BrianTerlson提出。其是Generator的语法糖,简单讲就是用async关键字代替了*,用await(只能在异步函数中使用) 查看详情

20.flink高级特性--新特性--双流joinjoin的分类api代码演示-windowjoin代码演示-intervaljoin(代码片段)

20.Flink高级特性–新特性–双流Join20.1.join的分类20.2.API20.3.代码演示-WindowJoin20.4.代码演示-IntervalJoin20.Flink高级特性–新特性–双流Join20.1.join的分类双流Join是Flink面试的高频问题。一般情况下说明以下几点就可以hold了:Join大... 查看详情

dubbo3高级特性「框架与服务」框架与服务的异步调用实践以及开发模式

异步调用在Dubbo中发起异步调用机制,从而提高对应的服务的调用的吞吐能力和调用机制特性说明技术背景从2.7.0开始,Dubbo的所有异步编程接口开始以CompletableFuture为基础,基于NIO的非阻塞实现并行调用,客户端不需要启动多线... 查看详情

ios系统版本特性(7版本--14版本)

文章目录iOS14新特性:iOS13新特性:iOS12新特性:iOS11新特性:iOS10新特性:iOS9新特性:iOS8新特性:iOS7新特性:iOS14新特性:1.Widgets(小组件)小组件这个功能其实原来就有的,只不过原来是在负一页面(首页左滑)中显示,在iOS14中,... 查看详情

web前端面试高频考点——vue的高级特性(动态组件异步加载keep-alivemixinvuexvue-router)(代码片段)

系列文章目录文章目录系列文章目录一、Vue高级特性1、动态组件2、vue异步加载组件3、vue缓存组件(keep-alive)4、mixin二、Vuex1、Vuex基本概念2、用于Vue组件三、Vue-router1、动态路由2、懒加载一、Vue高级特性1、动态组件按... 查看详情

servlet3.0新特性——异步处理

Servlet3.0之前,一个普通Servlet的主要工作流程大致如下:首先,Servlet接收到请求之后,可能需要对请求携带的数据进行一些预处理;接着,调用业务接口的某些方法,以完成业务处理;最后,根据处理的结果提交响应,Servlet线... 查看详情

java高级特性编程及实战答案

阿里mq消息可靠性,幂等如何保证分布式锁的实现方案比较,为什么选择zookeeper,zookeeper一致性协议原理线程池参数,阻塞队列实现一致性Hash解决什么问题,如何实现?虚拟节点的作用?Java锁的实现方式,比较?AQS实现原理?公平非公平实现... 查看详情

3.java高级特性熟悉网络编程(简单原理)

Java高级特性–网络编程利用网络都能做些什么DNS:DomainNameSystem,域名解析系统DNS是一个分布式服务器系统,实现了域名和ip地址的隐射我们请求域名(www.baidu.com),会先进行DNS的解析,得到ip地址后,继续请... 查看详情

mycat核心概念工作原理及高级特性分析(代码片段)

...行分析数据库中间件中重要的mycat,在使用上有一些高级特性及核心概念、分片解决的办法做了 查看详情