"Issue processing large amounts of data with timestamp+incrementing mode in Kafka Connect and JDBC"

I am using Kafka Connect with the Confluent JDBC connector (version 10.5) to test the source connector, reading data from a view of an Informix database. The view is designed to return an incrementing field “id” and a last-modification field “last_date”. My source database is Informix and the destination is PostgreSQL.

Initially, I thought the process was working correctly, since inserts and updates were being picked up quickly. But once the view grew to 23 million records, changes were no longer detected promptly.

During testing, I realized that Kafka Connect was actually rereading the entire view instead of resuming from the last processed position (id, last_date). To confirm this, I limited the view to 100 records, and changes were detected quickly. When I reconfigured the view back to 23 million records, it took hours to process the changes. Upon further review, I see that the stored offset looks like this:

rowtime: 2023/01/04 15:27:16.361 Z, key: ["source_novedades",{"query":"query"}], value: {"timestamp_nanos":0,"incrementing":5001,"timestamp":1672835217000}, partition: 2

I do not understand what “timestamp_nanos” means, but it is always 0. I see only a single date field (“timestamp”), which holds the last processed timestamp. Also, the query being issued looks like this:

SELECT * FROM my_vw WHERE last_date < ? AND (( last_date = ? AND id > ?) OR last_date > ?) ORDER BY last_date, id ASC
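For reference, this is how I understand the four placeholders are bound in timestamp+incrementing mode, based on my reading of the connector's TimestampIncrementingCriteria class (the values are illustrative; please correct me if this is wrong):

```sql
SELECT * FROM my_vw
WHERE last_date < ?                   -- 1: current DB time (minus timestamp.delay.interval.ms)
  AND (( last_date = ? AND id > ?)    -- 2: last stored timestamp, 3: last stored id (e.g. 5001)
       OR last_date > ?)              -- 4: last stored timestamp again
ORDER BY last_date, id ASC
```

If that reading is correct, the first placeholder is a different value from the second and fourth, so the predicate is not self-contradictory.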

This does not make much sense to me if the same date is being bound everywhere, since the condition would then require the date to be both less than and greater than the same value at the same time:

last_date < ? and also AND (.... OR last_date > ?)

Is the timestamp “1672860963000” being substituted for all of the date placeholders (?)? If so, it clearly would not work. Do I need to do something so that “timestamp_nanos” is no longer 0 and the last processed date is used?
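In case it is relevant: to check whether the slowness comes from Informix scanning the whole view rather than from the offsets themselves, I could run the generated query with the explain plan enabled (the literal values below are illustrative, substituted for the placeholders):

```sql
-- Informix: write the query plan to sqexplain.out for inspection
SET EXPLAIN ON;
SELECT * FROM my_vw
WHERE last_date < '2023-01-04 15:30:00'
  AND (( last_date = '2023-01-04 15:26:57' AND id > 5001)
       OR last_date > '2023-01-04 15:26:57')
ORDER BY last_date, id ASC;
SET EXPLAIN OFF;
```

A sequential scan here would explain the hours-long processing independently of what the connector stores in its offsets.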

This is my source connector:

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:informix-sqli://ip:port/sis:informixserver=mibase",
    "connection.user": "informix",
    "connection.password": "pass",
    "query": "SELECT * FROM my_vw",
    "topic.prefix": "novedades",
    "poll.interval.ms": "10000",
    "mode": "timestamp+incrementing",
    "schema.pattern": "informix",
    "timestamp.column.name": "last_date",
    "incrementing.column.name": "id",
    "validate.non.null": false,

    "numeric.mapping": "best_fit",
    "transforms": "copyFieldToKey,extractKeyFromStruct,removeKeyFromValue",
    "transforms.copyFieldToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.copyFieldToKey.fields": "id",
    "transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKeyFromStruct.field": "id",
    "transforms.removeKeyFromValue.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.removeKeyFromValue.blacklist": "id",
    "key.converter": "org.apache.kafka.connect.converters.LongConverter",

    "batch.size": 200000,
    "linger.ms": 60000,
    "compression.type": "lz4",
    "acks": 1,
    "offset.flush.timeout.ms": 300000
}
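One thing I am also considering, in case it matters: an index covering both offset columns on the table behind the view, so the incremental predicate does not force a full scan. The table name here is hypothetical, since my view actually reads from several tables:

```sql
-- Hypothetical: composite index on the base table behind my_vw,
-- matching the connector's ORDER BY last_date, id
CREATE INDEX ix_base_lastdate_id ON my_base_table (last_date, id);
```

But before doing that, I would like to understand whether the connector is really rereading everything or just binding the offsets incorrectly.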
