Problem with TimestampConverter from Confluent

I’m new to Kafka and I’m struggling with a timestamp field in my JDBC sink connector.

Long story short: I’m using Docker to test a PostgreSQL source connector that copies the data from one table, and a JDBC sink connector that inserts that data into another Postgres database, into a table that already exists and has different column names. I think I did everything right with the rest of the configuration, but for some reason the TimestampConverter is not working, so a bigint is being passed in the query.

My sink connector:

confluentinc/kafka-connect-jdbc:10.7.4

name = collaborator-sink-postgres-connector
connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max = 1
topics = collab-service.public.parametros
connection.url = host
connection.user = user
connection.password = password
database = collaborator_suite
auto.create = false
table.options = CREATE_IF_NOT_EXISTS
transforms = timestampConverter,extractValue,replaceField,dropKey
transforms.timestampConverter.type = org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.dropKey.type = io.confluent.connect.transforms.Drop$Key
transforms.extractValue.type = org.apache.kafka.connect.transforms.ExtractField$Value
transforms.replaceField.type = org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.dropKey.schema.behavior = retain

transforms.timestampConverter.field = para_dt_cadastro
transforms.timestampConverter.format = yyyy-MM-dd HH:mm:ss.SSS
transforms.timestampConverter.unix.precision = microseconds
transforms.timestampConverter.target.type = Timestamp

transforms.replaceField.renames = para_cd_id:pasi_cd_id, para_tx_dominio:pasi_tx_dominio, para_tx_descricao:pasi_tx_descricao, para_tx_valor:pasi_tx_valor, para_tx_tipo:pasi_tx_tipo, para_dt_ult_alt:pasi_dt_ult_alt, para_dt_cadastro:pasi_dt_cadastro, usua_cd_id_cadastro:usua_cd_id_cadastro, usua_cd_id_ult_alt:usua_cd_id_ult_alt, para_tx_sistema:pasi_tx_sistema
db.timezone = UTC
pk.fields = pasi_cd_id
table.name.format = public.parametro_sistema
transforms.extractValue.field = after
insert.field = pasi_cd_id, pasi_tx_dominio, pasi_tx_descricao, pasi_tx_valor, pasi_tx_tipo, pasi_dt_ult_alt, pasi_dt_cadastro, usua_cd_id_cadastro, usua_cd_id_ult_alt, pasi_tx_sistema

The error message in my Kafka Connect Docker console:

The timestamp fields in my insert are the two bigint values (1669047144317563).

Caused by: java.sql.SQLException: Exception chain: 2024-01-04 14:11:50 java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "public"."parametro_sistema" ("pasi_cd_id","pasi_tx_dominio","pasi_tx_descricao","pasi_tx_valor","pasi_tx_tipo","pasi_dt_cadastro","pasi_dt_ult_alt","usua_cd_id_cadastro","usua_cd_id_ult_alt","pasi_tx_sistema") VALUES (1,'SISTEMA_CLOUD_BUCKETNAME','Variável para referenciar o nome do bucket na nuvem do Google','r2d2-neki','R2D2',1669047144317563,1669047144317563,NULL,NULL,'SISTEMA') was aborted: ERROR: column "pasi_dt_cadastro" is of type timestamp without time zone but expression is of type bigint
2024-01-04 14:11:50   Hint: You will need to rewrite or cast the expression.

The part of the log where the sink connector tries to convert the timestamp field:

transforms.timestampConverter.field = para_dt_cadastro
2024-01-04 14:11:36     transforms.timestampConverter.format = yyyy-MM-dd HH:mm:ss.SSS
2024-01-04 14:11:36     transforms.timestampConverter.negate = false
2024-01-04 14:11:36     transforms.timestampConverter.predicate = 
2024-01-04 14:11:36     transforms.timestampConverter.target.type = Timestamp
2024-01-04 14:11:36     transforms.timestampConverter.type = class org.apache.kafka.connect.transforms.TimestampConverter$Value
2024-01-04 14:11:36     value.converter = null

My timestamp fields inside my message:

 "after": {
                    "para_cd_id": 2,
                    "para_tx_dominio": "SISTEMA_CLOUD_PROJECTID",
                    "para_tx_descricao": "Variável para referenciar o id do projeto na nuvem do Google",
                    "para_tx_valor": "collaborator-364516",
                    "para_tx_tipo": "SISTEMA",
                    "para_dt_cadastro": 1669047144317563,
                    "para_dt_ult_alt": 1669047144317563,
                    "usua_cd_id_cadastro": null,
                    "usua_cd_id_ult_alt": null,
                    "para_tx_sistema": "R2D2"
                },

Any help with the timestamp problem is welcome, and feel free to give me a heads-up on the rest of the configuration too.

What does the source connector config look like? What does a sample event in the collab-service.public.parametros topic look like?

Hello @dtroiano, thanks for answering!

My source connector:

debezium/debezium-connector-postgresql:2.2.1

name = collaborator-postgres-connector
connector.class = io.debezium.connector.postgresql.PostgresConnector
tasks.max = 1
topic.prefix = collab-service
database.hostname = host
database.server.name = user
database.port = 5432
database.user = postgres
database.password = password
database.dbname = sflm_dev
plugin.name = pgoutput
slot.name = collab_migracao
decimal.handling.mode = double
snapshot.mode = always
schema.name.adjustment.mode = none
table.include.list = public.parametros
database.history.kafka.topic = postgres_history
database.history.kafka.bootstrap.servers = kafka:9092
message.prefix.include.list = after

A sample message from my topic. I’m sending the whole thing, but in reality I’m only using the "after" object:

[
    {
        "topic": "collab-service.public.parametros",
        "partition": 0,
        "offset": 1,
        "timestamp": 1705613301602,
        "timestampType": "CREATE_TIME",
        "headers": [],
        "key": "Struct{para_cd_id=2}",
        "value": {
            "schema": {
                "type": "struct",
                "fields": [
                    {
                        "type": "struct",
                        "fields": [
                            {
                                "type": "int32",
                                "optional": false,
                                "default": 0,
                                "field": "para_cd_id"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_dominio"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_descricao"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_valor"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_tipo"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "name": "io.debezium.time.MicroTimestamp",
                                "version": 1,
                                "field": "para_dt_cadastro"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "name": "io.debezium.time.MicroTimestamp",
                                "version": 1,
                                "field": "para_dt_ult_alt"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "usua_cd_id_cadastro"
                            },
                            {
                                "type": "double",
                                "optional": true,
                                "field": "usua_cd_id_ult_alt"
                            },
                            {
                                "type": "string",
                                "optional": true,
                                "field": "para_tx_sistema"
                            }
                        ],
                        "optional": true,
                        "name": "collab-service.public.parametros.Value",
                        "field": "before"
                    },
                    {
                        "type": "struct",
                        "fields": [
                            {
                                "type": "int32",
                                "optional": false,
                                "default": 0,
                                "field": "para_cd_id"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_dominio"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_descricao"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_valor"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "para_tx_tipo"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "name": "io.debezium.time.MicroTimestamp",
                                "version": 1,
                                "field": "para_dt_cadastro"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "name": "io.debezium.time.MicroTimestamp",
                                "version": 1,
                                "field": "para_dt_ult_alt"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "usua_cd_id_cadastro"
                            },
                            {
                                "type": "double",
                                "optional": true,
                                "field": "usua_cd_id_ult_alt"
                            },
                            {
                                "type": "string",
                                "optional": true,
                                "field": "para_tx_sistema"
                            }
                        ],
                        "optional": true,
                        "name": "collab-service.public.parametros.Value",
                        "field": "after"
                    },
                    {
                        "type": "struct",
                        "fields": [
                            {
                                "type": "string",
                                "optional": false,
                                "field": "version"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "connector"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "name"
                            },
                            {
                                "type": "int64",
                                "optional": false,
                                "field": "ts_ms"
                            },
                            {
                                "type": "string",
                                "optional": true,
                                "name": "io.debezium.data.Enum",
                                "version": 1,
                                "parameters": {
                                    "allowed": "true,last,false,incremental"
                                },
                                "default": "false",
                                "field": "snapshot"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "db"
                            },
                            {
                                "type": "string",
                                "optional": true,
                                "field": "sequence"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "schema"
                            },
                            {
                                "type": "string",
                                "optional": false,
                                "field": "table"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "txId"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "lsn"
                            },
                            {
                                "type": "int64",
                                "optional": true,
                                "field": "xmin"
                            }
                        ],
                        "optional": false,
                        "name": "io.debezium.connector.postgresql.Source",
                        "field": "source"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "op"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ms"
                    },
                    {
                        "type": "struct",
                        "fields": [
                            {
                                "type": "string",
                                "optional": false,
                                "field": "id"
                            },
                            {
                                "type": "int64",
                                "optional": false,
                                "field": "total_order"
                            },
                            {
                                "type": "int64",
                                "optional": false,
                                "field": "data_collection_order"
                            }
                        ],
                        "optional": true,
                        "name": "event.block",
                        "version": 1,
                        "field": "transaction"
                    }
                ],
                "optional": false,
                "name": "collab-service.public.parametros.Envelope",
                "version": 1
            },
            "payload": {
                "before": null,
                "after": {
                    "para_cd_id": 2,
                    "para_tx_dominio": "SISTEMA_CLOUD_PROJECTID",
                    "para_tx_descricao": "Variável para referenciar o id do projeto na nuvem do Google",
                    "para_tx_valor": "collaborator-364516",
                    "para_tx_tipo": "SISTEMA",
                    "para_dt_cadastro": 1669047531542921,
                    "para_dt_ult_alt": 1669047531542921,
                    "usua_cd_id_cadastro": null,
                    "usua_cd_id_ult_alt": null,
                    "para_tx_sistema": "R2D2"
                },
                "source": {
                    "version": "2.2.1.Final",
                    "connector": "postgresql",
                    "name": "collab-service",
                    "ts_ms": 1705613298943,
                    "snapshot": "last",
                    "db": "sflm_dev",
                    "sequence": "[null,\"17629431464080\"]",
                    "schema": "public",
                    "table": "parametros",
                    "txId": 1053092,
                    "lsn": 17629431464080,
                    "xmin": null
                },
                "op": "r",
                "ts_ms": 1705613301328,
                "transaction": null
            }
        }
    }
]

I suspect the TimestampConverter isn’t getting applied, given that the timestamp is nested. I don’t see any error / warning logged here when a field isn’t found; it looks like the transform is basically a no-op and the field remains a bigint. I would recommend sanity-checking this understanding by changing transforms.timestampConverter.field to foo and rerunning. Does it quietly give the same result?
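Something like this, where foo is just a deliberately nonexistent field name:

transforms.timestampConverter.field = foo

If the connector starts cleanly and the insert still fails with the exact same bigint values, that would confirm the transform is silently skipping a field it can’t find.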

To get this working I’d recommend rearranging the SMTs so that the value is flattened down to the after object before the timestamp is converted. Nested fields aren’t supported by the SMT (see KIP-821 which is still open and proposes dot notation) so you need to flatten it first.
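To make the nesting concrete, the value the transform currently sees is the whole Debezium envelope from your sample message, schematically:

before: null
after:
    para_cd_id: 2
    ...
    para_dt_cadastro: 1669047531542921   <-- the field you want to convert
source: { ... }
op / ts_ms / transaction

field = para_dt_cadastro only matches a field at the top level of the value, and there is no such top-level field here, so the transform has nothing to do.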

One aside on the converter config: since a bigint is expected in Kafka from the Debezium connector (it’s an io.debezium.time.MicroTimestamp), I don’t think you need to specify transforms.timestampConverter.format. The format only applies to string inputs / outputs, but you’re going from a Unix epoch to Timestamp. I’m not sure its presence does any harm, but you shouldn’t need it.
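In other words, going from epoch microseconds to Timestamp, a minimal sketch of the converter config would be just:

transforms.timestampConverter.field = para_dt_cadastro
transforms.timestampConverter.unix.precision = microseconds
transforms.timestampConverter.target.type = Timestamp

format only comes into play when a string is involved on either side, e.g. target.type = string together with format = yyyy-MM-dd HH:mm:ss.SSS.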

Yes, I’m getting the same result after replacing the field with foo:

transforms.replaceField.renames = [para_cd_id:pasi_cd_id, para_tx_dominio:pasi_tx_dominio, para_tx_descricao:pasi_tx_descricao, para_tx_valor:pasi_tx_valor, para_tx_tipo:pasi_tx_tipo, para_dt_ult_alt:pasi_dt_ult_alt, para_dt_cadastro:pasi_dt_cadastro, usua_cd_id_cadastro:usua_cd_id_cadastro, usua_cd_id_ult_alt:usua_cd_id_ult_alt, para_tx_sistema:pasi_tx_sistema]
2024-01-19 15:53:55     transforms.replaceField.type = class org.apache.kafka.connect.transforms.ReplaceField$Value
2024-01-19 15:53:55     transforms.replaceField.whitelist = []
2024-01-19 15:53:55     transforms.timestampConverter.field = foo
2024-01-19 15:53:55     transforms.timestampConverter.format = 
2024-01-19 15:53:55     transforms.timestampConverter.negate = false
2024-01-19 15:53:55     transforms.timestampConverter.predicate = 
2024-01-19 15:53:55     transforms.timestampConverter.target.type = string
2024-01-19 15:53:55     transforms.timestampConverter.type = class org.apache.kafka.connect.transforms.TimestampConverter$Value
2024-01-19 15:53:55     value.converter = null

Sorry for asking, but how do I flatten this field before sending it to the converter?

Yes, I know; I tried the format attribute out of pure desperation…

I’ve tried the org.apache.kafka.connect.transforms.Flatten$Value transformer, but it didn’t work either.

The relevant part of my sink connector:

transforms = timestampConverter,extractValue,replaceField,dropKey,flatten
transforms.timestampConverter.type = org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.dropKey.type = io.confluent.connect.transforms.Drop$Key
transforms.extractValue.type = org.apache.kafka.connect.transforms.ExtractField$Value
transforms.replaceField.type = org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.flatten.type = org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter = _
transforms.dropKey.schema.behavior = retain

transforms.timestampConverter.field = pasi_dt_cadastro
transforms.timestampConverter.unix.precision = microseconds
transforms.timestampConverter.target.type = Timestamp

transforms.replaceField.renames = para_cd_id:pasi_cd_id, para_tx_dominio:pasi_tx_dominio, para_tx_descricao:pasi_tx_descricao, para_tx_valor:pasi_tx_valor, para_tx_tipo:pasi_tx_tipo, para_dt_ult_alt:pasi_dt_ult_alt, para_dt_cadastro:pasi_dt_cadastro, usua_cd_id_cadastro:usua_cd_id_cadastro, usua_cd_id_ult_alt:usua_cd_id_ult_alt, para_tx_sistema:pasi_tx_sistema

db.timezone = UTC
pk.fields = pasi_cd_id
table.name.format = c0.parametro_sistema
transforms.extractValue.field = after
insert.field = pasi_cd_id, pasi_tx_dominio, pasi_tx_descricao, pasi_tx_valor, pasi_tx_tipo, pasi_dt_ult_alt, pasi_dt_cadastro, usua_cd_id_cadastro, usua_cd_id_ult_alt, pasi_tx_sistema

Since you are already extracting the after field, that should give you a flattened object to work with. That is, I think you can just rearrange the SMTs:

transforms = extractValue,timestampConverter,replaceField,dropKey

Just tried rearranging this way:
transforms = extractValue,timestampConverter,replaceField,dropKey

Unfortunately, I’ve got the same result:

2024-01-19 16:15:56 [2024-01-19 19:15:56,182] INFO [collaborator-sink-postgres-connector|task-0] Initializing: org.apache.kafka.connect.runtime.TransformationChain{org.apache.kafka.connect.transforms.ExtractField$Value, org.apache.kafka.connect.transforms.TimestampConverter$Value, org.apache.kafka.connect.transforms.ReplaceField$Value, io.confluent.connect.transforms.Drop$Key} (org.apache.kafka.connect.runtime.Worker:632)
2024-01-19 16:15:56 [2024-01-19 19:15:56,182] INFO [collaborator-sink-postgres-connector|task-0] SinkConnectorConfig values: 
2024-01-19 16:15:56     config.action.reload = restart
2024-01-19 16:15:56     connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
2024-01-19 16:15:56     errors.deadletterqueue.context.headers.enable = false
2024-01-19 16:15:56     errors.deadletterqueue.topic.name = 
2024-01-19 16:15:56     errors.deadletterqueue.topic.replication.factor = 3
2024-01-19 16:15:56     errors.log.enable = false
2024-01-19 16:15:56     errors.log.include.messages = false
2024-01-19 16:15:56     errors.retry.delay.max.ms = 60000
2024-01-19 16:15:56     errors.retry.timeout = 0
2024-01-19 16:15:56     errors.tolerance = none
2024-01-19 16:15:56     header.converter = null
2024-01-19 16:15:56     key.converter = null
2024-01-19 16:15:56     name = collaborator-sink-postgres-connector
2024-01-19 16:15:56     predicates = []
2024-01-19 16:15:56     tasks.max = 1
2024-01-19 16:15:56     topics = [collab-service.public.parametros]
2024-01-19 16:15:56     topics.regex = 
2024-01-19 16:15:56     transforms = [extractValue, timestampConverter, replaceField, dropKey]
2024-01-19 16:15:56     value.converter = null
2024-01-19 16:15:56  (org.apache.kafka.connect.runtime.SinkConnectorConfig:354)
2024-01-19 16:15:56 [2024-01-19 19:15:56,186] INFO [collaborator-sink-postgres-connector|task-0] EnrichedConnectorConfig values: 
2024-01-19 16:15:56     config.action.reload = restart
2024-01-19 16:15:56     connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
2024-01-19 16:15:56     errors.deadletterqueue.context.headers.enable = false
2024-01-19 16:15:56     errors.deadletterqueue.topic.name = 
2024-01-19 16:15:56     errors.deadletterqueue.topic.replication.factor = 3
2024-01-19 16:15:56     errors.log.enable = false
2024-01-19 16:15:56     errors.log.include.messages = false
2024-01-19 16:15:56     errors.retry.delay.max.ms = 60000
2024-01-19 16:15:56     errors.retry.timeout = 0
2024-01-19 16:15:56     errors.tolerance = none
2024-01-19 16:15:56     header.converter = null
2024-01-19 16:15:56     key.converter = null
2024-01-19 16:15:56     name = collaborator-sink-postgres-connector
2024-01-19 16:15:56     predicates = []
2024-01-19 16:15:56     tasks.max = 1
2024-01-19 16:15:56     topics = [collab-service.public.parametros]
2024-01-19 16:15:56     topics.regex = 
2024-01-19 16:15:56     transforms = [extractValue, timestampConverter, replaceField, dropKey]
2024-01-19 16:15:56     transforms.dropKey.negate = false
2024-01-19 16:15:56     transforms.dropKey.predicate = 
2024-01-19 16:15:56     transforms.dropKey.schema.behavior = retain
2024-01-19 16:15:56     transforms.dropKey.type = class io.confluent.connect.transforms.Drop$Key
2024-01-19 16:15:56     transforms.extractValue.field = after
2024-01-19 16:15:56     transforms.extractValue.negate = false
2024-01-19 16:15:56     transforms.extractValue.predicate = 
2024-01-19 16:15:56     transforms.extractValue.type = class org.apache.kafka.connect.transforms.ExtractField$Value
2024-01-19 16:15:56     transforms.replaceField.blacklist = []
2024-01-19 16:15:56     transforms.replaceField.negate = false
2024-01-19 16:15:56     transforms.replaceField.predicate = 
2024-01-19 16:15:56     transforms.replaceField.renames = [para_cd_id:pasi_cd_id, para_tx_dominio:pasi_tx_dominio, para_tx_descricao:pasi_tx_descricao, para_tx_valor:pasi_tx_valor, para_tx_tipo:pasi_tx_tipo, para_dt_ult_alt:pasi_dt_ult_alt, para_dt_cadastro:pasi_dt_cadastro, usua_cd_id_cadastro:usua_cd_id_cadastro, usua_cd_id_ult_alt:usua_cd_id_ult_alt, para_tx_sistema:pasi_tx_sistema]
2024-01-19 16:15:56     transforms.replaceField.type = class org.apache.kafka.connect.transforms.ReplaceField$Value
2024-01-19 16:15:56     transforms.replaceField.whitelist = []
2024-01-19 16:15:56     transforms.timestampConverter.field = pasi_dt_cadastro
2024-01-19 16:15:56     transforms.timestampConverter.format = 
2024-01-19 16:15:56     transforms.timestampConverter.negate = false
2024-01-19 16:15:56     transforms.timestampConverter.predicate = 
2024-01-19 16:15:56     transforms.timestampConverter.target.type = Timestamp
2024-01-19 16:15:56     transforms.timestampConverter.type = class org.apache.kafka.connect.transforms.TimestampConverter$Value
2024-01-19 16:15:56     value.converter = null

Exception:

2024-01-19 16:16:02 java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "c0"."parametro_sistema" ("pasi_cd_id","pasi_tx_dominio","pasi_tx_descricao","pasi_tx_valor","pasi_tx_tipo","pasi_dt_cadastro","pasi_dt_ult_alt","usua_cd_id_cadastro","usua_cd_id_ult_alt","pasi_tx_sistema") VALUES (1,'SISTEMA_CLOUD_BUCKETNAME','Variável para referenciar o nome do bucket na nuvem do Google','r2d2-neki','R2D2',1669047144317563,1669047144317563,NULL,NULL,'SISTEMA') was aborted: ERROR: column "pasi_dt_cadastro" is of type timestamp with time zone but expression is of type bigint
2024-01-19 16:16:02   Hint: You will need to rewrite or cast the expression.

One thing that jumps out at me: this should be para_dt_cadastro, since the timestamp converter runs before the field gets renamed. That, or move timestampConverter one step further down the chain so it runs after replaceField.
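i.e., either of these orderings (sketches, not your full config) should line up:

# Option 1: convert before the rename, using the original field name
transforms = extractValue,timestampConverter,replaceField,dropKey
transforms.timestampConverter.field = para_dt_cadastro

# Option 2: rename first, then convert the renamed field
transforms = extractValue,replaceField,timestampConverter,dropKey
transforms.timestampConverter.field = pasi_dt_cadastro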

Thanks for the heads up!

I’ve managed to make it work using your tip, but now I can’t do the same for my other date field, "pasi_dt_ult_alt". I tried to replicate it like this, but only the field pasi_dt_cadastro was converted:

transforms.timestampConverter.field = pasi_dt_cadastro
transforms.timestampConverter.unix.precision = microseconds
transforms.timestampConverter.target.type = Timestamp

transforms.timestampConverter.field = pasi_dt_ult_alt
transforms.timestampConverter.unix.precision = microseconds
transforms.timestampConverter.target.type = Timestamp

and like this:

transforms.timestampConverter.field = pasi_dt_cadastro, pasi_dt_ult_alt
transforms.timestampConverter.unix.precision = microseconds
transforms.timestampConverter.target.type = Timestamp

But neither of them worked for me. Do you know what I should do here?

My log:

[2024-01-19 19:39:21,455] WARN [collaborator-sink-postgres-connector|task-0] Write of 4 records failed, remainingRetries=10 (io.confluent.connect.jdbc.sink.JdbcSinkTask:101)
2024-01-19 16:39:21 java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "c0"."parametro_sistema" ("pasi_cd_id","pasi_tx_dominio","pasi_tx_descricao","pasi_tx_valor","pasi_tx_tipo","pasi_dt_cadastro","pasi_dt_ult_alt","usua_cd_id_cadastro","usua_cd_id_ult_alt","pasi_tx_sistema") VALUES (1,'SISTEMA_CLOUD_BUCKETNAME','Variável para referenciar o nome do bucket na nuvem do Google','r2d2-neki','R2D2','54859-12-31 06:45:17.563+00',1669047144317563,NULL,NULL,'SISTEMA') was aborted: ERROR: column "pasi_dt_ult_alt" is of type timestamp with time zone but expression is of type bigint

As you can see, the field pasi_dt_cadastro was converted, although I didn’t see it in the log:

transforms = [extractValue, replaceField, timestampConverter, dropKey]
2024-01-19 16:39:15     transforms.dropKey.negate = false
2024-01-19 16:39:15     transforms.dropKey.predicate = 
2024-01-19 16:39:15     transforms.dropKey.schema.behavior = retain
2024-01-19 16:39:15     transforms.dropKey.type = class io.confluent.connect.transforms.Drop$Key
2024-01-19 16:39:15     transforms.extractValue.field = after
2024-01-19 16:39:15     transforms.extractValue.negate = false
2024-01-19 16:39:15     transforms.extractValue.predicate = 
2024-01-19 16:39:15     transforms.extractValue.type = class org.apache.kafka.connect.transforms.ExtractField$Value
2024-01-19 16:39:15     transforms.replaceField.blacklist = []
2024-01-19 16:39:15     transforms.replaceField.negate = false
2024-01-19 16:39:15     transforms.replaceField.predicate = 
2024-01-19 16:39:15     transforms.replaceField.renames = [para_cd_id:pasi_cd_id, para_tx_dominio:pasi_tx_dominio, para_tx_descricao:pasi_tx_descricao, para_tx_valor:pasi_tx_valor, para_tx_tipo:pasi_tx_tipo, para_dt_ult_alt:pasi_dt_ult_alt, para_dt_cadastro:pasi_dt_cadastro, usua_cd_id_cadastro:usua_cd_id_cadastro, usua_cd_id_ult_alt:usua_cd_id_ult_alt, para_tx_sistema:pasi_tx_sistema]
2024-01-19 16:39:15     transforms.replaceField.type = class org.apache.kafka.connect.transforms.ReplaceField$Value
2024-01-19 16:39:15     transforms.replaceField.whitelist = []
2024-01-19 16:39:15     transforms.timestampConverter.field = pasi_dt_cadastro
2024-01-19 16:39:15     transforms.timestampConverter.format = 
2024-01-19 16:39:15     transforms.timestampConverter.negate = false
2024-01-19 16:39:15     transforms.timestampConverter.predicate = 
2024-01-19 16:39:15     transforms.timestampConverter.target.type = Timestamp
2024-01-19 16:39:15     transforms.timestampConverter.type = class org.apache.kafka.connect.transforms.TimestampConverter$Value
2024-01-19 16:39:15     value.converter = null
2024-01-19 16:39:15  (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:354)

I’ve just solved it.
I needed to declare the transformer twice, under two different aliases (it’s probably documented and I just didn’t see it before), so my final configuration ended up something like this:

transforms.timestampConverter.type = org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.timestampConverter.field = para_dt_cadastro
transforms.timestampConverter.target.type = Timestamp

transforms.timestampConverter2.type = org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.timestampConverter2.field = para_dt_ult_alt
transforms.timestampConverter2.target.type = Timestamp
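Both aliases also have to be listed in the transforms chain; since they reference the original para_* names, they run between extractValue and replaceField, so the chain ended up something like:

transforms = extractValue,timestampConverter,timestampConverter2,replaceField,dropKey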

Thanks again @dtroiano for your kindness and help!

Glad you got it working! Using two different aliases like this is the right way to do it.
