spark集成hbase与hive数据转换与代码练习

超大的皮卡丘 超大的皮卡丘     2022-08-22     172

关键词:

  帮一个朋友写个样例,顺便练手啦~一直在做平台的各种事,但是代码后续还要精进啊。。。

 1 import java.util.Date
 2 
 3 import org.apache.hadoop.hbase.HBaseConfiguration
 4 import org.apache.hadoop.hbase.client.{Put, Scan, Result}
 5 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 6 import org.apache.hadoop.hbase.mapred.TableOutputFormat
 7 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 8 import org.apache.hadoop.hbase.util.Bytes
 9 import org.apache.hadoop.mapred.JobConf
10 import org.apache.log4j.{Level, Logger}
11 import org.apache.spark.rdd.RDD
12 import org.apache.spark.sql.DataFrame
13 import org.apache.spark.sql.hive.HiveContext
14 import org.apache.spark.{SparkContext, SparkConf}
15 
16 /**
17  * Created by ysy on 2/10/17.
18  */
19 object test {
20 
21     case class ysyTest(LS_certifier_no: String,loc: String,LS_phone_no: String)
22 
23     def main (args: Array[String]) {
24       val sparkConf = new SparkConf().setMaster("local").setAppName("ysy").set("spark.executor.memory", "1g")
25       val sc = new SparkContext(sparkConf)
26       val sqlContext = new HiveContext(sc)
27       sqlContext.sql("drop table pkq")
28       val columns = "LS_certifier_no,LS_location,LS_phone_no"
29       val hbaseRDD = dataInit(sc,"EVENT_LOG_LBS",columns).map(data =>{
30         val id =Bytes.toString(data._2.getValue("f1".getBytes, "LS_certifier_no".getBytes))
31         val loc = Bytes.toString(data._2.getValue("f1".getBytes, "LS_location".getBytes))
32         val phone = Bytes.toString(data._2.getValue("f1".getBytes, "LS_phone_no".getBytes))
33         (id,loc,phone)
34       })
35       val showData = hbaseRDD.foreach(println)
36       val datas = hbaseRDD.filter(_._1 != null).filter(_._2 != null).filter(_._3 != null)
37       val hiveDF = initHiveTableFromHbase(sc:SparkContext,sqlContext,datas)
38       writeHiveTableToHbase(sc,hiveDF)
39 
40 
41     }
42 
43   def initHiveTableFromHbase(sc:SparkContext,sqlContext: HiveContext,hiveRDD:RDD[(String,String,String)]) : DataFrame = {
44     val hRDD = hiveRDD.map(p => ysyTest(p._1,p._2,p._3))
45       val hiveRDDSchema = sqlContext.createDataFrame(hiveRDD)
46       hiveRDDSchema.registerTempTable("pkq")
47       hiveRDDSchema.show(10)
48       hiveRDDSchema
49   }
50 
51   def dataInit(sc : SparkContext,tableName : String,columns : String) : RDD[(ImmutableBytesWritable,Result)] = {
52     val configuration = HBaseConfiguration.create()
53     configuration.addResource("hbase-site.xml")
54     configuration.set(TableInputFormat.INPUT_TABLE,tableName )
55     val scan = new Scan
56     val column = columns.split(",")
57     for(columnName <- column){
58       scan.addColumn("f1".getBytes(),columnName.getBytes())
59     }
60     val hbaseRDD = sc.newAPIHadoopRDD(configuration,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
61     System.out.println(hbaseRDD.count())
62     hbaseRDD
63   }
64 
65   def writeHiveTableToHbase(sc : SparkContext,hiveDF : DataFrame) = {
66     val configuration = HBaseConfiguration.create()
67     configuration.addResource("hbase-site.xml ")
68     configuration.set(TableOutputFormat.OUTPUT_TABLE,"EVENT_LOG_LBS")
69     val jobConf = new JobConf(configuration)
70     jobConf.setOutputFormat(classOf[TableOutputFormat])
71 
72     val putData = hiveDF.map(data =>{
73       val LS_certifier_no = data(0)
74       val LS_location = data(1)
75       val LS_phone_no = data(2)
76       (LS_certifier_no,LS_location,LS_phone_no)
77     })
78 
79     val rdd = putData.map(datas =>{
80       val put = new Put(Bytes.toBytes(Math.random()))
81       put.addColumn("f1".getBytes(),"LS_certifier_no".getBytes(),Bytes.toBytes(datas._1.toString))
82       put.addColumn("f1".getBytes(),"LS_location".getBytes(),Bytes.toBytes(datas._2.toString))
83       put.addColumn("f1".getBytes(),"LS_phone_no".getBytes(),Bytes.toBytes(datas._3.toString))
84       (new ImmutableBytesWritable, put)
85     })
86     val showRdd = rdd.foreach(println)
87     rdd.saveAsHadoopDataset(jobConf)
88   }
89 
90   }

hbase与hive集成(代码片段)

...据,不适合做关联查询,join延迟较低,高效的数据访问集成使用案例一建立Hive表,关联HBase表,插入数据到Hive表的同时能够影响HBase表在Hive中创建表同时关联HBaseCREATETABLEhive_hbase_emp_table(empnoint,enamestring,jobstring,mgrint,hiredatestring,s... 查看详情

spark访问与hbase关联的hive表

...除hive表,hbase表不受影响;hive使用的还是存储在hbase中的数据。这里创 查看详情

Spark 2.3.0 SQL 无法将数据插入 hive hbase 表

...发布时间】:2018-06-0411:51:09【问题描述】:使用与hive2.2.0集成的Spark2.3thriftserver。从火花直线运行。尝试将数据插入hivehbase表(以hbase作为存储的hive表)。插入hive本机表是可以的。插入hivehba 查看详情

hbase(代码片段)

续接(三)3habse(1.2)集成hive(1.2.1)===》不兼容集成,需要自己编译!!!    hive1.x与hbase0.98版本兼容    hive2.x与hbase1.x版本以上兼容    hive0.x与hbase0.98以下兼容  Hive提供了与HBase的集成,使得能够在HBase表... 查看详情

新闻实时分析系统hive与hbase集成进行数据分析

(一)Hive概述 (二)Hive在Hadoop生态圈中的位置 (三)Hive架构设计  (四)Hive的优点及应用场景 (五)Hive的下载和安装部署1.Hive下载Apache版本的Hive。Cloudera版本的Hive。这里选择下载Apache稳定版本apache-hive-0.1... 查看详情

建立hive和hbase的映射关系,通过spark将hive表中数据导入clickhouse(代码片段)

...表,通过Hive与HBase建立映射关系,实现双方新增数据后彼此都可以查询到。通过spark将Hive中的数据读取到并经过处理保存到ClickHouse中一Hbase1Hbase表操作1.1创建命名空间hbase(main):008:0>create_namespace 查看详情

毕设三spark与phoenix集成插入数据/解析json数组(代码片段)

需求:将前些日子采集的评论存储到hbase中思路:先用fastjson解析评论,然后构造rdd,最后使用spark与phoenix交互,把数据存储到hbase中部分数据:1[23"referenceName":"AppleiPhoneXR64GB黑色移动联通电信4G全网通手机双卡双待",4"creationTime":"2019-04-0801:... 查看详情

hbase与hive(代码片段)

二者对比1.Hive数据仓库:本质其实就相当于将hdfs中已经存储的文件在Mysql中做了一个双射关系,以方便用HQL去管理查询。用于数据分析、清洗:Hive适用于离线的数据分析和清洗,延迟较高。基于HDFS、MapReduce:Hive存储的数据依旧... 查看详情

Spark 与 Hive 的集成

】Spark与Hive的集成【英文标题】:SparkIntegrationwithhive【发布时间】:2020-01-0606:39:36【问题描述】:目前在我们的项目中,我们使用的是HDInsights3.6,其中我们默认启用了spark和hive集成,因为它们共享相同的目录。现在我们要迁移HD... 查看详情

数据库与数据仓库的比较hbase——hive

...仓库(DataWarehouse)是一个面向主题的(SubjectOriented)、集成的(Integrate)、相对稳定的(Non-Volatile)、反映历史变化(TimeVariant)的数据集合,用于支持管理决策。(1)面向主题:指数据仓库中的数据是按照一定的主题域进行... 查看详情

新闻网大数据实时分析可视化系统项目——12hive与hbase集成进行数据分析

(一)Hive概述(二)Hive在Hadoop生态圈中的位置(三)Hive架构设计(四)Hive的优点及应用场景(五)Hive的下载和安装部署1.Hive下载Apache版本的Hive。Cloudera版本的Hive。这里选择下载Apache稳定版本apache-hive-0.13.1-bin.tar.gz,并上传至b... 查看详情

flink与hbase交互(代码片段)

...原始的高效操作方式,而第二、第三则分别是Spark、Flink集成HBase的方式,最后一种是第三方插件Phoenix集成的JDBC方式,Phoenix集成的JDBC操作方式也能在Spark、Flink中调用。注意:这里我们使用HBase2.1.2版本,flink1.7.2版本,scala-2.12版... 查看详情

hbase 与 pyspark 的集成

】hbase与pyspark的集成【英文标题】:hbaseintegrationwithpyspark【发布时间】:2015-09-2109:23:53【问题描述】:我正在尝试从HDP2.3中的pyspark访问hbase现在我只是尝试使用以下命令执行spark目录中给出的示例程序:spark-submit--driver-class-path/us... 查看详情

hive教程---整合hbase

目录6.4.1HBase与Hive的对比6.4.2HBase与Hive集成使用6.4.1HBase与Hive的对比Hive(1)数据仓库Hive的本质其实就相当于将HDFS中已经存储的文件在Mysql中做了一个双射关系,以方便使用HQL去管理查询。(2)用于数据分析、清洗Hive适用于离线的数据... 查看详情

新闻实时分析系统hive与hbase集成进行数据分析clouderahue大数据可视化分析

1.Hue概述及版本下载1)概述Hue是一个开源的ApacheHadoopUI系统,最早是由ClouderaDesktop演化而来,由Cloudera贡献给开源社区,它是基于PythonWeb框架Django实现的。通过使用Hue我们可以在浏览器端的Web控制台上与Hadoop集群进行交互来分析... 查看详情

hive与hbase的区别与联系

...ve与HBase的区别与联系二者区别Hive:Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能。Hive本身不存储和计算数据,它完全依赖于HDFS和MapReduce,Hive中的表纯逻辑。hive... 查看详情

hbase(代码片段)

...duce将本地数据导入到HBase4.3.3自定义HBase-MapReduce4.4与Hive的集成4.4.1HBase与Hive的对比4.4.2HBase与Hive集成使用5HBase优化5.1高可用5.2预分区5.3RowKey设计5.4内存优化5.5基础优化6HBase实战之谷粒微博总结#1HBase简介1.1HBase定义HBase是一种分布式... 查看详情

hive与hbase的区别

从使用方面讲Hive是一个构建在Hadoop平台上的数据仓库,可以将结构化的数据文件映射为一张数据库表。通过Hive可以使用HQL语言查询存放在HDFS上的数据。HQL是一种类SQL语言,这种语言最终被转化成Map/Reduce。HBase是基于HDFS平台的Ke... 查看详情