Class KafkaIO
- java.lang.Object
  - org.apache.beam.sdk.io.kafka.KafkaIO
public class KafkaIO extends java.lang.Object
An unbounded source and a sink for Kafka topics.

Read from Kafka as UnboundedSource

Reading from Kafka topics

The KafkaIO source returns an unbounded collection of Kafka records as PCollection<KafkaRecord<K, V>>. A KafkaRecord includes basic metadata such as the topic-partition and offset, along with the key and value associated with a Kafka record.

Although most applications consume a single topic, the source can be configured to consume multiple topics or even a specific set of TopicPartitions.

To configure a Kafka source, you must specify at the minimum the Kafka bootstrapServers, one or more topics to consume, and key and value deserializers. For example:
 pipeline
   .apply(KafkaIO.<Long, String>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)

      // The above four are required configuration. The transform returns
      // PCollection<KafkaRecord<Long, String>>. The rest of the settings are optional:

      // You can further customize the KafkaConsumer used to read the records by adding more
      // settings for ConsumerConfig. e.g.:
      .withConsumerConfigUpdates(ImmutableMap.of("group.id", "my_beam_app_1"))

      // Set event times and watermark based on 'LogAppendTime'. To provide a custom
      // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
      // Use withCreateTime() with topics that have 'CreateTime' timestamps.
      .withLogAppendTime()

      // Restrict the reader to committed messages on Kafka (see method documentation).
      .withReadCommitted()

      // Offsets consumed by the pipeline can be committed back.
      .commitOffsetsInFinalize()

      // Specify a serializable function that determines whether to stop reading from a given
      // TopicPartition at runtime. Note that only ReadFromKafkaDoFn respects the signal.
      .withCheckStopReadingFn(new SerializableFunction<TopicPartition, Boolean>() {})

      // If you would like to send messages that fail to be parsed from Kafka to an alternate
      // sink, use the error handler pattern as defined in ErrorHandler.
      .withBadRecordErrorHandler(errorHandler)

      // Finally, if you don't need Kafka metadata, you can drop it.
      .withoutMetadata() // PCollection<KV<Long, String>>
   )
   .apply(Values.<String>create()) // PCollection<String>
  ...
Kafka provides deserializers for common types in org.apache.kafka.common.serialization. In addition to deserializers, Beam runners need a Coder to materialize key and value objects if necessary. In most cases, you don't need to specify a Coder for the key and value in the resulting collection because the coders are inferred from the deserializer types. However, in cases when coder inference fails, they can be specified explicitly along with the deserializers using KafkaIO.Read.withKeyDeserializerAndCoder(Class, Coder) and KafkaIO.Read.withValueDeserializerAndCoder(Class, Coder). Note that Kafka messages are interpreted using the key and value deserializers; a sketch of the explicit form is shown below.
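As an illustrative sketch only (not part of the original examples), the explicit deserializer-plus-coder form might look like the following, using the standard Beam VarLongCoder and StringUtf8Coder for Long keys and String values:

 pipeline
   .apply(KafkaIO.<Long, String>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")
      // pair each deserializer with the coder to use when coder inference fails
      .withKeyDeserializerAndCoder(LongDeserializer.class, VarLongCoder.of())
      .withValueDeserializerAndCoder(StringDeserializer.class, StringUtf8Coder.of())
   );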
Read From Kafka Dynamically
For a given Kafka bootstrap_server, KafkaIO is also able to detect and read from available TopicPartitions dynamically and stop reading from unavailable ones. KafkaIO uses WatchForKafkaTopicPartitions to emit any newly added TopicPartition and uses ReadFromKafkaDoFn to read from each KafkaSourceDescriptor. Dynamic read is able to handle two scenarios:

- A certain topic or partition is added or deleted.
- A certain topic or partition is added, then removed, but added back again.

When a checkStopReadingFn is provided, there are two more cases that dynamic read can handle:

- A certain topic or partition is stopped.
- A certain topic or partition is added, then stopped, but added back again.

Race conditions may occur in two of the supported cases:

- A TopicPartition is removed, but added back again.
- A TopicPartition is stopped, then needs to be read again.
This is because WatchForKafkaTopicPartitions and ReadFromKafkaDoFn react to the signal from removed/stopped TopicPartitions independently, and we cannot guarantee that both DoFns perform the related actions at the same time.

Here is one example of failing to emit a newly added TopicPartition:

- WatchForKafkaTopicPartitions is configured to update the current tracking set every 1 hour.
- TopicPartition A is tracked by WatchForKafkaTopicPartitions at 10:00AM and ReadFromKafkaDoFn starts to read from TopicPartition A immediately.
- At 10:30AM, ReadFromKafkaDoFn notices that the TopicPartition has been stopped/removed, so it stops reading from it and returns ProcessContinuation.stop().
- At 10:45AM, the pipeline author wants to read from TopicPartition A again.
- At 11:00AM, when WatchForKafkaTopicPartitions is invoked by the firing timer, it doesn't know that TopicPartition A has been stopped/removed. All it knows is that TopicPartition A is still an active TopicPartition, so it will not emit TopicPartition A again.
Here is one example of producing duplicate records:

- At 10:00AM, ReadFromKafkaDoFn is processing TopicPartition A.
- At 10:05AM, ReadFromKafkaDoFn starts to process other TopicPartitions (an SDF-initiated checkpoint or a runner-issued checkpoint happens).
- At 10:10AM, WatchForKafkaTopicPartitions knows that TopicPartition A is stopped/removed.
- At 10:15AM, WatchForKafkaTopicPartitions knows that TopicPartition A is added again and emits TopicPartition A again.
- At 10:20AM, ReadFromKafkaDoFn starts to process the resumed TopicPartition A, but at the same time ReadFromKafkaDoFn is also processing the newly emitted TopicPartition A.
To configure dynamic read:

 pipeline
   .apply(KafkaIO.<Long, String>read()
      // Configure dynamic read with a 1 hour interval: the pipeline will look into available
      // TopicPartitions and emit newly added ones every 1 hour.
      .withDynamicRead(Duration.standardHours(1))
      .withCheckStopReadingFn(new SerializableFunction<TopicPartition, Boolean>() {})
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
   )
   .apply(Values.<String>create()) // PCollection<String>
  ...
Partition Assignment and Checkpointing
The Kafka partitions are evenly distributed among splits (workers).

Checkpointing is fully supported and each split can resume from its previous checkpoint (to the extent supported by the runner). See KafkaUnboundedSource.split(int, PipelineOptions) for more details on splits and checkpoint support.

When the pipeline starts for the first time, or without any checkpoint, the source starts consuming from the latest offsets. You can override this behavior to consume from the beginning by setting the appropriate properties in ConsumerConfig, through KafkaIO.Read.withConsumerConfigUpdates(Map) (see the sketch at the end of this section). You can also enable offset auto_commit in Kafka to resume from the last committed offsets.

In summary, KafkaIO.read follows the sequence below to set the initial offset:

1. the KafkaCheckpointMark provided by the runner;
2. the consumer offset stored in Kafka when ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG = true;
3. the latest offset, by default.

Seeking to the initial offset is a blocking operation in the Kafka API, which can block forever for certain versions of the Kafka client library. This is resolved by KIP-266, which provides the `default.api.timeout.ms` consumer config setting to control such timeouts. KafkaIO.read implements the timeout itself, so as not to block forever in case an older Kafka client is used. It does recognize the `default.api.timeout.ms` setting and will honor the timeout value if it is passed in the consumer config.
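As an illustrative sketch only (not part of the original examples), consuming from the beginning when there is no checkpoint or committed offset can be done through the standard Kafka auto.offset.reset property:

 pipeline
   .apply(KafkaIO.<Long, String>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
      // start from the earliest available offsets instead of the latest
      .withConsumerConfigUpdates(
          ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")));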
Use Avro schema with Confluent Schema Registry
If you want to deserialize the keys and/or values based on a schema available in Confluent Schema Registry, KafkaIO can fetch this schema from a specified Schema Registry URL and use it for deserialization. A Coder will be inferred automatically based on the respective Deserializer.

For an Avro schema it will return a PCollection of KafkaRecords where the key and/or value will be typed as GenericRecord. In this case, users don't need to specify key and/or value deserializers and coders since they will be set to KafkaAvroDeserializer and AvroCoder by default accordingly.

For example, below the topic values are serialized with an Avro schema stored in Schema Registry and the keys are typed as Long:
 PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
   .apply(KafkaIO.<Long, GenericRecord>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")
      .withKeyDeserializer(LongDeserializer.class)
      // Use Confluent Schema Registry, specify schema registry URL and value subject
      .withValueDeserializer(
          ConfluentSchemaRegistryDeserializerProvider.of(
              "https://localhost:8081", "my_topic-value"))
    ...
You can also pass properties to the schema registry client, allowing you to configure authentication:
 ImmutableMap<String, Object> csrConfig =
     ImmutableMap.<String, Object>builder()
         .put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO")
         .put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG, "<username>:<password>")
         .build();

 PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
   .apply(KafkaIO.<Long, GenericRecord>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")
      .withKeyDeserializer(LongDeserializer.class)
      // Use Confluent Schema Registry, specify schema registry URL, value subject and
      // schema registry client configuration
      .withValueDeserializer(
          ConfluentSchemaRegistryDeserializerProvider.of(
              "https://localhost:8081", "my_topic-value", null, csrConfig))
    ...
Read from Kafka as a DoFn

KafkaIO.ReadSourceDescriptors is the PTransform that takes a PCollection of KafkaSourceDescriptor as input and outputs a PCollection of KafkaRecord. The core implementation is based on SplittableDoFn. For more details about the concept of SplittableDoFn, please refer to the blog post and design doc. The major difference from KafkaIO.Read is that KafkaIO.ReadSourceDescriptors doesn't require source descriptions (e.g., KafkaIO.Read.getTopicPattern(), KafkaIO.Read.getTopicPartitions(), KafkaIO.Read.getTopics(), KafkaIO.Read.getStartReadTime(), etc.) at pipeline construction time. Instead, the pipeline can populate these source descriptions at runtime. For example, the pipeline can query Kafka topics from a BigQuery table and read these topics via KafkaIO.ReadSourceDescriptors.
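As an illustrative sketch only (the table and field names are hypothetical, and the coder setup may need adjusting), topics read from BigQuery could be mapped to KafkaSourceDescriptors and fed into the transform like this:

 pipeline
   .apply(BigQueryIO.readTableRows().from("my_project:my_dataset.kafka_topics"))
   .apply(MapElements
       .into(TypeDescriptor.of(KafkaSourceDescriptor.class))
       // build a descriptor for partition 0 of each topic listed in the table
       .via(row -> KafkaSourceDescriptor.of(
           new TopicPartition((String) row.get("topic"), 0),
           null, null, null, null, null)))
   .apply(KafkaIO.<Long, String>readSourceDescriptors()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       .withKeyDeserializer(LongDeserializer.class)
       .withValueDeserializer(StringDeserializer.class));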
Common Kafka Consumer Configurations
Most Kafka consumer configurations are similar to KafkaIO.Read:

- KafkaIO.ReadSourceDescriptors.getConsumerConfig() is the same as KafkaIO.Read.getConsumerConfig().
- KafkaIO.ReadSourceDescriptors.getConsumerFactoryFn() is the same as KafkaIO.Read.getConsumerFactoryFn().
- KafkaIO.ReadSourceDescriptors.getOffsetConsumerConfig() is the same as KafkaIO.Read.getOffsetConsumerConfig().
- KafkaIO.ReadSourceDescriptors.getKeyCoder() is the same as KafkaIO.Read.getKeyCoder().
- KafkaIO.ReadSourceDescriptors.getValueCoder() is the same as KafkaIO.Read.getValueCoder().
- KafkaIO.ReadSourceDescriptors.getKeyDeserializerProvider() is the same as KafkaIO.Read.getKeyDeserializerProvider().
- KafkaIO.ReadSourceDescriptors.getValueDeserializerProvider() is the same as KafkaIO.Read.getValueDeserializerProvider().
- KafkaIO.ReadSourceDescriptors.isCommitOffsetEnabled() has the same meaning as KafkaIO.Read.isCommitOffsetsInFinalizeEnabled().
For example, to create a basic KafkaIO.ReadSourceDescriptors transform:

 pipeline
  .apply(Create.of(
     KafkaSourceDescriptor.of(
       new TopicPartition("topic", 1),
       null,
       null,
       null,
       null,
       null)))
  .apply(
    KafkaIO.<Long, String>readSourceDescriptors()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializer(StringDeserializer.class));

Note that the bootstrapServers can also be populated from the KafkaSourceDescriptor:

 pipeline
  .apply(Create.of(
     KafkaSourceDescriptor.of(
       new TopicPartition("topic", 1),
       null,
       null,
       null,
       null,
       ImmutableList.of("broker_1:9092", "broker_2:9092"))))
  .apply(KafkaIO.<Long, String>readSourceDescriptors()
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializer(StringDeserializer.class));
Configurations of KafkaIO.ReadSourceDescriptors

Apart from the configurations of the Kafka consumer, there are some other configurations related to processing records.

KafkaIO.ReadSourceDescriptors.commitOffsets() enables committing the offset after processing a record. Note that if ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG is set in the consumer config, KafkaIO.ReadSourceDescriptors.commitOffsets() will be ignored.

KafkaIO.ReadSourceDescriptors.withExtractOutputTimestampFn(SerializableFunction) is used to compute the output timestamp for a given KafkaRecord and controls the watermark advancement. There are three built-in types:

- KafkaIO.ReadSourceDescriptors.withProcessingTime()
- KafkaIO.ReadSourceDescriptors.withCreateTime()
- KafkaIO.ReadSourceDescriptors.withLogAppendTime()
For example, to create a KafkaIO.ReadSourceDescriptors with this additional configuration:

 pipeline
  .apply(Create.of(
     KafkaSourceDescriptor.of(
       new TopicPartition("topic", 1),
       null,
       null,
       null,
       null,
       ImmutableList.of("broker_1:9092", "broker_2:9092"))))
  .apply(KafkaIO.<Long, String>readSourceDescriptors()
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
      .withProcessingTime()
      .commitOffsets());
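If none of the built-in timestamp types fits, a custom extraction function can be supplied. The following is an illustrative sketch only (not part of the original examples), using the timestamp carried on the Kafka record to set the output timestamp:

 KafkaIO.<Long, String>readSourceDescriptors()
     .withKeyDeserializer(LongDeserializer.class)
     .withValueDeserializer(StringDeserializer.class)
     // use the Kafka record's timestamp as the element's output timestamp
     .withExtractOutputTimestampFn(record -> new Instant(record.getTimestamp()));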
Writing to Kafka
The KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write just the values, or native Kafka producer records using ProducerRecord. To configure a Kafka sink, you must specify at the minimum the Kafka bootstrapServers, the topic to write to, and key and value serializers. For example:

 PCollection<KV<Long, String>> kvColl = ...;
 kvColl.apply(KafkaIO.<Long, String>write()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("results")

      .withKeySerializer(LongSerializer.class)
      .withValueSerializer(StringSerializer.class)

      // You can further customize the KafkaProducer used to write the records by adding more
      // settings for ProducerConfig, e.g., to enable compression:
      .withProducerConfigUpdates(ImmutableMap.of("compression.type", "gzip"))

      // You can set the publish timestamp for the Kafka records.
      .withInputTimestamp() // element timestamp is used while publishing to Kafka
      // or you can also set a custom timestamp with a function.
      .withPublishTimestampFunction((elem, elemTs) -> ...)

      // Optionally, records that fail to serialize can be sent to an error handler.
      // See ErrorHandler for details of configuring a bad record error handler.
      .withBadRecordErrorHandler(errorHandler)

      // Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().
      .withEOS(20, "eos-sink-group-id")
 );
To produce Avro values you can use the KafkaAvroSerializer class. To make this class work with write() and the withValueSerializer() method, make sure to erase the generic types by casting to (Class) as shown in the following example:

 KafkaIO.<Long, String>write()
   ...
   .withValueSerializer((Class) KafkaAvroSerializer.class)
   .withProducerConfigUpdates( <Map with schema registry configuration details> )
   ...
Often you might want to write just the values without any keys to Kafka. Use values() to write records with a default empty (null) key:

 PCollection<String> strings = ...;
 strings.apply(KafkaIO.<Void, String>write()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("results")
     .withValueSerializer(StringSerializer.class) // just need serializer for value
     .values()
   );
Also, if you want to write Kafka ProducerRecords then you should use writeRecords():

 PCollection<ProducerRecord<Long, String>> records = ...;
 records.apply(KafkaIO.<Long, String>writeRecords()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("results")
     .withKeySerializer(LongSerializer.class)
     .withValueSerializer(StringSerializer.class)
   );
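As an illustrative sketch only (names are hypothetical), the ProducerRecords themselves can be built with an upstream step such as MapElements, which allows choosing a per-record topic, key, and value; depending on the pipeline, an explicit ProducerRecordCoder may also need to be set on the resulting collection:

 PCollection<KV<Long, String>> kvs = ...;
 PCollection<ProducerRecord<Long, String>> records =
     kvs.apply(MapElements
         .into(new TypeDescriptor<ProducerRecord<Long, String>>() {})
         // route each element to the "results" topic with its key and value
         .via(kv -> new ProducerRecord<>("results", kv.getKey(), kv.getValue())));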
Advanced Kafka Configuration
KafkaIO allows setting most of the properties in ConsumerConfig for the source or in ProducerConfig for the sink. E.g., if you would like to enable offset auto commit (for external monitoring or other purposes), you can set "group.id", "enable.auto.commit", etc.

Event Timestamps and Watermark
By default, the record timestamp (event time) is set to processing time in the KafkaIO reader, and the source watermark is the current wall time. If a topic has Kafka server-side ingestion timestamps enabled ('LogAppendTime'), this can be enabled with KafkaIO.Read.withLogAppendTime(). A custom timestamp policy can be provided by implementing TimestampPolicyFactory. See KafkaIO.Read.withTimestampPolicyFactory(TimestampPolicyFactory) for more information; a sketch of a custom policy follows.
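As an illustrative sketch only (the per-record extractor and the 2-minute maximum delay are assumptions), a factory can be supplied as a lambda that builds a CustomTimestampPolicyWithLimitedDelay for each partition:

 KafkaIO.<Long, String>read()
     .withBootstrapServers("broker_1:9092,broker_2:9092")
     .withTopic("my_topic")
     .withKeyDeserializer(LongDeserializer.class)
     .withValueDeserializer(StringDeserializer.class)
     .withTimestampPolicyFactory(
         (topicPartition, previousWatermark) ->
             new CustomTimestampPolicyWithLimitedDelay<>(
                 // hypothetical extractor: use the timestamp carried on the Kafka record
                 record -> new Instant(record.getTimestamp()),
                 Duration.standardMinutes(2),
                 previousWatermark));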
Supported Kafka Client Versions
KafkaIO relies on kafka-clients for all its interactions with the Kafka cluster. kafka-clients versions 0.10.1 and newer are supported at runtime. The older versions 0.9.x - 0.10.0.0 are also supported, but are deprecated and likely to be removed in the near future. Please ensure that the version included with the application is compatible with the version of your Kafka cluster. The Kafka client usually fails to initialize with a clear error message in case of incompatibility.

Updates to the I/O connector code

For any significant updates to this I/O connector, please consider involving the corresponding code reviewers mentioned here.
Nested Class Summary

- static class KafkaIO.Read<K,V>: A PTransform to read from Kafka topics.
- static class KafkaIO.ReadSourceDescriptors<K,V>: A PTransform to read from KafkaSourceDescriptor.
- static class KafkaIO.TypedWithoutMetadata<K,V>: A PTransform to read from Kafka topics.
- static class KafkaIO.Write<K,V>: A PTransform to write to a Kafka topic with KVs.
- static class KafkaIO.WriteRecords<K,V>: A PTransform to write to a Kafka topic with ProducerRecords.
Method Summary

- static <K,V> KafkaIO.Read<K,V> read(): Creates an uninitialized KafkaIO.Read PTransform.
- static KafkaIO.Read<byte[],byte[]> readBytes(): A specific instance of uninitialized read() where keys and values are bytes.
- static <K,V> KafkaIO.ReadSourceDescriptors<K,V> readSourceDescriptors(): Creates an uninitialized KafkaIO.ReadSourceDescriptors PTransform.
- static <K,V> KafkaIO.Write<K,V> write(): Creates an uninitialized KafkaIO.Write PTransform.
- static <K,V> KafkaIO.WriteRecords<K,V> writeRecords(): Creates an uninitialized KafkaIO.WriteRecords PTransform.
Method Detail
readBytes

public static KafkaIO.Read<byte[],byte[]> readBytes()

A specific instance of uninitialized read() where keys and values are bytes. See read().
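For illustration only (broker and topic names are placeholders), a minimal sketch of reading raw bytes:

 PCollection<KafkaRecord<byte[], byte[]>> raw = pipeline
     .apply(KafkaIO.readBytes()
         .withBootstrapServers("broker_1:9092")
         .withTopic("my_topic"));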
read

public static <K,V> KafkaIO.Read<K,V> read()

Creates an uninitialized KafkaIO.Read PTransform. Before use, basic Kafka configuration should be set with KafkaIO.Read.withBootstrapServers(String) and KafkaIO.Read.withTopics(List). Other optional settings include key and value Deserializers, and custom timestamp and watermark functions.
readSourceDescriptors

public static <K,V> KafkaIO.ReadSourceDescriptors<K,V> readSourceDescriptors()

Creates an uninitialized KafkaIO.ReadSourceDescriptors PTransform. Different from KafkaIO.Read, setting up topics and bootstrapServers is not required at construction time. However, bootstrapServers can still be configured with KafkaIO.ReadSourceDescriptors.withBootstrapServers(String). Please refer to KafkaIO.ReadSourceDescriptors for more details.
write

public static <K,V> KafkaIO.Write<K,V> write()

Creates an uninitialized KafkaIO.Write PTransform. Before use, Kafka configuration should be set with KafkaIO.Write.withBootstrapServers(String) and KafkaIO.Write.withTopic(java.lang.String), along with Serializers for the (optional) key and the values.
writeRecords

public static <K,V> KafkaIO.WriteRecords<K,V> writeRecords()

Creates an uninitialized KafkaIO.WriteRecords PTransform. Before use, Kafka configuration should be set with KafkaIO.WriteRecords.withBootstrapServers(String) and KafkaIO.WriteRecords.withTopic(java.lang.String), along with Serializers for the (optional) key and the values.