Elasticsearch-hadoop & Elasticsearch-spark sql - 语句跟踪扫描&滚动

     2023-03-27     175

关键词:

【中文标题】Elasticsearch-hadoop & Elasticsearch-spark sql - 语句跟踪扫描&滚动【英文标题】:Elasticsearch-hadoop & Elasticsearch-spark sql - Tracing of statements scan&scroll 【发布时间】:2015-11-13 07:27:12 【问题描述】:

我们正在尝试将 ES(1.7.2,4 节点集群)与 Spark(1.5.1,使用 hive 编译和 hadoop 与 scala 2.11,4 节点集群)集成,hdfs 进入方程(hadoop 2.7,4节点)和thrift jdbc服务器和elasticsearch-hadoop-2.2.0-m1.jar

因此,在 ES 上执行语句有两种方式。

    使用 scala 激发 SQL

    val conf = new  SparkConf().setAppName("QueryRemoteES").setMaster("spark://node1:37077").set("spark.executor.memory","2g")
    conf.set("spark.logConf", "true")
    conf.set("spark.cores.max","20")
    conf.set("es.index.auto.create", "false")
    conf.set("es.batch.size.bytes", "100mb")
    conf.set("es.batch.size.entries", "10000")
    conf.set("es.scroll.size", "10000")
    conf.set("es.nodes", "node2:39200")
    conf.set("es.nodes.discovery","true")
    conf.set("pushdown", "true")
    
    sc.addJar("executorLib/elasticsearch-hadoop-2.2.0-m1.jar")
    sc.addJar("executorLib/scala-library-2.10.1.jar")
    
    sqlContext.sql("CREATE TEMPORARY TABLE geoTab USING org.elasticsearch.spark.sql OPTIONS (resource 'geo_2/kafkain')" )
    
    val all: DataFrame = sqlContext.sql("SELECT count(*) FROM geoTab WHERE transmittersID='262021306841042'")
    .....
    

    Thrift 服务器(在 Spark 上执行的代码)

    ....
    polledDataSource = new ComboPooledDataSource()
    polledDataSource.setDriverClass("org.apache.hive.jdbc.HiveDriver")
    polledDataSource.setJdbcUrl("jdbc:hive2://node1:30001")
    polledDataSource.setMaxPoolSize(5)
    dbConnection = polledDataSource.getConnection
    dbStatement = dbConnection.createStatement
    
    val dbResult = dbStatement.execute("CREATE TEMPORARY EXTERNAL TABLE IF NOT EXISTS geoDataHive6(transmittersID STRING,lat DOUBLE,lon DOUBLE) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'geo_2/kafkain','es.query'='\"query\":\"term\":\"transmittersID\":\"262021306841042\"','es.nodes'='node2','es.port'='39200','es.nodes.discovery' = 'false','es.mapping.include' = 'trans*,point.*','es.mapping.names' = 'transmittersID:transmittersID,lat:point.lat,lon:point.lon','pushdown' = 'true')")
    
    dbStatement.setFetchSize(50000)
    dbResultSet = dbStatement.executeQuery("SELECT count(*) FROM geoDataHive6")
    .....
    

我有以下问题,由于它们是相互关联的,我决定将它们打包到堆栈上的一个问题中:

    似乎使用 Spark SQL 的方法支持下推 WHERE 后面的内容(无论是否指定 es.query),执行时间相同并且可以接受。但是解决方案 1 绝对不支持聚合函数的 pushdow,即呈现的 count(*) 不会在 ES 一侧执行,但只有在检索到所有数据之后 - ES 返回行并且 Spark SQL 对它们进行计数。请确认这是否是正确的行为

    第一个解决方案的行为很奇怪,无论下推是真还是假,时间都是相等的

    解决方案 2 似乎不支持下推,我尝试以什么方式指定子查询并不重要,无论是表定义的一部分还是语句的 WHERE 子句,似乎是只需获取所有巨大的索引,然后对其进行数学计算。是不是 Thrift-hive 无法对 ES 进行下推?

    我想在弹性搜索中跟踪查询,我做了以下设置:

    //logging.yml
    index.search.slowlog: TRACE, index_search_slow_log_file
    index.indexing.slowlog: TRACE, index_indexing_slow_log_file
    
    additivity:
      index.search.slowlog: true
      index.indexing.slowlog: true
    

所有 index.search.slowlog.threshold.query、index.search.slowlog.threshold.fetch 甚至 index.indexing.slowlog.threshold.index 都设置为 0ms。 而且我确实在慢日志文件中看到了从感觉执行的常见语句(所以它可以工作)。但我没有看到针对 ES 执行的 Spark SQL 或 thrift 语句。我想这些是 scan&scroll 语句,因为如果我从感觉执行 scan&scroll,这些也不会被记录。是否有可能以某种方式在 ES 一侧跟踪扫描和滚动?

【问题讨论】:

【参考方案1】:

    据我所知,这是一种预期行为。我所知道的所有来源的行为方式都完全相同,而且直觉上它是有道理的。 SparkSQL 专为分析查询而设计,在本地获取数据、缓存和处理更有意义。另见Does spark predicate pushdown work with JDBC?

    我认为conf.set("pushdown", "true") 根本没有任何作用。如果要配置特定于连接的设置,则应将其作为OPTION 映射传递,就像在第二种情况下一样。使用es 前缀应该也可以

    这确实很奇怪。 Martin Senne 使用 PostgreSQL 报告了 a similar issue,但我无法重现。

【讨论】:

【参考方案2】:

我在 elasticsearch 讨论组与 Costin Leau 讨论后,他指出了以下内容,我应该与您分享:

您的设置存在许多问题:

    您提到使用 Scala 2.11,但使用的是 Scala 2.10。请注意,如果您想选择您的 Scala 版本,应使用 elasticsearch-sparkelasticsearch-hadoop 仅提供 Scala 2.10 的二进制文件。

    下推功能只能通过 Spark 数据源使用。如果您不使用这种类型的声明,pushdown 不会传递给 ES(这就是 Spark 的工作方式)。因此声明pushdown 是不相关的。

    1234563 DS)

    使用临时表确实算作数据源,但是您需要在那里使用pushdown。如果你不这样做,它会默认被激活,因此你为什么看不到你的运行之间的区别;您没有更改任何相关参数。

    Spark 不会下推计数和其他聚合。根据 Databricks 团队的说法,未来可能会有一些东西,但目前没有任何东西。对于计数,您可以使用dataFrame.rdd.esCount 进行快速呼叫。但这是个特例。

    我不确定 Thrift 服务器是否真的算作数据源,因为它从 Hive 加载数据。您可以通过启用将 org.elasticsearch.hadoop.spark 包登录到 DEBUG 来仔细检查这一点。您应该查看 SQL 是否确实被转换为 DSL。

我希望这会有所帮助!

【讨论】:

我找不到任何关于 elasticsearch-spark 的信息。我认为elasticsearch-hadoop 包含 Spark 支持? 我不明白你的问题。 是否有像elasticsearch-hadoop 一样的名为elasticsearch-spark 的可下载项目?我在github上没有找到。

是否可以在 ElasticSearch 中使用 presto 或 Hive (ElasticSearch-Hadoop) 的任何 ES 连接器进行 JOIN 操作?

】是否可以在ElasticSearch中使用presto或Hive(ElasticSearch-Hadoop)的任何ES连接器进行JOIN操作?【英文标题】:IsJOINoperationpossibleinElasticSearchusinganyESConnectorforprestoorHive(ElasticSearch-Hadoop)?【发布时间】:2015-08-1322:30:52【问题描述】:我们知... 查看详情

elasticsearch安装篇

Elasticsearch安装篇1.先去elasticsearch官网  https://www.elastic.co/Jar下载http://jcenter.bintray.com/org/elasticsearch/elasticsearch-hadoop下载好后直接解压tar-zxvf  elasticsearch-2.4.3.tar.gz修改conf 查看详情

elasticsearch系列4---windows下安装kibana(代码片段)

...ES外它还可以跟Elastic家族的其他组件进行整合如logstash、Elasticsearch-Hadoop等。相比我们之前讲过的elasticsearch-head,Kibana的功能要强大的多,更重要的是,Kibana为初学者准备了13059条的航班线路 查看详情

Pyspark 将 rdd 转换为具有空值的数据帧

...】:2017-01-1312:02:01【问题描述】:我正在使用pyspark(1.6)和elasticsearch-hadoop(5.1.1)。我通过以下方式将我的数据从elasticsearch转换为rdd格式:es_rdd=sc.newAPIHadoopRDD(inputFormat 查看详情

SparkContext 对象没有属性 esRDD(elasticsearch-spark 连接器)

...17-02-0412:09:39【问题描述】:在spark-shell中,我成功使用了elasticsearch-hadoop连接器(特别是为spark开发的连接器:elasti 查看详情

keil调试问题记录

1、错误类型:L6218E:Underfinedsymbol&&&&&&&&&(referredform&&&&&.o).现象说明:明明已经定义了&&&&&&&&&函数,也有&&&&& 查看详情

Java String 将 '&' 替换为 & 而不是 & 到 &

】JavaString将\\\'&\\\'替换为&而不是&到&【英文标题】:JavaStringReplace\'&\'with&butnot&to&JavaString将\'&\'替换为&而不是&到&【发布时间】:2014-10-2223:55:01【问题描述】:我有一个大字... 查看详情

besttimetobuyandsellstocki&&ii&&iii

题目1:BestTimetoBuyandSellStockSayyouhaveanarrayforwhichthe ith elementisthepriceofagivenstockonday i.Ifyouwereonlypermittedtocompleteatmostonetransaction(ie,buyoneandselloneshareofthesto 查看详情

besttimetobuyandsellstocki&&ii&&iii

题目1:BestTimetoBuyandSellStockSayyouhaveanarrayforwhichthe ith elementisthepriceofagivenstockonday i.Ifyouwereonlypermittedtocompleteatmostonetransaction(ie,buyoneandselloneshareofthesto 查看详情

为啥是'&&'而不是'&'?

】为啥是\\\'&&\\\'而不是\\\'&\\\'?【英文标题】:Why\'&&\'andnot\'&\'?为什么是\'&&\'而不是\'&\'?【发布时间】:2011-11-1200:00:06【问题描述】:为什么&&优于&和||优于|?我问了一个编程多年... 查看详情

要求的参考折叠规则的简明解释:(1) A& & -> A& , (2) A& && -> & , (3) && &a

】要求的参考折叠规则的简明解释:(1)A&&->A&,(2)A&&&->&,(3)&&&->&,&(4A&&&&->&&【英文标题】:Conciseexplanationofreferencecollapsingrulesrequested:(1)A&&- 查看详情

aninterestingcombinationalproblem

...nistoshow $$( extrmthelengthofblack)-( extrmthelengthofwhite)=1$$Forexample,if$m=3,n=5$, $$eginarrayc*310&&&&&&3&&&&&&6&&&&&&9&&&&&&12&&&&&&15\\mid&lacksqu 查看详情

void *p = &&abc; 中的 && 是啥意思

】void*p=&&abc;中的&&是啥意思【英文标题】:Whatdoes&&meaninvoid*p=&&abc;void*p=&&abc;中的&&是什么意思【发布时间】:2011-08-3116:18:42【问题描述】:我遇到了一段代码void*p=&&abc;。&&... 查看详情

直方图均衡化的计算以及matlab实现

...算一.手算直方图均衡例如一个矩阵[源矩阵=left[eginmatrix4&4&4&4&4&4&4&04&5&5&5&5&5&4&04&5&6&6&6&5&4&04&5&6&7&6&5&4&04&5&6&6&6&5&4&04&5&... 查看详情

lvs原理介绍(dr)

1.NAT 地址转换技术2.DR 直接路由模式*****3.ip隧道(ip tunneling)4.FullNAT模式&&&&&&&&&&&&&&&&&&&&&&&&&& 查看详情

同构应用程序,TypeORM && TypeScript && Express && Webpack 设置问题

】同构应用程序,TypeORM&&TypeScript&&Express&&Webpack设置问题【英文标题】:Isomoprhicapplication,problemwithTypeORM&&TypeScript&&Express&&Webpacksetup【发布时间】:2020-01-0513:40:56【问题描述】:我正在尝试制作... 查看详情

cookie&&session&&token

CookiesCookie的由来:HTTP本身是一个无状态的request/response协议.server接收一个来自client的request,处理完以后返回一个response。可是这个过程中,server差点儿没有什么信息能够用来判定是哪个client(用户)发来的request,也无法记录用户的请求... 查看详情

&&(短路与)&|||(短路或)

Java语言中的&&(短路与)、&、|、||(短路或)区别:  &&是逻辑   &是位  当&两边是整数时执行的是位运算,而两边是boolean值时执行的是逻辑运算。  代码如下:    运行结果如下:   ... 查看详情