How to increase the partitions of an existing topic while using the S3 sink connector

I am using the S3 sink connector with Kafka Connect and trying to load data into S3. Somehow I am unable to increase the number of partitions for the topic, and I also cannot change the offset.flush.timeout.ms value. I am trying to add these settings to the connector JSON I submit with curl, but nothing updates.

{
    "name": "my-s3-sink3",
    "config": {
        "connector.class"               : "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max"                     : "1",
        "topics"                        : "mysource.topic",
        "s3.region"                     : "us-east-1",
        "s3.bucket.name"                : "topicbucket001",
        "s3.part.size"                  : "5242880",
        "flush.size"                    : "1",
        "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"               : "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "storage.class"                 : "io.confluent.connect.s3.storage.S3Storage",
        "format.class"                  : "io.confluent.connect.s3.format.json.JsonFormat",
        "partitioner.class"             : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "schema.compatibility"          : "NONE",
        "offset.flush.timeout.ms"       : 1000,
        "topic.creation.default.replication.factor": 3,
        "topic.creation.default.partitions": 10,
        "topic.creation.default.compression.type": "snappy"
    }
}

@Kanikamiglani31 can you share the curl command you’re running to update the config? Also, for clarity, when you say “increase the size of topic partition”, are you talking about increasing the number of partitions in Kafka itself using the kafka-topics command?
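If so, a minimal sketch of that command (assuming the Confluent CLI wrapper; it’s kafka-topics.sh in a vanilla Kafka distribution, and the bootstrap server address is a placeholder):

    # Increase an existing topic's partition count (it can only go up, never down)
    kafka-topics --bootstrap-server localhost:9092 \
        --alter --topic mysource.topic --partitions 10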

Hey Rick,
Thanks for the response here. I am using:

    curl -X POST http://localhost:8083/connectors \
        -H 'Content-Type: application/json' \
        -H 'Accept: application/json' \
        -d '{the JSON mentioned in the ticket above}'

I want to increase the number of partitions for existing topics I am creating in Kafka (I am using AWS MSK for Kafka), but I am creating the topics through the S3 sink connector with the config above.
Even if I can only set the partition count for new topics, I am good with that.

Note: I have enabled auto.create.topics.enable=true in my Kafka setup, so I am just passing a topic name in my S3 sink connector config and it automatically creates the topics.
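For reference, the broker-side settings involved look roughly like this (values are placeholders; in MSK they live in the cluster configuration, and auto-created topics pick up the num.partitions default):

    # broker server.properties (or MSK cluster configuration)
    auto.create.topics.enable=true
    num.partitions=10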

Thanks
Kanika

Kanika- Check out the Kafka Connect REST API docs, in particular the PUT command for an existing connector’s configuration.

https://docs.confluent.io/platform/current/connect/references/restapi.html#put--connectors-(string-name)-config
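As a sketch (connector name and worker address taken from earlier in this thread; trim the config to what you actually use): note that PUT takes the bare config map, without the outer “name”/“config” wrapper that POST /connectors expects.

    curl -X PUT http://localhost:8083/connectors/my-s3-sink3/config \
        -H 'Content-Type: application/json' \
        -d '{
              "connector.class": "io.confluent.connect.s3.S3SinkConnector",
              "tasks.max"      : "1",
              "topics"         : "mysource.topic",
              "s3.region"      : "us-east-1",
              "s3.bucket.name" : "topicbucket001",
              "flush.size"     : "1",
              "storage.class"  : "io.confluent.connect.s3.storage.S3Storage",
              "format.class"   : "io.confluent.connect.s3.format.json.JsonFormat"
            }'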

Hey Rick,

I don’t see anywhere that it creates partitions for the topics. Is there any config for that?

Thanks
Kanika

Hey Rick,
Do you know what config is required in the S3 connector if I need to send data from multiple topics to S3? I have added:
topics.regex : (SAP|EQ).(.*)
Although this creates multiple topics in Kafka, it doesn’t create topics in the S3 bucket.
Do I need to add more config to the connector in order to land the data in S3?

Hello @Kanikamiglani31, I’m not sure what you mean when you say “multiple topics in S3”. As you’ve discovered, the topics.regex configuration controls which Kafka topics the S3 sink connector reads from. Are you trying to read from multiple Kafka topics to sink to S3, or are you trying to route one Kafka topic to multiple S3 destinations?

I’m not certain, but I think the S3 sink connector is only designed to route events to a single S3 bucket via the s3.bucket.name configuration.
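One thing worth double-checking: in the connector JSON, topics.regex is a quoted string, and a sink connector takes either topics or topics.regex, not both. Also, if the dot after SAP/EQ is meant to match a literal “.”, it needs escaping (a doubled backslash, because of JSON string escaping). A sketch:

    {
        "topics.regex": "(SAP|EQ)\\..*"
    }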

Apologies, I am trying to read from multiple Kafka topics into one S3 bucket.
Right now, after adding:
topics.regex : (SAP|EQ).(.*)
I see multiple topics getting created in the Kafka cluster, but they are not being copied to the S3 bucket. Am I missing any other config apart from topics.regex in the S3 connector?

Have you debugged by confirming that the topics contain data? When you say “the topics are created”, that leads me to believe they are created by the consumer inside the S3 sink connector, in which case maybe the data does not exist. However, I’m only speculating. I would recommend you debug with the standard console tools to verify that the topics have the data you would expect, and then investigate the sink connector’s consumer group ID to determine if it is reading properly. Good luck!
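For the first check, something along these lines (the topic name here is just a hypothetical example, and the bootstrap server address is a placeholder):

    # confirm the topic actually contains records
    kafka-console-consumer --bootstrap-server localhost:9092 \
        --topic SAP.orders --from-beginning --max-messages 5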

Hey Rick,
Thank you for the details here.
Yes, I did verify; there is data in the topics that are getting created on the Kafka cluster. Where you said to “investigate the sink connector’s consumer group ID to determine if it is reading properly”, do you know how that can be achieved?
Thanks

@Kanikamiglani31 You could try using the Kafka Consumer Group tool to investigate the consumer groups and their current position in the topic. Here is some documentation on that tool: Kafka Consumer | Confluent Documentation
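For example (assuming the default naming, where a sink connector’s consumer group ID is “connect-” plus the connector name):

    # show current offset, log-end offset, and lag per partition for the connector's group
    kafka-consumer-groups --bootstrap-server localhost:9092 \
        --describe --group connect-my-s3-sink3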

If you have access to a UI, like Confluent Cloud or Control Center (there are other options as well), you may have access to a screen that shows the status of a consumer group.
