S3 Sink Connector only contains part of Kafka topic

Hello all, I am trying to understand the behavior of the S3 sink connector.
I am using the debezium/example-postgres:1.0 Docker image as the source database, and I have set up the Debezium connector with:

"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true"

The next step is to psql into the database and insert a simple row.
Then, from another process using the debezium/kafka:1.0 Docker image, I can use the watch-topic utility and see that the records written to the topic are in this format:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1021}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"}, ...rest}

This is good news, since I can see both the key schema JSON blob and the value schema JSON blob, delimited by a tab/space.
Now, when I create the S3 sink connector that monitors this topic, the resulting JSON in the S3 bucket only contains the value schema JSON blob. I was wondering if this is the natural behavior of the S3 sink connector: when it consumes the topic, does it only write the second (value) part of each record to the S3 bucket?
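
For reference, the sink connector config I am using is roughly the following (the bucket name and region here are placeholders):

"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "dbserver1.inventory.customers",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"s3.bucket.name": "my-dbz-bucket",
"s3.region": "us-east-1",
"flush.size": "1"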

Hi jlam

If I recall correctly, by default it is only the value that is written to S3. You will need to add store.kafka.keys=true to sink the keys, and store.kafka.headers=true to store the headers.
The Confluent docs have more information on this: https://docs.confluent.io/kafka-connect-s3-sink/current/overview.html#writing-record-keys-and-headers
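
In the sink config that would look something like the following. I am assuming JSON for keys.format.class so the keys match your value format; headers will default to AvroFormat unless you also set headers.format.class:

"store.kafka.keys": "true",
"keys.format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"store.kafka.headers": "true"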

Hope this helps!


Hello,
Thank you for the link and the extra configuration fields. I have added the following to the S3 sink connector:

"keys.format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"store.kafka.keys": "true"

Unfortunately, when I tested it I was not able to see a *.key.json file in addition to the regular *.json file, so I am back to the same problem where I only see the value part of the topic. Would this configuration field perhaps only have an effect if I were using the Avro converter?

I admit that I am not entirely sure. I have always preferred to use Avro over JSON. I’m looking at the source code and it seems to be entirely agnostic to the schema type.

A few things to double check:

  1. Did you restart the connector after changing the configs? Running connectors need to be restarted for config changes to take effect.
  2. Are you certain there are keys in the record? (I suspect yes, but just troubleshooting)
  3. Try turning on the “headers” option to see if that data comes out. Leave it as the default AvroFormat; it should write the header data out in a .avro file. This is just to validate whether any of these configs work at all (see the sketch after this list).
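
For point 3, a minimal sketch would be to add just this to the sink config and change nothing else:

"store.kafka.headers": "true"

With no headers.format.class set, it should fall back to AvroFormat and write the headers out in a .avro file alongside each data file.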

Let me know

Sorry, it was a total mistake on my part.

The reason I was not seeing results from your first comment is that I was using an older version of kafka-connect-s3. Looking through the source code for the version I had, I could not find the STORE_KAFKA_KEYS_CONFIG or STORE_KAFKA_HEADERS_CONFIG flags. I then upgraded and tested with v10.0.5, which resolved my issue, and I can now see the two files.

Thank you for your help; I will mark your initial response as the solution.

Perfect! I’m glad you sorted it out. Thanks for the details, hopefully anyone else coming across this will find it useful.
