Hi,
I created the following JDBC source connector for a MySQL database:
{
"name": "mysql-source-patients",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/admindb?characterEncoding=UTF-8&serverTimezone=UTC",
"connection.user": "root",
"connection.password": "rootpw",
"table.whitelist": "patients",
"mode": "timestamp+incrementing",
"timestamp.column.name": "updated_at",
"incrementing.column.name": "id",
"topic.prefix": "mysql.admindb.",
"tasks.max": "1",
"_comment": "--- Add key to the message based on the entity id field ---",
"transform": "createKey,extractId",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "id",
"transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractId.field": "id"
}
}
The connector seems to work fine, since I can see the DB records in the 'mysql.admindb.patients' topic. The problem is that the message key is always null.
According to the documentation, the properties needed to set the message key are the "transforms.*" ones.
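To make clear what I expect the two transforms to do to each record, here is a minimal Python sketch. It only simulates the logic with plain dicts standing in for Connect Structs; the function names are hypothetical and none of this is actual Kafka Connect code.

```python
# Simulates the intended effect of the two SMTs on a single record.
# Plain dicts stand in for Connect Structs; function names are hypothetical.

def value_to_key(record, fields):
    """Like ValueToKey: build a struct key from the listed value fields."""
    key = {name: record["value"][name] for name in fields}
    return {**record, "key": key}


def extract_field_key(record, field):
    """Like ExtractField$Key: replace the struct key with one of its fields."""
    return {**record, "key": record["key"][field]}


# A record as the JDBC source produces it: value populated, key empty.
record = {
    "key": None,
    "value": {"id": 1, "first_name": "Pietro", "last_name": "Ghezzi"},
}

after_create_key = value_to_key(record, ["id"])               # createKey step
after_extract_id = extract_field_key(after_create_key, "id")  # extractId step

print(after_create_key["key"])  # {'id': 1}
print(after_extract_id["key"])  # 1
```

So for the record with id 1, I would expect the message key to end up as 1 rather than null.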
Avro schema:
{
"type": "record",
"name": "Patient",
"namespace": "ch.demo.gpietro.schema",
"fields": [
{
"name": "id",
"type": "long"
},
{
"name": "created_at",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}
},
{
"name": "updated_at",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}
},
{
"name": "first_name",
"type": "string"
},
{
"name": "last_name",
"type": "string"
},
{
"name": "birth_date",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}
}
]
}
Topic output:
ksql> print 'mysql.admindb.patients' from BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 2021/04/13 15:16:57.367 Z, key: <null>, value: {"id": 1, "birth_date": null, "created_at": 1618327013631, "first_name": "Pietro", "last_name": "Ghezzi", "updated_at": 1618327013631}
What am I doing wrong?