How to restrict the size from varchar(max) to a fixed size?
Issue
I am using the JDBC sink connector to move data from a Kafka topic into an MSSQL database. The data in the topic is in Avro format and the fields are of string type. The connector throws an error when it tries to set the string field ID as the primary key: the connector interprets the field's datatype as varchar(max), and a varchar(max) column cannot be used as a key column, so the JdbcSinkConnector fails.
com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:xxx) com.microsoft.sqlserver.jdbc.SQLServerException: Column ‘ID’ in table ‘XXXX’ is of a type that is invalid for use as a key column in an index.
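The same error can be reproduced directly in SQL Server, independent of the connector, which suggests the limitation comes from the column type rather than from Kafka Connect itself. A minimal sketch, assuming hypothetical table names (demo_max and demo_fixed are for illustration only and do not appear in my setup):

```sql
-- Hypothetical table names, for illustration only.

-- Fails: varchar(max) is not allowed as an index key column,
-- so it cannot back a PRIMARY KEY constraint.
CREATE TABLE demo_max (
    ID varchar(max) NOT NULL PRIMARY KEY
);

-- Succeeds: a bounded length is accepted as a key column.
CREATE TABLE demo_fixed (
    ID varchar(100) NOT NULL PRIMARY KEY
);
```

So what I need is for the connector's auto-created DDL to look like the second statement rather than the first.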
Below is a sample record value from the Kafka topic:
{ "XXXX": { "string": "XXXXX" }, "ID": { "string": "rlUkiMbzl1" }, "XXXX": { "string": "XXXXX" }, "XXXX": { "string": "XXXXX" } }
This is my sink connector configuration:
{
  "name": "name-of-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "table.name.format": "table_name",
    "connection.password": "password",
    "topics": "kafka-topic",
    "tasks.max": "1",
    "value.converter.schema.registry.url": "http://xx.xx.xx.xx:8081",
    "delete.enabled": "false",
    "auto.evolve": "true",
    "connection.user": "user",
    "transforms.unwrap.drop.tombstones": "false",
    "name": "name-of-sink-connector",
    "auto.create": "true",
    "connection.url": "jdbc:sqlserver://xx.xx.xx.xx:1433;databaseName=dbname;encrypt=true;trustServerCertificate=true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://xx.xx.xx.xx:8081",
    "pk.fields": "ID",
    "quote.sql.identifiers": "never"
  }
}
Things tried:
In the connector configuration, I added a Cast transform:
"transforms": "Cast", "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value", "transforms.Cast.spec": "ID:string"
In the Avro schema (.avsc) file, I added these attributes to the field:
"logicalType": "varchar", "maxLength": 100
Requirement: Data should flow from the Kafka topic to the MSSQL database. If the table does not exist, the connector should create it from the schema of the topic. The data in the topic is in Avro format with fields of string type, the table should have the string field "ID" as its primary key, and the insert mode should be upsert.