Sink connector to retrieve a Long key and Long value from a Kafka topic and store them in a database

I have a simple requirement.

Retrieve the Long key and Long value from a Kafka topic and store them in a database using a sink connector.

I tried to use org.apache.kafka.connect.converters.LongConverter for both the key and value converters, but it throws an exception.

Hi @SDev,

Which database / connector do you have in mind for sinking? Most require Schema Registry usage so that the connector knows how to map the bytes in Kafka to typed fields. For example, if you were using the JDBC Sink connector, a schema is required for the value.

In other words, rather than store primitive Longs in Kafka (serialized with org.apache.kafka.common.serialization.LongSerializer), you would store Avro, Protobuf, or JSON Schema formatted records where the schema just has a single long field — see the producer sketch below. This assumes that you have control over the Kafka producer - is that the case?
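As a rough sketch of that producer side (assuming a locally running broker and Schema Registry, and a made-up topic name `longs-topic`), the value becomes an Avro record with one long field while the key can stay a primitive long:

```java
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LongValueAvroProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // assumption: local broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.LongSerializer");
        props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // assumption: local Schema Registry

        // Avro schema with a single long field, so the sink connector sees a
        // record (Struct) value rather than a bare primitive.
        Schema valueSchema = SchemaBuilder.record("LongValue")
                .fields()
                .requiredLong("value")
                .endRecord();

        GenericRecord value = new GenericRecordBuilder(valueSchema)
                .set("value", 42L)
                .build();

        try (KafkaProducer<Long, GenericRecord> producer = new KafkaProducer<>(props)) {
            // Hypothetical topic name; the key stays a primitive long.
            producer.send(new ProducerRecord<>("longs-topic", 1L, value));
            producer.flush();
        }
    }
}
```

On the Connect side you would then pair this with io.confluent.connect.avro.AvroConverter as the value converter (plus its schema.registry.url setting), so the JDBC sink sees a Struct with one long field.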

UPDATE: Based on the comment here saying that LongConverter “will always return an optional INT64 schema,” I thought that maybe it would just work and a default field name would be used, but I’m not finding that to be the case in my testing. I hit a ConnectException saying that a primitive value isn’t allowed with this particular connector: “Sink connector ‘JdbcSinkConnectorConnector_0’ … requires records with a non-null Struct value and non-null Struct schema, but found record … with a Long value and int64 value schema.”
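For reference, here is a small, hypothetical probe of what LongConverter hands the framework when used as a value converter — an optional INT64 schema and a plain Long, not the Struct that the JDBC sink insists on:

```java
import java.nio.ByteBuffer;
import java.util.Collections;

import org.apache.kafka.connect.converters.LongConverter;
import org.apache.kafka.connect.data.SchemaAndValue;

public class LongConverterProbe {
    public static void main(String[] args) {
        LongConverter converter = new LongConverter();
        converter.configure(Collections.emptyMap(), false); // false = configure as a value converter

        // Bytes as org.apache.kafka.common.serialization.LongSerializer would
        // write them (8 bytes, big-endian).
        byte[] serialized = ByteBuffer.allocate(8).putLong(42L).array();

        SchemaAndValue connectData = converter.toConnectData("some-topic", serialized);

        // Prints an optional INT64 schema and a java.lang.Long value -- a
        // primitive, not a Struct, which is why the connector rejects it.
        System.out.println(connectData.schema());
        System.out.println(connectData.value());
    }
}
```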

HTH,
Dave
