streampark集成clouderaflinkldap告警,以及部署常见问题(代码片段)

酥酥饼一号 酥酥饼一号     2023-03-09     152

关键词:

集成背景

我们当前集群使用的是Cloudera CDP,Flink版本为Cloudera Version 1.14,整体Flink安装目录以及配置文件结构与社区版本有较大出入。直接根据Streampark官方文档进行部署,将无法配置Flink Home,以及后续整体Flink任务提交到集群中,因此需要进行针对化适配集成,在满足使用需求上,尽量提供完整的Streampark使用体验。

集成步骤

版本匹配问题解决

首先解决无法识别Cloudera中的Flink Home问题,根据报错主要明确到的事情是无法读取到Flink版本、lib下面的jar包名称无法匹配。

修改对象:

修改源码:(解决无法匹配cloudera jar包)

class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logger 

  private[this] lazy val FLINK_VER_PATTERN = Pattern.compile("^(\\\\d+\\\\.\\\\d+)(\\\\.)?.*$")

  private[this] lazy val FLINK_VERSION_PATTERN = Pattern.compile("^Version: (\\\\d+\\\\.\\\\d+\\\\.\\\\d)(-csa)?(\\\\d+\\\\.\\\\d+\\\\.\\\\d+\\\\.\\\\d)?, Commit ID: (.*)$")

  private[this] lazy val FLINK_SCALA_VERSION_PATTERN = Pattern.compile("^flink-dist_(\\\\d+\\\\.\\\\d*)-(\\\\d+\\\\.\\\\d+\\\\.\\\\d)(-csa.*)?.jar$")

  lazy val scalaVersion: String = 
    val matcher = FLINK_SCALA_VERSION_PATTERN.matcher(flinkDistJar.getName)
    if (matcher.matches()) 
      matcher.group(1);
     else 
      // flink 1.15 + on support scala 2.12
      "2.12"
    
  

  lazy val fullVersion: String = s"$version_$scalaVersion"

  lazy val flinkLib: File = 
    require(flinkHome != null, "[StreamPark] flinkHome must not be null.")
    require(new File(flinkHome).exists(), "[StreamPark] flinkHome must be exists.")
    val lib = new File(s"$flinkHome/lib")
    require(lib.exists() && lib.isDirectory, s"[StreamPark] $flinkHome/lib must be exists and must be directory.")
    lib
  

  lazy val flinkLibs: List[NetURL] = flinkLib.listFiles().map(_.toURI.toURL).toList

  lazy val version: String = 
    val flinkVersion = new AtomicReference[String]
    val cmd = List(s"java -classpath $flinkDistJar.getAbsolutePath org.apache.flink.client.cli.CliFrontend --version")
    val success = new AtomicBoolean(false)
    val buffer = new mutable.StringBuilder
    CommandUtils.execute(
      flinkLib.getAbsolutePath,
      cmd,
      new Consumer[String]() 
        override def accept(out: String): Unit = 
          buffer.append(out).append("\\n")
          val matcher = FLINK_VERSION_PATTERN.matcher(out)
          if (matcher.find) 
            success.set(true)
            flinkVersion.set(matcher.group(1))
          
        
      )
    logInfo(buffer.toString())
    if (!success.get()) 
      throw new IllegalStateException(s"[StreamPark] parse flink version failed. $buffer")
    
    buffer.clear()
    flinkVersion.get
  

  // flink major version, like "1.13", "1.14"
  lazy val majorVersion: String = 
    if (version == null) 
      null
     else 
      val matcher = FLINK_VER_PATTERN.matcher(version)
      matcher.matches()
      matcher.group(1)
    
  

  lazy val flinkDistJar: File = 
    val distJar = flinkLib.listFiles().filter(_.getName.matches("flink-dist.*\\\\.jar"))
    distJar match 
      case x if x.isEmpty =>
        throw new IllegalArgumentException(s"[StreamPark] can no found flink-dist jar in $flinkLib")
      case x if x.length > 1 =>
        throw new IllegalArgumentException(s"[StreamPark] found multiple flink-dist jar in $flinkLib")
      case _ =>
    
    distJar.head
  

  // StreamPark flink shims version, like "streampark-flink-shims_flink-1.13"
  lazy val shimsVersion: String = s"streampark-flink-shims_flink-$majorVersion"

  override def toString: String =
    s"""
       |----------------------------------------- flink version -----------------------------------
       |     flinkHome    : $flinkHome
       |     distJarName  : $flinkDistJar.getName
       |     flinkVersion : $version
       |     majorVersion : $majorVersion
       |     scalaVersion : $scalaVersion
       |     shimsVersion : $shimsVersion
       |-------------------------------------------------------------------------------------------
       |""".stripMargin

Flink Home指定

由于Cloudera Flink的默认安装路径为/opt/cloudera/parcels/Flink-$version,而执行/opt/cloudera/parcels/Flink-$version/bin/flink 为整体环境配置,vi flink可查看到具体过程

实际的flink提交路径在/opt/cloudera/parcels/Flink-$version/lib/flink/bin/flink,因此/opt/cloudera/parcels/Flink-$version/lib/flink可以理解为真正的Flink Home,具体查看该目录下内容

发现缺少conf目录,倘若配置该目录在Streampark为Flink Home将无法访问到集群,因此可软连接Flink配置或者在该路径下编辑集群中的Flink配置文件。

综上,前置配置和打包好代码(代码中可能会涉及到自己使用上的优化修改)之后,可以进行部署。

注意2.0的版本打包的话直接执行源码中的build.sh即可,选择混合部署,生成的包在dist目录下。

部署流程

前置部署流程建议参考官方步骤安装部署 | Apache StreamPark (incubating)

特别注意需要对元数据库进行初始化以及初始数据插入,执行sql在$streamarkHome/script/data&schema

根据官方的意思需要将mysql的connector添加到lib目录下,不然无法连接数据库。

在conf/application.yml中修改数据源为mysql,配置好集群中使用到的用户(默认hdfs),默认在hdfs创建streampark的工作目录hdfs:///streampark。

部署结果验证

部署完成之后,执行bin下的startup.sh 可以启动集群,在web上进入部署地址ip:port(默认10000)

使用默认账号 admin streampark可以进去

登录进去之后点击设置中心可以进行Flink home的配置

LDAP集成

主要是需要配置conf下application.yml中的ldap配置信息即可,然后重启streampark。

使用与踩坑点:

登录选择LDAP登录

利用公司LDAP登录之后,提示

但是刚才使用ladp登录的用户,在streampark上已经创建对应的用户,须在成员管理里面将刚才创建的用户添加到对应的团队中,刚才那个用户才可以登录。

告警配置

主要配置的是企业微信告警,在设置中心配置企业微信机器人的token(注意公司环境为内网的话,需要在代码中修改对应的url,拼接为内网发送地址)
告警模板在代码中的修改路径为:

中间修改了的告警模板,重新打包一下即可。

一些问题及解决办法

一、Hadoop环境

解决办法:在部署Streampark的节点上添加一下hadoop环境即可

vi /etc/profile

source一下,重启streampark即可

二、依赖jar的初始化

解决办法:在于部署后的streampark在hdfs上的工作目录上lib目录没有正常上传,找到hdfs上初始化的strempark work路径,观察一下hdfs:///streampark/flink/.../下的lib目录是否完整,不完整的话手动将本地Flink Home目录下的lib put上去即可。

集成与持续集成介绍(代码片段)

1.集成与持续集成介绍1.1什么是集成简单来说,就是把开发好的代码,提交到系统中,就是集成。1.2什么是持续集成持续集成就是频繁的(一天多次)将代码集成到主干。1.3使用持续集成带来的好处(1)快速发现错误。每完成一... 查看详情

集成测试

集成测试:在单元测试的基础上,将所有模块按照总体设计的要求组装成为子系统或系统进行的测试。集成测试的对象是模块间的接口,其目的是找出在模块接口上和系统体系结构上的问题。集成测试策略:基于层次的集成:自... 查看详情

集成测试

1.定义:  集成测试,也叫组装测试或联合测试。在单元测试的基础上,将所有模块按照设计要求(如根据结构图)组装成为子系统或系统,进行集成测试。2.方案:  ·自顶向下  ·自底向上  ·核心集成  ·高频集成... 查看详情

浅谈持续集成的理解以及实现持续集成,需要做什么?

一、持续集成是什么?持续集成是一种软件开发的实践,即团队开发成员经常集成他们的工作,通常每个成员每天至少集成一次,也就意味着每天可能会发生多次集成。每次集成都通过自动化的构建(包括编译,发布,自动化测... 查看详情

持续集成简介

一、持续集成持续集成是一种软件开发的实践,即团队开发成员经常集成他们的工作,通常每个成员每天至少集成一次,也就意味着每天可能会发生多次集成。每次集成都通过自动化的构建(包括编译,发布,自动化测试)来验... 查看详情

持续集成与devops

持续集成   持续集成(Continuousintegration,简称C1),简单的说持续集成就是频紧地(一天多次)将代码集成到主干,它的好处主要有两个:1、快速发现错误。每完成一次更新,就集成到主干,可以快速发现错误,定位... 查看详情

集成框架 - Apache Camel 与 Spring 集成? [关闭]

】集成框架-ApacheCamel与Spring集成?[关闭]【英文标题】:Integrationframework-apachecamelvsspringintegration?[closed]【发布时间】:2016-10-2614:30:59【问题描述】:我正在评估一个基于EIP(企业集成模式)用于我的应用程序的集成框架。我有apac... 查看详情

持续集成

持续集成CI:continousintegration是一种软件开发实践开发人员经常集成他们的工作,每个人每天至少集成一次,即每天会发生多次集成。每次集成都通过自动化的构建(包括编译,发布,自动化测试)来验证,从而尽快地发现集成错... 查看详情

roma集成关键技术:增量数据集成

摘要:本文将详解ROMA集成关键技术-增量数据集成技术。本文分享自华为云社区《ROMA集成关键技术(2)-增量数据集成技术》,作者:华为云PaaS服务小智。1.概述ROMA平台的核心系统ROMAConnect源自华为流程IT的集成平台࿰... 查看详情

7.集成学习(ensemblelearning)stacking

1.集成学习(EnsembleLearning)原理2.集成学习(EnsembleLearning)Bagging3.集成学习(EnsembleLearning)随机森林(RandomForest)4.集成学习(EnsembleLearning)Adaboost5.集成学习(EnsembleLearning)GBDT6.集成学习(EnsembleLearning)算法比较7.集成学习(... 查看详情

持续集成

持续集成的概念持续集成的优点持续集成的流程1、检测代码变动2、自动构建编译3、自动测试4、自动打包  查看详情

什么是持续集成?

持续集成(ContinuousIntegration,简称CI)是一种软件开发实践,即团队开发成员经常集成他们的工作,通常每个成员每天至少集成一次,也就意味着每天可能会发生多次集成。在软件测试的工作中也经常会用到持续集成的技术来做... 查看详情

6.集成学习(ensemblelearning)算法比较

1.集成学习(EnsembleLearning)原理2.集成学习(EnsembleLearning)Bagging3.集成学习(EnsembleLearning)随机森林(RandomForest)4.集成学习(EnsembleLearning)Adaboost5.集成学习(EnsembleLearning)GBDT6.集成学习(EnsembleLearning)算法比较1.AdaBoostVsG 查看详情

集成测试

一、集成测试主要内容  指的是在单元测试的基础上,将所有的函数按照概要设计的要求组装称为系统或者子系统所进行的测试  集成测试也叫组装测试、联合测试、子系统测试、部件测试二、集成测试的层次 ... 查看详情

深入浅出ensemblelearning集成学习原理

  集成学习(ensemblelearning)可以说是现在非常火爆的机器学习方法了。它本身不是一个单独的机器学习算法,而是通过构建并结合多个机器学习器来完成学习任务。也就是我们常说的“博采众长”。集成学习可以用于分类问题集... 查看详情

深入浅出ensemblelearning集成学习原理

  集成学习(ensemblelearning)可以说是现在非常火爆的机器学习方法了。它本身不是一个单独的机器学习算法,而是通过构建并结合多个机器学习器来完成学习任务。也就是我们常说的“博采众长”。集成学习可以用于分类问题集... 查看详情

持续集成:

...,已经形成了一套标准流程,最重要的组成部分就是持续集成(Continuousintegration,简称CI)。本文简要介绍持续集成的概念和做法。一、概念持续集成指的是,频繁地(一天多次)将代码集成到主干。它的好处主要有两个。(1)... 查看详情

使用与黄瓜集成的柑橘框架的并行集成测试执行

】使用与黄瓜集成的柑橘框架的并行集成测试执行【英文标题】:Parallelintegrationtestexecutionusingcitrusframeworkintegratedwithcucumber【发布时间】:2017-07-1113:50:31【问题描述】:我正在使用与Cucumber集成的citrus框架进行集成测试(具有不... 查看详情