I’m trying to wrap my head around how Kafka Connect works and I can’t understand one particular thing.
From what I have read and watched, I understand that Kafka Connect allows you to send data into Kafka using Source Connectors and read data from Kafka using Sink Connectors. And the great thing about this is that Kafka Connect somehow abstracts away all the platform-specific things and all you have to care about is having proper connectors. E.g. you can use a PostgreSQL Source Connector to write to Kafka and then use Elasticsearch and Neo4J Sink Connectors in parallel to read the data from Kafka.
My question is: how does this abstraction work? Why are Source and Sink connectors written by different people able to work together? In order to read data from Kafka and write it anywhere, you have to expect some fixed message structure/schema, right? E.g. how does an Elasticsearch Sink know in advance what kind of messages a PostgreSQL Source would produce? What if I replaced the PostgreSQL Source with a MySQL Source? Would the produced messages have the same structure?
It would be logical to assume that Kafka Connect requires some kind of fixed message structure, but according to the documentation the SourceRecord which is sent to Kafka does not necessarily have a fixed structure:
…can have arbitrary structure and should be represented using org.apache.kafka.connect.data objects (or primitive values). For example, a database connector might specify the sourcePartition as a record containing { "db": "database_name", "table": "table_name"} and the sourceOffset as a Long containing the timestamp of the row.
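To make sure I'm reading that right, here is a minimal sketch of how I imagine such a record would be constructed (the topic name, schema and value are placeholders I made up):

```java
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

public class SourceRecordSketch {
    public static void main(String[] args) {
        // sourcePartition: identifies where the record came from; its structure is
        // whatever the connector author chooses (here a database/table pair, as in the docs).
        Map<String, String> sourcePartition = Map.of("db", "database_name", "table", "table_name");

        // sourceOffset: tells Connect where to resume from; here a row timestamp.
        Map<String, Long> sourceOffset = Map.of("timestamp", 1700000000000L);

        SourceRecord record = new SourceRecord(
                sourcePartition,
                sourceOffset,
                "my-topic",                       // placeholder target topic
                Schema.STRING_SCHEMA,             // placeholder value schema
                "a row rendered as a string");    // placeholder value

        System.out.println(record);
    }
}
```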
Kafka Connect has an internal data format that each record is converted to prior to either sourcing or sinking. Connector authors write their source connectors to produce this normalized format and their sink connectors to read from it. Connect then supports Converters for the read/write to Kafka itself (see Kafka Connect Concepts in the Confluent documentation). So the data “on the wire” will end up as one of these supported formats, which allows the source and sink connectors to agree (via configuration) on the data format.
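A rough sketch of that round trip, assuming the worker is configured with the JSON converter (the record layout here is made up; in a real deployment the worker, not the connector, invokes the converter chosen via key.converter / value.converter):

```java
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

public class ConverterRoundTrip {
    public static void main(String[] args) {
        // The normalized, source-agnostic representation: a Connect Schema plus a Struct.
        Schema schema = SchemaBuilder.struct().name("customer")
                .field("id", Schema.INT64_SCHEMA)
                .field("name", Schema.STRING_SCHEMA)
                .build();
        Struct value = new Struct(schema).put("id", 42L).put("name", "Jane");

        // The configured converter turns the normalized record into bytes for Kafka
        // (JsonConverter here; Avro/Protobuf converters plug in the same way).
        JsonConverter converter = new JsonConverter();
        converter.configure(Map.of("schemas.enable", "true"), false); // false = value converter

        byte[] onTheWire = converter.fromConnectData("customers", schema, value);

        // On the sink side, the matching converter rebuilds the Connect data,
        // so the sink connector never needs to know which system produced it.
        SchemaAndValue restored = converter.toConnectData("customers", onTheWire);
        System.out.println(restored.value());
    }
}
```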
“Kafka Connect has an internal data format that each record is converted to prior to either sourcing or sinking”
This means that Kafka Connect relies on each connector to perform data normalization, in which specific formats (e.g. PostgreSQL, MySQL, Elasticsearch) are programmatically translated into SourceRecords. I have used the word “programmatically” intentionally here to emphasize that it is up to the connector author to deal with data normalization.
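To make that concrete, here is a hedged sketch of the kind of normalization a source connector might perform; the table layout and field names are hypothetical, not taken from any particular connector:

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/** Hypothetical normalization step of a JDBC-style source connector. */
public class RowNormalizer {

    // The connector author decides how a database row maps onto Connect's data model.
    private static final Schema ROW_SCHEMA = SchemaBuilder.struct().name("orders")
            .field("order_id", Schema.INT64_SCHEMA)
            .field("status", Schema.STRING_SCHEMA)
            .build();

    /** Translates one database-specific row into a source-agnostic SourceRecord. */
    public static SourceRecord normalize(ResultSet row, String topic) throws SQLException {
        Struct value = new Struct(ROW_SCHEMA)
                .put("order_id", row.getLong("order_id"))
                .put("status", row.getString("status"));

        return new SourceRecord(
                Map.of("table", "orders"),                               // sourcePartition
                Map.of("ts", row.getTimestamp("updated_at").getTime()),  // sourceOffset
                topic,
                ROW_SCHEMA,
                value);
    }
}
```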
It is important to note that this data normalization precedes the use of converters. In other words, Kafka Connect’s architecture does not expect users to apply converters to achieve data normalization. It is up to the connector to handle this, since the connector is the one that knows how to handle the specific format.
Take the code below as an example. This is the poll() method of a source connector that I wrote for Apache Pulsar. Note that I iterate over the different possible formats (Bytes, Generic, Avro, Protobuf) to ultimately build up an array of SourceRecords.