Conversation from Confluent Community Slack. Copied here to make it available to all.
David Pratt
Hey all, I’ve got what is probably a “beginner” question. I have a sink connector (io.confluent.connect.jdbc.JdbcSinkConnector) that writes to a SQL Server database using insert.mode = upsert. The topic being written has 100k messages, and one partition.
The issue is that it takes the connector around 30 minutes to write all 100k messages (when the connector is first started). What’s going on here? How can I speed this up so it takes no more than a minute?
Some observations and basic information about my configuration:
- The topic has 100k messages
- The topic has 1 partition
- I have observed the same behavior when I use similarly sized topics with more partitions - the timing is effectively identical despite the increase in partitions and tasks.max
- I have made sure there is no other activity on the database when the sink connector is writing to it
- The database has two cores. The CPU utilization is ~40% when the upsert connector is running
- The database is not tight on space. It’s a fresh Azure SQL DB
- I have tried upsert when the table is auto created by the connector (with a clustered primary key)
- I have tried upsert when the table is created by me (with no primary key) with effectively the same results
- I have tried upsert when the table is created by me (with a nonclustered primary key) with effectively the same results
- I have tried insert when the table is auto created by the connector (with a clustered primary key). All 100k messages are written within 10 seconds (!)
I suspect the reason why the connector is fast in insert mode and slow in upsert mode is because upsert mode is configured to use a MERGE statement with a HOLDLOCK hint (see here: https://github.com/confluentinc/kafka-connect-jdbc/blob/18ff2a3b15cc77bd08a90be3ceddfc1da833a839/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java#L327).
From my understanding, HOLDLOCK places a lock on the table for the entire duration of the transaction, not just the time required to lock a row or page. I’m also somewhat sure that each message written to the database has its own MERGE statement - that is to say there is no batching of multiple records together.
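For reference, the per-record statement that dialect builds is roughly of the following shape (table and value column names here are illustrative, assuming a single id key and one value field):

-- Rough sketch of the per-record upsert the SQL Server dialect generates;
-- the ? placeholders are the JDBC bind parameters for one record.
merge into [my_table] with (HOLDLOCK) AS target
using (select ? AS [id], ? AS [name]) AS incoming
on (target.[id] = incoming.[id])
when matched then
  update set [name] = incoming.[name]
when not matched then
  insert ([name], [id]) values (incoming.[name], incoming.[id]);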
What are my options here? I really need to keep the connector in upsert mode for the long term. Should I just do all large batch loads with an insert connector (immediately after compacting the topic)? Should I use staging tables and a stored proc for merges? Or are there some settings in my connector config that I can adjust?
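(For reference, the staging-table idea I have in mind is roughly the following - table and column names are illustrative, and it assumes the staged rows are already de-duplicated per key:)

-- Sketch: an insert-mode connector writes to a keyless staging table,
-- and a stored proc or scheduled job folds the staged rows into the
-- target with a single MERGE, then clears the staging table.
MERGE INTO [dbo].[my_table] WITH (HOLDLOCK) AS target
USING (SELECT [id], [name] FROM [dbo].[my_table_staging]) AS incoming
    ON target.[id] = incoming.[id]
WHEN MATCHED THEN
    UPDATE SET [name] = incoming.[name]
WHEN NOT MATCHED THEN
    INSERT ([id], [name]) VALUES (incoming.[id], incoming.[name]);

TRUNCATE TABLE [dbo].[my_table_staging];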
Here’s my connector config (with upserts and a table that has a clustered primary key):
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "${JDBC_CONNECTION_STRING}",
"connection.user": "${JDBC_CONNECTION_USER}",
"connection.password": "${JDBC_CONNECTION_PASSWORD}",
"topics": "${TOPIC_NAME}",
"tasks.max": 1,
"insert.mode": "upsert",
"delete.enabled": true,
"pk.mode": "record_key",
"pk.fields": "id",
"table.name.format": "${TABLE_NAME}",
"auto.create": true,
"auto.evolve": true,
"quote.sql.identifiers": "always",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.basic.auth.credentials.source": "${BASIC_AUTH_CREDENTIALS_SOURCE}",
"key.converter.schema.registry.basic.auth.user.info": "${SCHEMA_REGISTRY_API_KEY}:${SCHEMA_REGISTRY_API_SECRET}",
"key.converter.schema.registry.url": "${SCHEMA_REGISTRY_URL}",
"key.converter.schemas.enable": "false",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.basic.auth.credentials.source": "${BASIC_AUTH_CREDENTIALS_SOURCE}",
"value.converter.schema.registry.basic.auth.user.info": "${SCHEMA_REGISTRY_API_KEY}:${SCHEMA_REGISTRY_API_SECRET}",
"value.converter.schema.registry.url": "${SCHEMA_REGISTRY_URL}",
"value.converter.schemas.enable": "false"
}
Robin Moffatt
Hi David, thanks for all of the info and problem statement, nicely detailed
- I have observed the same behavior when I use similarly sized topics with more partitions - the timing is effectively identical despite increase in partitions and tasks.max
When you increase the partitions and tasks.max, how many tasks do you see running for the worker? I would expect this to parallelise things.
(Whether that still bottlenecks on the DB side of things is another matter, but I think understanding where the bottleneck lies is the first step.)
David Pratt
Hey Robin! Thanks for the response.
When I was using more than a single partition, I was using 3 partitions and tasks.max was set to 3. Sure enough, there were 3 tasks running under the connector.
I’ve set the kafka connect logs to TRACE and I can see that 500 messages are being read from the topic and ‘flushed’ from the task (presumably being written to the DB). Reading the 500 messages from the kafka topic is near instant. Flushing the records from the task is what takes some time. (And that time increases as the number of records in the DB table increases!)
By the way, have you seen my issue before? I have a really strong suspicion the root cause is the HOLDLOCK used as a hint to the MERGE statement - and thus is specific to SQL Server. (Once I get the chance today I’m going to modify the kafka connect jdbc plugin to not use HOLDLOCK.)
Thanks again
David Pratt
Update: I rebuilt the jdbc plugin without the HOLDLOCK hint and got the same results. I also built it with a different hint (READCOMMITTED) and got the same results. I’m definitely stumped on this one.
Robin Moffatt
I’ve not seen this issue before, but to be fair I’ve not tried writing 100k records to MSSQL
Robin Moffatt
When I was using more than a single partition, I was using 3 partitions and tasks.max was set to 3. Sure enough, there were 3 tasks running under the connector.
So with an increase in tasks, what behaviour do you see on the database?
Is the lock it’s taking an exclusive one? (it’s a while since I’ve worked with MS SQL, so I’m less familiar with the different locks/hint semantics)
I think the thing here is to understand if the bottleneck is in getting the data pushed from Kafka out through Kafka Connect, or in getting it written to the database. This will also dictate quite where your answer is going to come from - a Kafka person, or a DBA
You can increase throughput out of Kafka by using partitions and parallel tasks, but that’ll be no use if it’s the database bottlenecking.
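(A rough way to see which side is waiting: while the connector task is flushing, check what the active requests on SQL Server are doing - a sketch against the standard DMVs, nothing specific to your setup:)

-- Long lock waits (LCK_M_*) or a populated blocking_session_id would point
-- at the database; mostly idle sessions would point back at Kafka Connect.
SELECT r.session_id,
       r.status,
       r.command,
       r.wait_type,
       r.wait_time,
       r.blocking_session_id,
       r.total_elapsed_time
FROM sys.dm_exec_requests AS r
WHERE r.session_id <> @@SPID;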
David Pratt
Hey Robin - found the issue.
We were letting Kafka Connect auto create our tables for us. Our topics have a primitive string key. Kafka Connect creates the db table with a key of type varchar(900). Turns out, indexing a column that’s varchar(900) (when most values have not more than a dozen characters) is a bit troublesome for the db.
Switching up the types, using GUIDs (and the uniqueidentifier type in MSSQL) solved the issue. We’re upserting 1.7m messages in less than 10 minutes on a pretty low-spec database with that configuration.
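To illustrate the difference, the auto-created table and the reworked one looked roughly like the two definitions below (table and value column names are illustrative, not our real schema):

-- Roughly what auto.create produced from the primitive string key:
-- the full-width varchar(900) ends up as the clustered primary key.
CREATE TABLE [dbo].[my_table] (
    [id]   varchar(900) NOT NULL,
    [name] varchar(max) NULL,
    PRIMARY KEY CLUSTERED ([id])
);

-- The reworked version of the same table, keyed on a GUID instead:
CREATE TABLE [dbo].[my_table] (
    [id]   uniqueidentifier NOT NULL,
    [name] varchar(max) NULL,
    PRIMARY KEY CLUSTERED ([id])
);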
If I may offer a suggestion to the Kafka Connect documentation… I think it would be useful to see a small warning to new users that performance on the db side could be adversely affected if Kafka Connect auto-creates a table (or at least, automatically chooses the datatype of columns). If I saw a warning like that, I wouldn’t have spent as much time on this as I did.
That is a bit different than the warning seen here. Though, frankly, I should have got the hint
Anyway, thank you for your help - I really appreciate that Confluent representatives are as active as they are.
Robin Moffatt
hi David - that’s great you fixed it, and I appreciate you taking the time to report back on the solution!
Robin Moffatt
I’ll pass the docs suggestion onto our team here
Robin Moffatt
David, we’ve updated the doc as suggested: https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/sink_config_options.html#ddl-support