JDBC Source connector doesn't see any updates in a table

Hi, all.
I’m new in kafka connect. I am trying to use jdbc connector to sinc postgress databases. The problem is that there are composite primary key in that table and there one timestamp column. When I starting connector it work perfectly fine and write all rows into topic. But it isn’t see any updates/deletes/inserts into the table. Now i stand with that configuration:

{
“name”: “source_dev_e9d79303”,
“config”: {
“connector.class”: “io.confluent.connect.jdbc.JdbcSourceConnector”,
“tasks.max”: “1”,
“connection.url”: “jdbc:postgresql://postgres:5432/source_dev”,
“connection.user”: “my_user”,
“connection.password”: “my_pass”,
“mode”: “timestamp”,
“table.whitelist”: “public.source”,
“timestamp.column.name”: “date_input”,
“transforms”: “createKey,formatDate”,
“transforms.createKey.type”: “org.apache.kafka.connect.transforms.ValueToKey”,
“transforms.createKey.fields”: “sort,date_begin”,
“transforms.formatDate.type”: “org.apache.kafka.connect.transforms.TimestampConverter$Value”,
“transforms.formatDate.field”: “date_begin”,
“transforms.formatDate.target.type”: “string”,
“transforms.formatDate.format”: “yyyy-MM-dd”
}
}

I would be very grateful for any help

hey @unxly

the timestamp column is a real timestamp column?
any other column which has an incrementing Id?

best,
michael

anything in the logs?

@mmuehlbeyer Ty for your time. Yeah, timestamp column the real one. There aren’t any incrementing columns, pk is composite. Logs totally fine.
Is there anything I can do about it?

It is timestamp without time zone field. Is that’s the problem?
:thinking: :thinking:

I have this struct of table:

CREATE TABLE source (
    sort SMALLINT NOT NULL,
    date_begin DATE NOT NULL,
    num NUMERIC(20,10),       
    date_input TIMESTAMP WITHOUT TIME ZONE,
    status INTEGER,                
    PRIMARY KEY (sort, date_begin) 
);

And that’s what i see in kafka messages:

{
	"schema": {
		"type": "struct",
		"fields": [
			{
				"type": "int16",
				"optional": false,
				"field": "sort"
			},
			{
				"type": "string",
				"optional": false,
				"field": "date_begin"
			},
			{
				"type": "bytes",
				"optional": true,
				"name": "org.apache.kafka.connect.data.Decimal",
				"version": 1,
				"parameters": {
					"scale": "10",
					"connect.decimal.precision": "20"
				},
				"field": "num"
			},
			{
				"type": "int64",
				"optional": false,
				"name": "org.apache.kafka.connect.data.Timestamp",
				"version": 1,
				"field": "date_input"
			},
			{
				"type": "int32",
				"optional": false,
				"field": "status"
			}
		],
		"optional": false,
		"name": "source"
	},
	"payload": {
		"sort": 4,
		"date_begin": "1991-11-11",
		"num": "BGzQ",
		"date_input": 1144140647000,
		"status": 1
	}
}

need to check by myself.
though it looks not that bad as the arrives in the cluster
as you see in the payload section.

It’s only arrives on initial download(

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.