Hi.
I have to transfer data from kafka topics to SQL Server tables. One topic > one table. As I understand i should use kafka connect. I don’t know what does topic’s messages contains, I want to transfer them without any analizes to table with simple structure like
id - int
data - nvarchar(max) - put message here.
But now i see, that JDBC Sink Connector that can put message to SQL Server works with structured message, it want to know columns, primiry key and else.
I am afraid that my question is too simple, but googling does not show me any answer. Now I have two questions:
- Can I put message from Kafka topic to SQL Server table in raw format into nvarchar field without custom development of connector?
- Can somebody give me a link or sample of something like this, or show me the correct way?
Thanks to all who read till here :-).
I find an answer by my self, I should wrap around data by org.apache.kafka.connect.transforms.HoistField transform. My working sample:
{
"name": "migration_crm_signers_to_hh_connector",
"config": {
"topics": "migration_crm_signers_to_hh",
"connection.url": "jdbc:sqlserver://{{SQL_HOST}}:{{SQL_POST}};databaseName={{SQL_DATABASE}};",
"connection.user": "{{SQL_LOGIN}}",
"connection.password": "{{SQL_PASSWORD}}",
"insert.mode": "insert",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"transforms": "HoistField,InsertOffsetField,PartitionField, TimestampField",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.HoistField.field": "data",
"transforms.InsertOffsetField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertOffsetField.offset.field": "offset",
"transforms.PartitionField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.PartitionField.partition.field": "partition",
"transforms.TimestampField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.TimestampField.timestamp.field": "timestamp",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.behavior": "ignore",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": "false",
"config.action.reload": "restart",
"errors.retry.timeout": "0",
"errors.tolerance": "none",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"table.name.format": "topic_${topic}",
"delete.enabled": "false",
"pk.mode": "record_value",
"pk.fields": "offset",
"auto.create": "true",
"auto.evolve": "true",
"max.retries": "1",
"retry.backoff.ms": "3000",
"schemas.enable": "false"
}
}