How does the S3 Sink connector extract data?

Hey All,

Is the S3 Sink connector looking for a specific key in the Kafka message to extract data?
When I look at my messages in Kafka I see the following top-level keys: “schema” and “payload”, but when I look at the files (Parquet format) in S3, I only see the “payload” data. Is the connector using the “schema” information behind the scenes, or can I remove it when sending the message to Kafka?

Thanks.

It sounds like you’re using org.apache.kafka.connect.json.JsonConverter with schemas.enable=true. With that setting, the schema is embedded within each JSON message alongside the payload; the converter uses it to deserialize the record, and only the payload itself is written to S3.
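For example, with schemas.enable=true each message carries a schema/payload envelope roughly like this (the fields here are illustrative, not from your topic):

    {
      "schema": {
        "type": "struct",
        "optional": false,
        "fields": [
          { "field": "id", "type": "int64", "optional": false },
          { "field": "name", "type": "string", "optional": true }
        ]
      },
      "payload": { "id": 42, "name": "example" }
    }

The converter uses the “schema” half to rebuild the Connect record; only the “payload” half becomes data in the Parquet file.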

See the Confluent blog post Kafka Connect Deep Dive – Converters and Serialization Explained.

Thanks. So I should be able to remove it by setting value.converter.schemas.enable=false in the source connector, and it should not have any effect on the S3 sink, right?

Yes. But you might want to consider using Avro/Protobuf/JSON Schema so that the schema is still retained and available for other consumers of the data.
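For example, here is a minimal sketch of an S3 sink using the Avro converter (connector name, topic, bucket, region, and registry URL are all placeholders):

    {
      "name": "s3-sink",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
        "topics": "my-topic",
        "s3.bucket.name": "my-bucket",
        "s3.region": "us-east-1",
        "flush.size": "1000",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
      }
    }

With this setup the schema lives in Schema Registry rather than being embedded in every message, so it stays available to other consumers of the data.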

Hey,

I tried adding the following to my Kafka Connect container, to remove the schema from the Debezium events:

-e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE="false" \
-e CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE="false" 
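(If this is the Confluent cp-kafka-connect image, those environment variables should translate into the worker properties below.)

    key.converter.schemas.enable=false
    value.converter.schemas.enable=false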

That worked, but now I’m getting the following error:

   {"id":0,"state":"FAILED","worker_id":"localhost:8083","trace":"org.apache.kafka.connect.errors.ConnectException: 
Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n\tSuppressed: java.lang.NullPointerException\n\t\tat io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:97)\n\t\tat io.confluent.connect.s3.TopicPartitionWriter.close(TopicPartitionWriter.java:313)\n\t\tat io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:249)\n\t\tat org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:410)\n\t\tat org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:644)\n\t\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)\n\t\t... 7 more\nCaused by: java.lang.StackOverflowError\n\tat org.apache.parquet.schema.LogicalTypeAnnotation$StringLogicalTypeAnnotation.getType(LogicalTypeAnnotation.java:308)\n\tat org.apache.parquet.schema.LogicalTypeAnnotation.toString(LogicalTypeAnnotation.java:172)\n\tat org.apache.parquet.schema.Types$BasePrimitiveBuilder$1.checkBinaryPrimitiveType(Types.java:565)\n\tat org.apache.parquet.schema.Types$BasePrimitiveBuilder$1.visit(Types.java:450)\n\tat org.apache.parquet.schema.LogicalTypeAnnotation$StringLogicalTypeAnnotation.accept(LogicalTypeAnnotation.java:303)\n\tat org.apache.parquet.schema.Types$BasePrimitiveBuilder.build(Types.java:447)\n\tat org.apache.parquet.schema.Types$BasePrimitiveBuilder.build(Types.java:338)\n\tat org.apache.parquet.schema.Types$Builder.named(Types.java:316)\n\tat org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:202)\n\tat org.apache.parquet.avro.AvroSchemaConverter.convertUnion(AvroSchemaConverter.java:226)\n\tat org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:182)\n\tat org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:141)\n\tat o

According to the Parquet section here: https://docs.confluent.io/kafka-connect-s3-sink/current/index.html, I must use ProtobufConverter, JsonSchemaConverter, or AvroConverter.

But if I try to use JsonSchemaConverter I get an error that I must set schema.registry.url. Any suggestions?
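For reference, the schema-aware converters all expect a Schema Registry to talk to, so a Parquet setup typically pairs the converter with a registry URL, roughly like this (the registry hostname is a placeholder, and this assumes a running Schema Registry):

    value.converter=io.confluent.connect.json.JsonSchemaConverter
    value.converter.schema.registry.url=http://schema-registry:8081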
