Kafka Topic data File extension issue in AWS S3 Bucket

Hello

I am trying to insert JSON data from a Kafka topic into an AWS S3 bucket.

With my current configuration, the data is being streamed into the S3 bucket, but I am facing an issue with the file extension.

In the S3 bucket, the data is written with a .bin extension, but I expect a .json extension.

What changes need to be made so that the data written to S3 gets a .json extension?


Hi

Could you give some details about how you write the data to your S3 bucket?

Based on the file name, I assume you are using Kafka Connect, correct?

Hi

I am loading JSON data into the Kafka topic as an object type using the command below:

./bin/kafka-console-producer --broker-list localhost:9092 --topic connect-test --property value.schema='{"type":"object","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Input values:
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}

S3 sink config:
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=connect-test
s3.credentials.provider.class=com.amazonaws.auth.DefaultAWSCredentialsProviderChain
s3.region=us-east-2
s3.bucket.name=snowflakelab2
s3.part.size=5242880
flush.size=3
#s3.credentials.provider.class=AwsAssumeRoleCredentialsProvider
s3.credentials.provider.access_key_id =
s3.credentials.provider.secret_access_key =
sts.role.arn =
sts.role.session.name = mysnowflakerole
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
#format.class=io.confluent.connect.s3.format.avro.AvroFormat
#value.converter.schema.registry.url = http://localhost:8081
#format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
#partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
schema.generator.class = io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
schema.compatibility=NONE
#partition.field.name=
#partition.duration.ms=
#path.format=
#locale=
#timezone=
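In the configuration above, the .bin extension most likely comes from ByteArrayFormat, whose default output file extension is .bin. If the messages are already JSON text (as in the input values above), one option is to keep ByteArrayFormat and override the extension with the connector's format.bytearray.extension setting. This is a sketch under that assumption, not a tested fix:

```properties
# Keep writing each record's raw bytes unchanged
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# Override ByteArrayFormat's default ".bin" file extension
format.bytearray.extension=.json
```

This only changes the file name; by default the content is still the raw records separated by line breaks, i.e. one JSON object per line rather than a single JSON array.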

I see. I would recommend trying the following:

format.class=io.confluent.connect.s3.format.json.JsonFormat

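and remove the existing ByteArrayFormat line:

format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat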

hth,
michael
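Applied to the sink config, the JsonFormat suggestion might look like the sketch below. It assumes the topic holds plain JSON without a schema envelope. Switching the value converter to JsonConverter with schemas.enable=false is an addition on top of the advice above: with ByteArrayConverter, JsonFormat receives each value as raw bytes and may write it as an encoded string instead of the original JSON object.

```properties
# Write records as JSON; JsonFormat names the output files with a .json extension
format.class=io.confluent.connect.s3.format.json.JsonFormat
# Assumption: messages are plain JSON without a {"schema": ..., "payload": ...} envelope
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

These lines would replace the existing format.class (ByteArrayFormat) and value.converter (ByteArrayConverter) lines rather than sit alongside them.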

OK, I will give it a try and let you know if I face any issues.

Thanks
Rama
