Debezium connector for MS SQL - can we change/set topic name?

We are using the Debezium connector and everything is working great…BUT…is there a way to change or set the topic name of the topic that it creates?
For example, we create a connector and it creates the topic “Server.Schema.Table” but what if we wanted that topic to be named “server.schema.table”?
Thanks

Single Message Transforms are your friend here.

See com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase example shown here.
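For reference, a minimal sketch of how that transform gets wired into a connector config. The `from`/`to` values here are placeholders; the right pair depends on how your source names are cased:

```json
{
  "transforms": "changeTopicCase",
  "transforms.changeTopicCase.type": "com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase",
  "transforms.changeTopicCase.from": "UPPER_CAMEL",
  "transforms.changeTopicCase.to": "LOWER_UNDERSCORE"
}
```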

Hmm I really just need everything in lowercase but not sure that is an option here. I will play with it…thanks for the suggestion.

I don’t understand. Do you mean that using a Single Message Transform isn’t an option?

Well it mentions camel, underscore, and hyphens…I just need everything to be lowercase.

Tried multiple options and although it worked, it did not produce what I needed. I don’t need to change hyphens or underscores…I just need to lower or upper case it.

Can you share the config you used and the input/output topics names that you got?

So first I tried RemoveString, then Reroute, then ChangeTopicCase, some of them did change the name of the resulting topic, but the connector fails with avro format issues and/or schema registry issues.

curl -i -X PUT -H "Content-Type:application/json" \
  http://localhost:8083/connectors/peterSourceV3/config \
  -d '{
  "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
  "database.hostname": "server",
  "database.connection.url": "jdbc:sqlserver://server;databaseName=db;integratedSecurity=true;authenticationScheme=JavaKerberos",
  "database.user": "service",
  "database.password": "",
  "database.integratedSecurity": "true",
  "database.authenticationScheme": "JavaKerberos",
  "database.dbname": "db",
  "database.server.name": "server",
  "table.include.list": "dbo.AdvisorUse",
  "database.history.kafka.bootstrap.servers": "localhost:9092",
  "database.history.kafka.topic": "peterSource_history",
  "transforms": "changeTopicCase",
  "transforms.changeTopicCase.type": "com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase",
  "transforms.changeTopicCase.from": "LOWER_CAMEL",
  "transforms.changeTopicCase.to": "LOWER_UNDERSCORE"
}'

What’s the error that you get?

{"name":"testSourceV3","connector":{"state":"RUNNING","worker_id":"localhost:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:220)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:330)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:359)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:272)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic testsourcev3_worked :
	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
	at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:330)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)
	... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"server.dbo.AdvisorUse\",\"fields\":[{\"name\":\"Id\",\"type\":\"long\"}],\"connect.name\":\"server.dbo.AdvisorUse.Key\"}
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
	at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
	... 15 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject \"testsourcev3_worked-key\"; error code: 409
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:544)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:532)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:490)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:257)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:366)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:337)
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:115)
	... 17 more
"}],"type":"source"}

That’s your problem there. So either target a new topic, or use a schema that is compatible with the existing topic.

I don’t understand though…this is the initial creation of the topic, there should be no earlier schema.

Maybe from a previous test run of the connector? You can use the Schema Registry REST API to delete the previous schemas if you want.
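For example, something along these lines (assuming the Schema Registry is on localhost:8081; the subject names are the ones from your error, adjust as needed):

```
# List all subjects to spot stale ones from earlier runs
curl -s http://localhost:8081/subjects

# Soft-delete the key and value subjects for the old topic name
curl -s -X DELETE http://localhost:8081/subjects/testsourcev3_worked-key
curl -s -X DELETE http://localhost:8081/subjects/testsourcev3_worked-value

# Hard-delete so the registry forgets the old versions entirely
curl -s -X DELETE "http://localhost:8081/subjects/testsourcev3_worked-key?permanent=true"
curl -s -X DELETE "http://localhost:8081/subjects/testsourcev3_worked-value?permanent=true"
```

Note the two-step delete: a plain DELETE only soft-deletes the subject, and a soft-deleted subject can still cause compatibility checks to trip.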


I thought of that too…deleted every instance of -key and -value I could find…same thing.

Debezium works great; I have 75+ tables being synced. But no matter what I do, I cannot set or change the name of the default topic that Debezium creates. What I want is simple: standard topic naming, ALL LOWERCASE. SQL Server names are case-insensitive so case doesn’t matter there, but Java/Kafka topic names are case-sensitive, so to make things bulletproof I need the topic name to be all lowercase regardless of how the SQL table is cased. I’ve tried numerous transformations, and some change the topic name, but then the connector fails with an Avro issue or a Schema Registry incompatibility.
Does anyone have a working example?

Tried numerous transformations and some change the topic name but connector fails with avro issue or schema registry incompatibility

I cannot recall the ordering of the registry client lookup, but assuming it happens after the topic name is changed, the upper/mixed-case subject will need to be copied manually via the Schema Registry API to a fully lower-cased subject.

Sorry but I am not quite understanding what you mean.
By default, it looks like the Debezium connector creates the topic name with the exact case retrieved from the SQL server using the pieces like Server.Database.Table. The problem is that case doesn’t always match in every environment…so I was hoping to simply lowercase the topic name to make it generic across connectors and environments. I cannot seem to get this working however.

  1. There is an external transform library here for changing case
  2. There is an order in which transforms get applied, and my point was that if the topic is lowercased after the AvroConverter runs, then the original topic name is what gets used for the /subjects/{name} API call to look up and/or register the schema. You’ll want to lowercase before that API call happens.

If you cannot control this, you need to GET /subjects/subjectName-value/versions/X and POST /subjects/subjectname-value/versions for each version of the schema manually.
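Sketched with curl (registry host and subject name are illustrative; repeat for each version and for the -value subject):

```
# Read one version of the old mixed-case subject
curl -s http://localhost:8081/subjects/SubjectName-key/versions/1

# Re-register the "schema" string from that response under the lower-cased subject
curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "<schema string from the GET above>"}' \
  http://localhost:8081/subjects/subjectname-key/versions
```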

  1. ChangeCase seems like a record transformation but ChangeTopicCase looks promising except there is no simple lower option…I don’t need to add hyphens or underscores, I just need to lowercase. I will give this a try though.
  2. Do you think the order of the entries in the config file can resolve this?

Interesting…so I just tried this and well, things are working, which is awesome, but the resulting name of my topic is not what I wanted…
Actual: TXI1SQLDEV01.sddev.com.dbo.AdvisorUse
Desired: txi1sqldev01.sddev.com.dbo.advisoruse
Result: t_x_i1_s_q_l_d_e_v01.sddev.com.dbo._advisor_use

"transforms.changeTopicCase.from": "UPPER_CAMEL",
"transforms.changeTopicCase.to": "LOWER_UNDERSCORE",
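If it helps explain that output: ChangeTopicCase delegates to Guava’s CaseFormat, and for UPPER_CAMEL input Guava starts a new word at every uppercase letter, so a run of capitals like TXI1SQLDEV01 becomes one “word” per letter. A rough Python re-implementation of that splitting (an approximation for illustration, not the actual library code):

```python
def upper_camel_to_lower_underscore(s: str) -> str:
    """Approximate Guava's CaseFormat.UPPER_CAMEL.to(LOWER_UNDERSCORE, s):
    start a new word at every uppercase letter after the first character,
    lowercase everything, and join the words with underscores."""
    if not s:
        return s
    words = []
    current = s[0]
    for ch in s[1:]:
        if ch.isupper():          # every capital starts a new word
            words.append(current)
            current = ch
        else:
            current += ch
    words.append(current)
    return "_".join(w.lower() for w in words)

print(upper_camel_to_lower_underscore("TXI1SQLDEV01.sddev.com.dbo.AdvisorUse"))
# -> t_x_i1_s_q_l_d_e_v01.sddev.com.dbo._advisor_use
```

Since Guava treats the UPPER_UNDERSCORE to LOWER_UNDERSCORE conversion as a plain lowercase with no word re-splitting, setting "from":"UPPER_UNDERSCORE" and "to":"LOWER_UNDERSCORE" may give you the straight lowercasing you’re after. Worth trying, though I haven’t verified it against this SMT.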