Deserialize Kafka AVRO messages using Apache Beam(使用Apache Beam反序列化Kafka Avro消息)
问题描述
主要目标是聚合两个Kafka主题,一个是压缩的慢速移动数据,另一个是每秒接收的快速移动数据。
我已经能够在KV(Long,String)等简单场景中使用如下内容消费消息:
PCollection<KV<Long,String>> input = p.apply(KafkaIO.<Long,
String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
PCollection<String> output = input.apply(Values.<String>create());
但当您需要从Avro反序列化时,这似乎不是一种方法。我有一个KV(String,avro)需要消费。
我尝试从avro架构生成Java类,然后将它们包含在"Apply"中,例如:
PCollection<MyClass> output = input.apply(Values.<MyClass>create());
但这似乎不是正确的方法。
有没有什么文档/示例可以指给我看,这样我就可以了解您将如何使用Kafka Avro和Beam?
我已更新代码:
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
public class Main {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
);
p.run();
}
}
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
@DefaultCoder(AvroCoder.class)
public class Myclass{
String name;
String age;
Myclass(){}
Myclass(String n, String a) {
this.name= n;
this.age= a;
}
}
但我现在收到以下错误
incompatible types: java.lang.Class < io.confluent.kafka.serializers.KafkaAvroDeserializer > cannot be converted to java.lang.Class < ? extends org.apache.kafka.common.serialization.Deserializer < java.lang.String > >
我一定是导入了错误的序列化程序?
推荐答案
我也遇到过同样的问题。在这个邮件档案里找到了解决方案。 http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E
在您的情况下,您需要定义自己的Deserializer<MyClass>
,它可以从AbstractKafkaAvroDeserializer
扩展,如下所示。
public class MyClassKafkaAvroDeserializer extends
AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
configure(new KafkaAvroDeserializerConfig(configs));
}
@Override
public MyClass deserialize(String s, byte[] bytes) {
return (MyClass) this.deserialize(bytes);
}
@Override
public void close() {} }
然后将您的KafkaAvroAnti ializer指定为ValueAnti ializer。
p.apply(KafkaIO.<Long, MyClass>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(MyClassKafkaAvroDeserializer.class) );
这篇关于使用Apache Beam反序列化Kafka Avro消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:使用Apache Beam反序列化Kafka Avro消息


- GC_FOR_ALLOC 是否更“严重"?在调查内存使用情况时? 2022-01-01
- 在 Java 中,如何将 String 转换为 char 或将 char 转换 2022-01-01
- 如何指定 CORS 的响应标头? 2022-01-01
- 将 Java Swing 桌面应用程序国际化的最佳实践是什么? 2022-01-01
- 获取数字的最后一位 2022-01-01
- 未找到/usr/local/lib 中的库 2022-01-01
- Eclipse 的最佳 XML 编辑器 2022-01-01
- java.lang.IllegalStateException:Bean 名称“类别"的 BindingResult 和普通目标对象都不能用作请求属性 2022-01-01
- 如何使 JFrame 背景和 JPanel 透明且仅显示图像 2022-01-01
- 转换 ldap 日期 2022-01-01