I’m new with Kafk and I’m struggling dealing with timestamp field within my JDBC sink connector.
Making long things short, I’m using Docker to test a source PostgresSQL connector to copy the data of one table and then using a JDBC sink Connector to insert this data in another Postgres database in a table that already exists and have different column names. I think I did everything right with the rest of the configuration, but, for some reason, the TimestampConverter is not working, so it’s passing a bigint on the query.
My sink connector:
confluentinc/kafka-connect-jdbc:10.7.4
name = collaborator-sink-postgres-connector
connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max = 1
topics = collab-service.public.parametros
connection.url = host
connection.user = user
connection.password = password
database = collaborator_suite
auto.create = false
table.options = CREATE_IF_NOT_EXISTS
transforms = timestampConverter,extractValue,replaceField,dropKey
transforms.timestampConverter.type = org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.dropKey.type = io.confluent.connect.transforms.Drop$Key
transforms.extractValue.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.replaceField.type = org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.dropKey.schema.behavior=retain
transforms.timestampConverter.field = para_dt_cadastro
transforms.timestampConverter.format=yyyy-MM-dd HH:mm:ss.SSS
transforms.timestampConverter.unix.precision= microseconds
transforms.timestampConverter.target.type = Timestamp
transforms.replaceField.renames = para_cd_id:pasi_cd_id, para_tx_dominio:pasi_tx_dominio, para_tx_descricao:pasi_tx_descricao, para_tx_valor:pasi_tx_valor, para_tx_tipo:pasi_tx_tipo, para_dt_ult_alt:pasi_dt_ult_alt, para_dt_cadastro:pasi_dt_cadastro, usua_cd_id_cadastro:usua_cd_id_cadastro, usua_cd_id_ult_alt:usua_cd_id_ult_alt, para_tx_sistema:pasi_tx_sistema
db.timezone = UTC
pk.fields = pasi_cd_id
table.name.format = public.parametro_sistema
transforms.extractValue.field=after
insert.field = pasi_cd_id, pasi_tx_dominio, pasi_tx_descricao, pasi_tx_valor, pasi_tx_tipo, pasi_dt_ult_alt, pasi_dt_cadastro, usua_cd_id_cadastro, usua_cd_id_ult_alt, pasi_tx_sistema
The message error on my Kafka Connect Docker console:
I’ve highlighted the timestamps fields on my insert.
Caused by: java.sql.SQLException: Exception chain: 2024-01-04 14:11:50 java.sql.BatchUpdateException: Batch entry 0 INSERT INTO “public”.“parametro_sistema” (“pasi_cd_id”,“pasi_tx_dominio”,“pasi_tx_descricao”,“pasi_tx_valor”,“pasi_tx_tipo”,“pasi_dt_cadastro”,“pasi_dt_ult_alt”,“usua_cd_id_cadastro”,“usua_cd_id_ult_alt”,“pasi_tx_sistema”) VALUES (1,‘SISTEMA_CLOUD_BUCKETNAME’,‘Variável para referenciar o nome do bucket na nuvem do Google’,‘r2d2-neki’,‘R2D2’,1669047144317563,1669047144317563,NULL,NULL,‘SISTEMA’) was aborted: ERROR: column “pasi_dt_cadastro” is of type timestamp without time zone but expression is of type bigint 2024-01-04 14:11:50 Hint: You will need to rewrite or cast the expression.
The part of the log when the sink connector try to convert the timestamp field:
transforms.timestampConverter.field = para_dt_cadastro
2024-01-04 14:11:36 transforms.timestampConverter.format = yyyy-MM-dd HH:mm:ss.SSS
2024-01-04 14:11:36 transforms.timestampConverter.negate = false
2024-01-04 14:11:36 transforms.timestampConverter.predicate =
2024-01-04 14:11:36 transforms.timestampConverter.target.type = Timestamp
2024-01-04 14:11:36 transforms.timestampConverter.type = class org.apache.kafka.connect.transforms.TimestampConverter$Value
2024-01-04 14:11:36 value.converter = null
My timestamp field inside my message:
"after": {
"para_cd_id": 2,
"para_tx_dominio": "SISTEMA_CLOUD_PROJECTID",
"para_tx_descricao": "Variável para referenciar o id do projeto na nuvem do Google",
"para_tx_valor": "collaborator-364516",
"para_tx_tipo": "SISTEMA",
"para_dt_cadastro": 1669047144317563,
"para_dt_ult_alt": 1669047144317563,
"usua_cd_id_cadastro": null,
"usua_cd_id_ult_alt": null,
"para_tx_sistema": "R2D2"
},
I accept any kind of help with the timestamp problem and feel free to give me heads up on the rest of the configuration.