KafkaConnect with Amazon S3 Sink Connector is not working

Hi Team,

I am using Strimzi Kafka inside my K8s cluster and I want to use KafkaConnect to archive my topics data to S3 bucket. So I created a docker image using the following Dockerfile.
Dockerfile:

FROM quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001

I downloaded the Confluent AWS S3 sink plugin from the following link, extracted it into ./my-plugins/, and built the image with the above Dockerfile.

https://www.confluent.io/hub/confluentinc/kafka-connect-s3?_ga=2.34258849.1091420604.1648005277-226780291.1648005277

I could see that the KafkaConnect pod is running, but the topics are not archiving to the S3 bucket. And I could see the following errors from the KafkaConnect pod:

2022-03-25 05:22:04,727 ERROR [s3-sink-connector|task-0] WorkerSinkTask{id=s3-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-s3-sink-connector-0]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:516)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
	... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'exit': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"exit"; line: 1, column: 5]
	at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:66)
	at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
	... 17 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'exit': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"exit"; line: 1, column: 5]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3593)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2688)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:870)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:762)
	at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4622)
	at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
	at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
	... 18 more

KafkaConnector YAML:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: "strimzi-kafka"
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    s3.bucket.name: test-kafka-connect
    topics: kafka-topic
    topics.dir: topics
    path.format: "YYYY/MM-dd/HH"
    flush.size: 99999999
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    s3.compression.type: gzip
    timezone: UTC
    aws.access.key.id: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    aws.secret.access.key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    s3.region: ca-central-1

Can you help me to resolve this error?

Is it valid JSON on your topic?

Ref: Kafka Connect Deep Dive – Converters and Serialization Explained | Confluent

Dear @rmoff, thanks for looking into this. I got the error above when I created a new KafkaConnector, but it no longer appears in the logs. If I send some JSON data to my test Kafka topic (kafka-topic), parsing and the consumers work fine, but the data is not archived in the S3 bucket. The following lines are my test JSON:

'{"name":"John", "age":20, "car":kia}'
'{"name":"mike", "age":30, "car":toyota}'
'{"name":"jack", "age":40, "car":benz}'
{"number":0}
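
One way to sanity-check test payloads like these before producing them is to run each line through a strict JSON parser. The sketch below uses Python's standard json module as a stand-in for the Jackson parser that JsonConverter uses; the two are not identical, but both reject bare tokens. The sample strings are copied from this thread, on the assumption that the surrounding single quotes were shell quoting and not part of the message. The quoted "kia" variant is added only for comparison.

```python
import json


def is_valid_json(payload: str) -> bool:
    """Return True if the payload parses as a JSON document."""
    try:
        json.loads(payload)
        return True
    except json.JSONDecodeError:
        return False


samples = [
    'exit',                                    # the token from the stack trace
    '{"name":"John", "age":20, "car":kia}',    # bare word kia is not valid JSON
    '{"name":"John", "age":20, "car":"kia"}',  # comparison: quoted value
    '{"number":0}',
]

results = {s: is_valid_json(s) for s in samples}
for payload, ok in results.items():
    print(("OK      " if ok else "INVALID ") + payload)
```

If a payload shows up as INVALID here, it is worth checking how the connector's error tolerance is configured, since that decides whether such a record kills the task or gets skipped.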

I don’t know what I did wrong here!

@rmoff UPDATE: I am actually doing a PoC now, which is why I am using these dummy JSON values. In production, we will be sending logs from filebeat/logstash to Kafka.

I’m not clear from your post if you’ve now overcome the error or not.

What’s the status of the connector? Ref: Monitoring Kafka Connect and Connectors | Confluent Documentation

If there’s no data in S3 then the likely causes are one or more of:

  • The connector has failed, and isn’t reading any data
  • The connector is running but the messages it consumes from the topic it can’t handle so it’s dropping them
  • The connector is running, but has no data to read from the topic and so there’s no data written to S3
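
To separate the first cause from the other two, the JSON returned by the connector status endpoint can be summarized with a few lines of code. This is a minimal sketch: summarize_status is a hypothetical helper, and the two sample documents are hand-written in the shape the status endpoint returns (the connector name and trace text are borrowed from this thread).

```python
def summarize_status(status: dict) -> dict:
    """Condense a /connectors/<name>/status response into a few flags."""
    connector_state = status.get("connector", {}).get("state")
    tasks = status.get("tasks", [])
    failed = [t["id"] for t in tasks if t.get("state") == "FAILED"]
    all_running = (
        connector_state == "RUNNING"
        and len(tasks) > 0
        and all(t.get("state") == "RUNNING" for t in tasks)
    )
    return {
        "connector_state": connector_state,
        "task_count": len(tasks),
        "failed_tasks": failed,
        "all_running": all_running,
    }


# Hand-written samples for illustration only.
healthy = {
    "name": "s3-sink-connector",
    "connector": {"state": "RUNNING"},
    "tasks": [{"id": 0, "state": "RUNNING"}],
    "type": "sink",
}
broken = {
    "name": "s3-sink-connector",
    "connector": {"state": "RUNNING"},
    "tasks": [{
        "id": 0,
        "state": "FAILED",
        "trace": "org.apache.kafka.connect.errors.ConnectException: "
                 "Tolerance exceeded in error handler",
    }],
    "type": "sink",
}

print(summarize_status(healthy))
print(summarize_status(broken))
```

Note that a connector can report RUNNING while one of its tasks is FAILED, so the task list is the part to look at.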

Hi @rmoff, thanks for your response. The previously reported error no longer appears. I have run the commands you shared; please see the details below. I strongly believe that the connector is running fine in the cluster, but the data is still not being archived to the S3 bucket.

I ran the following commands from inside the KafkaConnect pod:

$ curl -s "http://localhost:8083/connectors/s3-sink-connector/status"

{
  "name": "s3-sink-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.244.2.152:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.244.2.152:8083"
    }
  ],
  "type": "sink"
}
$ curl localhost:8083/ | jq

{
  "version": "3.1.0",
  "commit": "37edeed0777bacb3",
  "kafka_cluster_id": "3lgIEbX1QEi8YlxKRnpkIw"
}

$ curl localhost:8083/connector-plugins | jq

[
  {
    "class": "io.confluent.connect.s3.S3SinkConnector",
    "type": "sink",
    "version": "10.0.6"
  },
  {
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
    "type": "source",
    "version": "3.1.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "3.1.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "3.1.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]
$ curl localhost:8083/connectors

["s3-sink-connector"]
$ curl localhost:8083/connectors/s3-sink-connector/tasks | jq

[
  {
    "id": {
      "connector": "s3-sink-connector",
      "task": 0
    },
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "s3.region": "ca-central-1",
      "topics.dir": "topics",
      "flush.size": "99999999",
      "s3.part.size": "10485760",
      "timezone": "UTC",
      "tasks.max": "1",
      "rotate.interval.ms": "3600000",
      "locale": "en",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "aws.access.key.id": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "s3.bucket.name": "test-kafka-connect",
      "partition.duration.ms": "3600000",
      "schema.compatibility": "NONE",
      "topics": "kafka-topic",
      "aws.secret.access.key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}",
      "key.converter.schemas.enable": "false",
      "s3.compression.type": "gzip",
      "task.class": "io.confluent.connect.s3.S3SinkTask",
      "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
      "value.converter.schemas.enable": "false",
      "name": "s3-sink-connector",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "path.format": "YYYY/MM-dd/HH",
      "timestamp.extractor": "Record"
    }
  }
]

OK, looks like the connector is indeed running - so then it’s on to the next two possibilities that I outlined:

For the first one, check the details of Kafka Connect error handling.

For the second one, make sure you’re producing data to the topic.

Also bear in mind that data has to flush to S3 so you won’t see it instantaneously. In fact, I think we have a third option to add to the two above to consider:

Check out the documentation:

  • flush.size specifies the number of records per partition the connector needs to write before completing a multipart upload to S3.
  • rotate.interval.ms specifies the maximum timespan in milliseconds a file can remain open and ready for additional records

I think with these values set very high it may be that everything is working exactly as you’ve told it to. Try setting the values lower for now to confirm that the pipeline is working, and then increase them to suitable values afterwards.
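
To make the effect of these two settings concrete, here is a deliberately simplified model of when a file would be closed for a single topic partition. It is a sketch based on the two documentation bullets above, not the connector's actual code: count_commits is a hypothetical function, it assumes rotation is evaluated against record timestamps when the next record arrives, and it ignores partitioner boundaries and multipart upload details.

```python
def count_commits(timestamps_ms, flush_size, rotate_interval_ms):
    """Return (files_committed, records_still_buffered) for one partition."""
    commits = 0
    buffered = 0
    first_ts = None
    for ts in timestamps_ms:
        # A record arriving past the rotation window closes the open file first.
        if buffered and rotate_interval_ms > 0 and ts - first_ts >= rotate_interval_ms:
            commits += 1
            buffered = 0
        if buffered == 0:
            first_ts = ts
        buffered += 1
        # Reaching flush.size closes the file immediately.
        if buffered >= flush_size:
            commits += 1
            buffered = 0
    return commits, buffered


# Four test records, one second apart (illustrative timestamps).
four_records = [0, 1000, 2000, 3000]

# Values from the connector config in this thread: nothing gets written.
print(count_commits(four_records, flush_size=99999999, rotate_interval_ms=3600000))

# A small flush.size for testing: one file after the third record.
print(count_commits(four_records, flush_size=3, rotate_interval_ms=3600000))
```

Under this model, a handful of test messages with the original settings leaves everything buffered and nothing in the bucket, which matches what is being reported.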

Hi @rmoff
I tried the following values in the KafkaConnector, but it is still not working. There are no errors, but the data is not being archived to S3.

s3.part.size: 5242880
flush.size: 999
rotate.interval.ms: 360

The JSON data I entered:

{"registertime":1493456789434,"userid":"User_3","regionid":"Region_3","gender":"FEMALE"}

Test command for pushing data to kafka:

bin/kafka-console-producer.sh --broker-list prod-kafka-bootstrap.myhost.com:443 --producer-property security.protocol=SSL --producer-property ssl.truststore.password=xcMztndACxK7 --producer-property ssl.truststore.location=/tmp/user-truststore.jks --producer-property ssl.keystore.password=xcMztndACxK7 --producer-property ssl.keystore.location=/tmp/user-keystore.jks --topic kafka-topic

I’m new to Kafka Connect myself, but one thing that helped me run it (before I tried it with Strimzi inside Docker) was to disable the schema lookup.

Do you intend the connector to interpret the keys and values in the JSON messages? Or do you want it to just dump the messages “as they are”, treating them as simple byte strings?

I use Avro for storage format, to dump the messages verbatim for backup, but I think the same (or part of it) could hold here:

    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    schemas.enable: false
    schema.compatibility: NONE

I think you are missing the schemas.enabled: false

On its own, this property does nothing. Bytes never have a schema. Avro always has a schema.

value.converter.schemas.enable (or for key), on the other hand is only applicable to the JsonConverter.
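
To illustrate what that JsonConverter setting changes: with schemas.enable set to true, the converter expects each message to be an envelope with exactly two fields, "schema" and "payload"; with false, it takes the JSON document as-is. The check below is a rough sketch of that distinction. has_schema_envelope is a hypothetical helper, the plain message is the test record from earlier in this thread, and the wrapped message is a made-up example of the envelope shape.

```python
import json


def has_schema_envelope(raw: bytes) -> bool:
    """Return True if the message looks like a schema/payload envelope."""
    try:
        doc = json.loads(raw)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return False
    return isinstance(doc, dict) and set(doc.keys()) == {"schema", "payload"}


plain = b'{"registertime":1493456789434,"userid":"User_3","regionid":"Region_3","gender":"FEMALE"}'

# Made-up envelope for illustration only.
wrapped = json.dumps({
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [{"field": "userid", "type": "string", "optional": False}],
    },
    "payload": {"userid": "User_3"},
}).encode("utf-8")

print(has_schema_envelope(plain))
print(has_schema_envelope(wrapped))
```

Plain JSON like the test record therefore needs value.converter.schemas.enable set to false, which the original connector config already does.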

archive my topics data to S3 bucket

Keep in mind that you’re not storing the record keys and are losing post-hour accuracy of the records, so you’re not making a valid “archive” if the intention was to have the data be recoverable

I have the same issue: the Amazon S3 sink connector is running but not writing to S3. Here is my config for the sink connector:

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=eu-west-2
flush.size=200
schema.compatibility=NONE
tasks.max=1
topics=prod-arbor-test
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
s3.bucket.name=debezium-test-data

Thanks for the correction.

Without disabling the schema lookup (we don’t use schemas in our topics, some use JSON message format and some use Protobuf), the connector refused to write and complained about something like “can’t find schema” or “failed to parse the JSON” (I don’t have the exact message right now).

Only when I set the fields to tell it not to look for keys vs. values did it start generating Avro files on the S3 bucket. When I read the Avro objects back, they seemed to contain the messages correctly.

Are keys always supposed to exist?

@motahir it might be more useful to create your own post, including the full stacktrace and version you’re using, and describe what data types your producer is sending. From the original post, a plain string “exit” is being consumed and simply cannot be parsed by the JSON converters being used. Hopefully that helps debug your own error.

Regarding the rest of this thread, I personally wouldn’t set rotation intervals to less than 5 minutes. Otherwise, you’ll get lots of tiny files in S3, and you need to pay for each of those bucket scans! For the default partitioner, you’ll need to wait for the full flush size…
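
As a back-of-the-envelope illustration of the small-files concern, the arithmetic below gives an upper bound on how many objects a time-based rotation could produce per day. It is a sketch only: max_objects_per_day is a hypothetical helper, and it assumes continuous traffic and one file per rotation interval per topic partition.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000


def max_objects_per_day(rotate_interval_ms: int, partitions: int = 1) -> int:
    """Upper bound on files written per day if every interval produces one."""
    return partitions * (MS_PER_DAY // rotate_interval_ms)


# Intervals mentioned in this thread, for a single partition.
for label, interval_ms in [
    ("360 ms", 360),
    ("5 minutes", 5 * 60 * 1000),
    ("1 hour", 3600000),
]:
    print(f"{label:>10}: up to {max_objects_per_day(interval_ms)} objects/day")
```

With rotate.interval.ms at 360, the bound is in the hundreds of thousands of objects per partition per day, compared with a few hundred at five minutes.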

If there are no errors, you’ll have to look further into JMX metrics to see if consumers are actually reading topic data.
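
Besides JMX, another angle is the lag of the connector's consumer group, which by default is named connect- followed by the connector name (so connect-s3-sink-connector for the original post). The sketch below only shows the arithmetic: partition_lag is a hypothetical helper and the offsets are made-up numbers, which in practice would come from a tool such as kafka-consumer-groups.sh.

```python
def partition_lag(end_offsets: dict, committed_offsets: dict) -> dict:
    """Lag per partition: log end offset minus committed offset (0 if none)."""
    return {
        partition: end - committed_offsets.get(partition, 0)
        for partition, end in end_offsets.items()
    }


# Made-up offsets for illustration only.
end_offsets = {0: 1200, 1: 950}
committed_offsets = {0: 1000}  # partition 1 has no committed offset yet

lag = partition_lag(end_offsets, committed_offsets)
print(lag)
print("total lag:", sum(lag.values()))
```

As far as I understand it, the S3 sink only commits offsets for records that have already been written to S3, so lag that keeps growing while the task is RUNNING would be consistent with records sitting in an open file waiting for a flush.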

@gliderflyer Keys can be null, based on the producer’s needs. I was mostly referring to the original post about “archival”, which ideally means the full record is going to be persisted in its most raw format. For example, Avro files would contain full schemas, not schema ids from the registry.

Also, there is a setting like “include.keys” that didn’t exist until around the 6.x release chain, so before that, record keys could never be written without a transform.