Hello everyone,
I'm having a performance issue with the JDBC sink connector in upsert mode, using pk.mode record_value with a composite PK over 3 fields.
The lag grows quickly, even though the Kafka Connect workers run as a 3-node cluster, as does the broker. I use 30 partitions and 30 sink tasks on that topic. The DB itself is on SQL Server (Azure), which should have enough power to handle it.
But we are talking about 220 million Kafka records that the source connector placed into the topic from a MySQL server.
The same setup is fast in insert mode with pk.mode set to none, but that is not really an option for us.
Do you have any idea what could improve the speed?
I have also tested this with another topic of around 600k records, which likewise struggles with performance.
Please find my connector configuration below. Transformations etc. are already done in the source connector tasks, so the sink is actually pretty straightforward.
{
  "name": "attribute_values-01",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://<schemaregistry>:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://<schemaregistry>:8081",
    "connection.url": "jdbc:sqlserver://<sql_server>:1433;databaseName=<DB>",
    "connection.user": "<user>",
    "connection.password": "<password>",
    "topics.regex": "input-attribute_values",
    "auto.create": "true",
    "auto.evolve": "true",
    "table.name.format": "import_input-attribute_values",
    "pk.mode": "record_value",
    "pk.fields": "tenant_id, attribute_id, attribute_code",
    "insert.mode": "upsert",
    "tasks.max": 30
  }
}
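One direction I was planning to try next is tuning the batch sizes, though I haven't verified the right values yet. A sketch of the overrides I had in mind (the numbers here are guesses, not tested, and the consumer.override.* settings only take effect if the worker allows client overrides via connector.client.config.override.policy):

```json
{
  "batch.size": 3000,
  "consumer.override.max.poll.records": 3000,
  "consumer.override.fetch.min.bytes": 1048576
}
```

The idea would be to make sure each sink task receives and writes records in larger batches instead of many small round trips to SQL Server, but I'd appreciate confirmation on whether that actually helps in upsert mode.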