kafka2.5.0自定义数据序列化类(代码片段)

zhuwenjoyce zhuwenjoyce     2022-11-30     559

关键词:

kafka只接收bytes字节数组,所以自定义序列化器内部实现需按照bytes字节数组转换为标准。

重点:本例子只是提供参考怎样写自定义序列化器,因为关系到性能,一般默认使用StringSerializer即可,效率很高。

小知识:Kafka支持Avro序列化器,比较适用于生产者和消费者在版本升级差距拉大时使用,但同时要注意性能。参考文章《使用kafka中提供的Avro序列化框架实现序列化

1) 自定义序列化类,转换成bytes字节数组:

import cn.enjoyedu.vo.DemoUser;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.util.Map;

/**
 * @author King老师*/
public class MySerializer implements Serializer<DemoUser> 
    public void configure(Map<String, ?> configs, boolean isKey) 
        //do nothing
    

    public byte[] serialize(String topic, DemoUser data) 
        try 
            byte[] name;
            int nameSize;
            if(data==null)
                return null;
            
            if(data.getName()!=null)
                name = data.getName().getBytes("UTF-8");
                //字符串的长度
                nameSize = data.getName().length();
            else
                name = new byte[0];
                nameSize = 0;
            
            /*id的长度4个字节,字符串的长度描述4个字节,
            字符串本身的长度nameSize个字节*/
            ByteBuffer buffer = ByteBuffer.allocate(4+4+nameSize);
            buffer.putInt(data.getId());//4
            buffer.putInt(nameSize);//4
            buffer.put(name);//nameSize
            return buffer.array();
         catch (Exception e) 
            throw new SerializationException("Error serialize DemoUser:"+e);
        
    

    public void close() 
        //do nothing
    

2) 自定义反序列化类,从bytes字节数组转换成自定义对象:

import cn.enjoyedu.vo.DemoUser;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.util.Map;

/**
 * @author King老师 
 */
public class MyDeserializer implements Deserializer<DemoUser> 


    public void configure(Map<String, ?> configs, boolean isKey) 
        //do nothing
    

    public DemoUser deserialize(String topic, byte[] data) 
        try 
            if(data==null)
                return null;
            
            if(data.length<8)
                throw new SerializationException("Error data size.");
            
            ByteBuffer buffer = ByteBuffer.wrap(data);
            int id;
            String name;
            int nameSize;
            id = buffer.getInt();
            nameSize = buffer.getInt();
            byte[] nameByte = new byte[nameSize];
            buffer.get(nameByte);
            name = new String(nameByte,"UTF-8");
            return new DemoUser(id,name);
         catch (Exception e) 
            throw new SerializationException("Error Deserializer DemoUser."+e);
        

    

    public void close() 
        //do nothing
    

3) 配置序列化类

定义好自定义数据序列化类,需配置到kafka的配置里(参考《kafka2.5.0生产者与消费者配置详解》):

ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, MySerializer.class

ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MySerializer.class

end.

flink实战系列flink1.14.0消费kafka数据自定义反序列化器(代码片段)

Flink1.14.0消费kafka数据自定义反序列类在最近刚发布的Flink1.14.0版本中Source接口进行了重构,API的变化还是非常大的,那在新的接口下消费kafka的时候如何自定义反序列类呢?KafkaSource使用Kafkasource提供了一个构建类来构造KafkaSource的实... 查看详情

kafka2.5.0主题topic(代码片段)

kafka基本命令查看博客《kafka2.5.0基本命令》本博文所使用kafka版本2.5.0,操作系统centos8.1)创建主题创建my-topic主题,该主题有1个副本,8个分区:$bin/kafka-topics.sh--create--bootstrap-serverlocalhost:9092--replication-factor1--partitions8--topicmy-top 查看详情

kafka2.5.0基本命令(代码片段)

1)启动zookeeper演示用的话,直接启动kafka自带的zookeeper即可:cdkafkaDirectory/kafka_2.12-2.5.0bin/zookeeper-server-start.shconfig/zookeeper.properties生产上建议连接到zookeeper集群,需更改配置文件config/server.properties 里更改zookeepe 查看详情

自定义异常类jackson序列化jsonmappingexception异常(代码片段)

遇到一个场景,使用slf4j记录异常的时候发现一直抛jsonMappingException@ExceptionHandler(Exception.class,RuntimeException.class)@ResponseBodypublicClientResulthandleException(Exceptionex)if(exinstanceofCusto 查看详情

0802drf视图(代码片段)

昨日回顾:1.Serializer(序列化)1.ORM对应的query_set和ORM对象转换成JSON格式的数据1.在序列化类中定义自定义的字段:SerializerMethodField在类中定义get_自定义字段名(self,obj)方法2.read_only只在显示(查询)时才会有效。2.对前端POST过来的... 查看详情

spark-自定义排序(代码片段)

考察spark自定义排序方式一:自定义一个类继承Ordered和序列化,Driver端将数据变成RDD,整理数据转成自定义类类型的RDD,使用本身排序即可。packagecom.rz.spark.baseimportorg.apache.spark.rdd.RDDimportorg.apache.spark.SparkConf,SparkContext//自定义排... 查看详情

@jsonserialize(代码片段)

文章目录使用自定义序列化类实体标注注解此注解用于属性或者getter方法上,用于在序列化时嵌入开发者自定义的代码。比如将一个Date类型的变量转换成Long类型,或是序列化一个double时在其后面限制两位小数点。使用自定义序... 查看详情

jackson注解自定义数据脱敏策略(代码片段)

...1.前言2.脱敏注解3.定义好一套需要脱敏的规则4.自定义JSON序列化5.在实体类上标注对应的脱敏规则5.写一个接口进行测试1.前言有时候,我们返回给前端的数据需要脱敏,避免用户信息被泄漏,就像你点外卖一样,... 查看详情

kafka2.5.0详解核心配置文件server.properties(代码片段)

$cat-nconfig/server.propertiesbroker.id=0    //brokerID, 集群模式下该ID必须唯一,且永恒不变 listeners=PLAINTEXT://your_host_name:9092    // 配置你的应用所在IP地址,我理解为访问白名 查看详情

android——一个简单的闹钟app(代码片段)

...置实现侧滑回调方法绑定RecyclerView删除子项新增闹钟子项序列化实体类定义实体类,并实现序列化取出序列化实体类对象TimePicker自定义TimePicker文字大小及颜色获取时间数据返回时间数据存储数据取出数据子项添加自定义Switc... 查看详情

kafka2.5.0生产者与消费者配置详解(代码片段)

1)引入maven依赖我这里使用的是springboot2.1.3.RELEASE 版本:<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>会引入一对的ka 查看详情

serializer组件(代码片段)

Serializer组件(序列化器-Serializer)1.定义序列化器DjangoRESTframework中的Serializer使用类来定义,需要继承自rest_framework.serializers.Serializer.1.#先定义数据库模型类Userfromdjango.dbimportmodelsclassUser(models.Model):CHOICES_SEX=((0,'男'),(1,'女'))... 查看详情

drf---序列化(代码片段)

...2)在settings中配置执行自定义异常处理函数"""序列化家族"""1、Serializer类:底层序列化类-了解类重点:单表序列化2、ModelSerializer类:模型序列化类-核心类重点:多表序列化3、ListSerializer类:群操作序列化... 查看详情

在kotlin序列化中使用datastore(代码片段)

...方法。这两个DataStore版本都会在后台使用Protos对数据进行序列化。您也可以使用Kotlin序列化,结合使用DataStore与自定义数据类。这有助于减少样板代码,且无需学习或依赖于Protobuf库,同时仍可以为数据提供架构。您... 查看详情

android--每日一问:自定义一个类让其实现parcelable,大致流程是什么?(代码片段)

典型回答1).复写writeToParcel将对象数据序列化成一个Parcel对象(序列化之后成为Parcel对象.以便Parcel容器取出数据,其中flags标识有两种值:0或1。为1时标识当前对象需要作为返回值返回,不能立刻释放资源,即P... 查看详情

没有@Serializable 的数据类的自定义序列化程序

】没有@Serializable的数据类的自定义序列化程序【英文标题】:Customserializerfordataclasswithout@Serializable【发布时间】:2021-03-2402:29:04【问题描述】:我正在尝试将JSON文件反序列化为使用kotlinx.serialization无法控制的Kotlin数据类。这个... 查看详情

自定义异常类(代码片段)

自定义异常类MyExceptionpackagecom.aff.excep;//自定义异常类://1.自定义的异常类继承现有的异常类//2.提供一个序列号,提供几个重载的构造器publicclassMyExceptionextendsRuntimeExceptionstaticfinallongserialVersionUID=-7034897190745766939L;publicMyException()pu... 查看详情

序列化组件serializer之序列化与反序列化(代码片段)

一、知识补充1、序列化与反序列化"""1)序列化组件单表序列化(后台数据返回给前台):将后台的数据对象,转换成能用于网络传输的过程,即又是将对象转换成二进制字符串单表反序列化(前台提交数据给后台):拿内存的数据转... 查看详情