Messages from Kafka to SQL Server in raw format

Hi.
I need to transfer data from Kafka topics to SQL Server tables, one topic to one table. As I understand it, I should use Kafka Connect. I don't know what the topics' messages contain, and I want to transfer them without any parsing into a table with a simple structure like:
id - int
data - nvarchar(max) - the raw message goes here

But now I see that the JDBC Sink Connector, which can write to SQL Server, works with structured messages: it wants to know the columns, the primary key, and so on.

I am afraid my question is too simple, but googling does not turn up an answer. I have two questions:

  1. Can I put messages from a Kafka topic into a SQL Server table in raw format, into an nvarchar field, without developing a custom connector?
  2. Can somebody give me a link or a sample of something like this, or show me the correct way?

Thanks to everyone who read this far :-).

I found the answer myself: I should wrap the raw value with the org.apache.kafka.connect.transforms.HoistField transform. My working sample:

{
    "name": "migration_crm_signers_to_hh_connector",
    "config": {
        "topics": "migration_crm_signers_to_hh",
        "connection.url": "jdbc:sqlserver://{{SQL_HOST}}:{{SQL_PORT}};databaseName={{SQL_DATABASE}};",
        "connection.user": "{{SQL_LOGIN}}",
        "connection.password": "{{SQL_PASSWORD}}",
        "insert.mode": "insert",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "transforms": "HoistField,InsertOffsetField,PartitionField,TimestampField",
        "transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
        "transforms.HoistField.field": "data",
        "transforms.InsertOffsetField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertOffsetField.offset.field": "offset",
        "transforms.PartitionField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.PartitionField.partition.field": "partition",
        "transforms.TimestampField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.TimestampField.timestamp.field": "timestamp",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.behavior": "ignore",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter.schemas.enable": "false",
        "config.action.reload": "restart",
        "errors.retry.timeout": "0",
        "errors.tolerance": "none",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "table.name.format": "topic_${topic}",
        "delete.enabled": "false",
        "pk.mode": "record_value",
        "pk.fields": "offset",
        "auto.create": "true",
        "auto.evolve": "true",
        "max.retries": "1",
        "retry.backoff.ms": "3000",
        "schemas.enable": "false"
    }
}
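For anyone wondering what the transform chain does conceptually: `HoistField$Value` wraps the whole (string) record value into a single-field struct, and each `InsertField$Value` then adds one Kafka metadata field. Here is a rough Python sketch of the per-record effect (the helper functions are mine, purely illustrative, not any Kafka API):

```python
# Rough sketch of what the SMT chain in the config above does to each record.
# These helpers are illustrative only; they are not part of any Kafka API.

def hoist_field(value, field):
    # Like HoistField$Value: wrap the entire record value in a single-field struct.
    return {field: value}

def insert_fields(value, offset, partition, timestamp):
    # Like the three InsertField$Value transforms: add Kafka metadata as columns.
    enriched = dict(value)
    enriched["offset"] = offset
    enriched["partition"] = partition
    enriched["timestamp"] = timestamp
    return enriched

raw = '{"some": "opaque payload"}'  # StringConverter reads the message as plain text
row = insert_fields(hoist_field(raw, "data"),
                    offset=42, partition=0, timestamp=1700000000000)
print(row["data"])   # the untouched raw message, destined for the nvarchar column
print(sorted(row))   # ['data', 'offset', 'partition', 'timestamp']
```

With `auto.create` enabled, the sink should then create a table named `topic_migration_crm_signers_to_hh` (per `table.name.format`) with those four columns, using `offset` as the primary key (per `pk.mode`/`pk.fields`).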
