如何在 Scala 中实现 Kafka Consumer

     2023-03-10     116

关键词:

【中文标题】如何在 Scala 中实现 Kafka Consumer【英文标题】:How do I implement Kafka Consumer in Scala 【发布时间】:2016-12-10 00:04:35 【问题描述】:

我正在尝试在 scala 中实现一个 kafka 消费者。我已经看过一百万篇关于如何用 Java 进行操作的教程,甚至有些 (like this one) 说它是针对 scala 的,但它是用 Java 编写的。

有谁知道我在哪里可以找到如何用 Scala 编写它的示例?我才刚刚开始学习 Scala,所以即使链接示例是用 Java 或其他东西编写的,也可能可以在 Scala 中使用,但老实说,我现在不知道我在做什么。我在谷歌上搜索的所有内容都只是将我链接到如何用 Java 进行操作。

【问题讨论】:

您可以使用 Scala 中的所有 Java 代码,只需很少的更改。 我可以在 Java 中创建类,然后将其导入到我想要使用的类中吗?或者我是否需要将所有变量和东西重写为 scala? 没关系,我的 scala 测试无法识别 java 类。这是 Java 中的类 (pastebin.com/tnS9Amie),我只是不太了解 scala 来转换它。看起来有可能吗? 您能详细说明“不会识别”吗?这是怎么回事?您的项目结构如何? 【参考方案1】:

您看到大多数 Java 示例的原因是,从 0.8.2.2 开始的新 KafkaProducer 是用 Java 编写的。

假设您使用 sbt 作为构建系统,并假设您使用 Kafka 0.8.2.2(您可以根据需要更改版本),您将需要:

libraryDependencies ++= 
  Seq(
    "org.apache.kafka" %% "kafka" % "0.8.2.2",
    "org.apache.kafka" % "kafka-clients" % "0.8.2.2",
  )

一个简单的例子应该让你开始:

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer 

object KafkaExample 
  def main(args: Array[String]): Unit = 
    val properties = new Properties()
    properties.put("bootstrap.servers", "localhost:9092")
    properties.put("group.id", "consumer-tutorial")
    properties.put("key.deserializer", classOf[StringDeserializer])
    properties.put("value.deserializer", classOf[StringDeserializer])

    val kafkaConsumer = new KafkaConsumer[String, String](properties)
    kafkaConsumer.subscribe("firstTopic", "secondTopic")

    while (true) 
      val results = kafkaConsumer.poll(2000).asScala
      for ((topic, data) <- results) 
        // Do stuff
      
    

【讨论】:

消费者不应该与 zookeeper 客户而不是经纪人交谈吗? @AvihooMamka Kafka 不再“需要”ZooKeeper 来跟踪偏移量。如何做到这一点完全取决于您。通常,消费者会与经纪人交谈以进行消费。 results 始终为 null。我错过了什么? @ItayB 你确定你从正确的主题消费?它有消息吗?你的消费策略是什么?最早还是最晚? @ItayB 你需要this,查看那里的实现。我会更新我的答案。【参考方案2】:

您还可以在这里查看完全基于 Scala 构建的工作模板:https://github.com/knoldus/activator-kafka-scala-producer-consumer 此应用程序包含您要在此处使用的代码。

希望我能解决您的问题,谢谢!

【讨论】:

如何在 Scala 中实现真正的 Singleton

】如何在Scala中实现真正的Singleton【英文标题】:HowtoimplementarealSingletoninScala【发布时间】:2016-01-1006:09:21【问题描述】:我有混合Java/Scala项目。有一些用Java实现并使用一些Scala类的Quartz作业。这些类应该使用相同的SparkContext实... 查看详情

您将如何在 Scala 中实现缓存方面

】您将如何在Scala中实现缓存方面【英文标题】:HowwouldyouimplementacachingaspectinScala【发布时间】:2014-05-0710:16:20【问题描述】:头脑风暴:我正在开发一个Scala项目,我们在该项目中进行服务调用,并且需要使用memcache缓存返回值... 查看详情

如何在火花聚合函数中实现scala类型安全

】如何在火花聚合函数中实现scala类型安全【英文标题】:Howtoimplementscalatypesafetyinsidesparkaggregationfunction【发布时间】:2021-09-1322:34:45【问题描述】:如何对agg函数中聚合的值实现类型安全?我的目标是在运行前进行类型检查。$&... 查看详情

如何在 spark scala 中实现 uniqueConcatenate、uniqueCount [关闭]

】如何在sparkscala中实现uniqueConcatenate、uniqueCount[关闭]【英文标题】:howtoimplementuniqueConcatenate,uniqueCountinsparkscala[closed]【发布时间】:2022-01-1017:01:58【问题描述】:我正在尝试转换数据,旧代码在Tibco中并使用uniqueConcatenate、uniqueC... 查看详情

Scala - 如何在 Spark 的 map 函数中实现 Try

】Scala-如何在Spark的map函数中实现Try【英文标题】:Scala-howtoimplementTryinsideamapfunctioninSpark【发布时间】:2019-03-2106:45:03【问题描述】:由于map转换中的函数抛出java.lang.NullPointerException,我的Spark作业的一个阶段失败。我的想法是... 查看详情

如何在scala中实现一个真正的singleton(代码片段)

我有混合Java/Scala项目。有一些Quartz作业用Java实现,并使用一些Scala类。这些类应该使用相同的SparkContext实例,所以我实现了一些应该是singleton的东西,如下所示:objectSparkContextLoadervarhasSC=falsevarsc:Any=0defgetSC(workers):SparkContext=if(!ha... 查看详情

如何在 Apache Spark Java 或 Scala 中实现这一点?

】如何在ApacheSparkJava或Scala中实现这一点?【英文标题】:HowdoIachievethisinApacheSparkJavaorScala?【发布时间】:2018-08-0102:14:44【问题描述】:汽车上的设备在行程开始时不会发送TRIPID,但会在行程结束时发送一个。如何将对应的TRIPIDS... 查看详情

如何在kafka中实现多个生产者和多个消费者

】如何在kafka中实现多个生产者和多个消费者【英文标题】:HowdoIimplementmultipleproducerandmultipleconsumerinkafka【发布时间】:2016-01-1909:19:55【问题描述】:我是kafka的新手,我有很多服务器会产生大量日志的要求,我想创建多个生产... 查看详情

我将如何在播放框架 2.4.3 (Scala) 中实现拦截器/过滤器

】我将如何在播放框架2.4.3(Scala)中实现拦截器/过滤器【英文标题】:HowwouldIimplementanInterceptor/Filterinplayframework2.4.3(Scala)【发布时间】:2015-11-1723:33:59【问题描述】:我是PlayFramework的新手,我想实现一个拦截器或过滤器,在请求... 查看详情

java示例代码_在Scala中实现这种Java机制

java示例代码_在Scala中实现这种Java机制 查看详情

在 Scala 控制器中实现 WebSocketClient

】在Scala控制器中实现WebSocketClient【英文标题】:ImplementingaWebSocketClientinScalacontroller【发布时间】:2016-02-0317:57:33【问题描述】:我有一个scala控制器,我在其中使用Play的WSapi调用外部Web服务!返回json的框架。现在将使用WebSocketC... 查看详情

使用 scala 在 spark 中实现类似 MergeSort 的功能

】使用scala在spark中实现类似MergeSort的功能【英文标题】:ImplementaMergeSortlikefeatureinsparkwithscala【发布时间】:2016-05-1023:58:31【问题描述】:Spark版本1.2.1Scala版本2.10.4我有2个由数字字段关联的SchemaRDD:RDD1:(Bigtable-aboutamillionrecords)[A,... 查看详情

在Scala spark中实现动态字符串插值?

】在Scalaspark中实现动态字符串插值?【英文标题】:AchivedynamicstringinterpolationinScalaspark?【发布时间】:2020-01-1910:03:05【问题描述】:我有一个字符串,其中包含需要进入我预期数据帧的.agg函数的函数。我的数据数据框看起来像v... 查看详情

在 Scala Slick 中实现类实例成员修改的最佳方法?

】在ScalaSlick中实现类实例成员修改的最佳方法?【英文标题】:BestwaytoimplementmembermodificationofclassinstancesinScalaSlick?【发布时间】:2013-11-2116:30:59【问题描述】:我正在尝试在一个真实案例中使用Slick的提升嵌入方法(体育俱乐部... 查看详情

如何在 Java 中实现遗传算法的高斯变异算子

】如何在Java中实现遗传算法的高斯变异算子【英文标题】:HowtoimplementtheGaussianmutationoperatorforageneticalgorithminJava【发布时间】:2011-09-1016:12:06【问题描述】:我尝试为我的项目学习和实现一个简单的遗传算法库。此时,进化、种... 查看详情

在 spark 中实现 informatica 逻辑

...icinspark【发布时间】:2018-06-2411:30:11【问题描述】:我们如何在spark中实现以下逻辑?如果列值为空,那么它应该返回\'\'如果ltrim(rtrim(column))为null那么它应该返回\'\'否则它应该填充该列的值【问题讨论】:我想你的意思是,scala... 查看详情

springboot中实现kafa指定offset消费

kafka消费过程难免会遇到需要重新消费的场景,例如我们消费到kafka数据之后需要进行存库操作,若某一时刻数据库down了,导致kafka消费的数据无法入库,为了弥补数据库down期间的数据损失,有一种做法我们可以指定kafka消费者... 查看详情

在 FRP 中实现快照

】在FRP中实现快照【英文标题】:ImplementingsnapshotinFRP【发布时间】:2012-03-2610:53:57【问题描述】:我正在Scala中实现FRP框架,但我似乎遇到了问题。出于某种想法,这个问题我决定限制我的框架的公共接口,因此只能在“现在”... 查看详情