JDBC Source connector doesn't see any updates in a table

Hi, all.
I’m new to Kafka Connect. I’m trying to use the JDBC connector to sync Postgres databases. The problem is that the table has a composite primary key and one timestamp column. When I start the connector, it works perfectly fine and writes all existing rows into the topic, but it doesn’t see any subsequent updates/deletes/inserts in the table. This is the configuration I have now:

{
  "name": "source_dev_e9d79303",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/source_dev",
    "connection.user": "my_user",
    "connection.password": "my_pass",
    "mode": "timestamp",
    "table.whitelist": "public.source",
    "timestamp.column.name": "date_input",
    "transforms": "createKey,formatDate",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "sort,date_begin",
    "transforms.formatDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.formatDate.field": "date_begin",
    "transforms.formatDate.target.type": "string",
    "transforms.formatDate.format": "yyyy-MM-dd"
  }
}
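(For context: in timestamp mode the connector only picks up rows whose timestamp column has advanced past the last committed offset. It effectively polls with a query along these lines — a sketch, the actual SQL the connector generates differs slightly:

```sql
-- sketch of the timestamp-mode polling query (illustrative only,
-- not the exact SQL the connector generates)
SELECT * FROM public.source
WHERE date_input > ?     -- last committed timestamp offset
  AND date_input < ?     -- current DB time, minus timestamp.delay.interval.ms
ORDER BY date_input ASC;
```

So an UPDATE is only visible if it also changes date_input, and DELETEs are never captured in this mode.)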

I would be very grateful for any help

hey @unxly

is the timestamp column a real timestamp column?
is there any other column with an incrementing id?

best,
michael

anything in the logs?

@mmuehlbeyer Ty for your time. Yes, the timestamp column is a real one. There aren’t any incrementing columns; the PK is composite. The logs are totally fine.
Is there anything I can do about it?

It is a timestamp without time zone column. Could that be the problem?
:thinking: :thinking:
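(One thing worth checking with a timestamp without time zone column: the connector compares it against the database’s current time, so a timezone mismatch between the DB and the connector can push new rows outside the polled window. If that’s the case, setting db.timezone on the connector to match the database may help — a sketch of extra entries for the "config" block, with a placeholder timezone:

```json
"db.timezone": "Europe/Moscow",
"timestamp.delay.interval.ms": "0"
```

)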

This is the structure of the table:

CREATE TABLE source (
    sort SMALLINT NOT NULL,
    date_begin DATE NOT NULL,
    num NUMERIC(20,10),       
    date_input TIMESTAMP WITHOUT TIME ZONE,
    status INTEGER,                
    PRIMARY KEY (sort, date_begin) 
);

And this is what I see in the Kafka messages:

{
	"schema": {
		"type": "struct",
		"fields": [
			{
				"type": "int16",
				"optional": false,
				"field": "sort"
			},
			{
				"type": "string",
				"optional": false,
				"field": "date_begin"
			},
			{
				"type": "bytes",
				"optional": true,
				"name": "org.apache.kafka.connect.data.Decimal",
				"version": 1,
				"parameters": {
					"scale": "10",
					"connect.decimal.precision": "20"
				},
				"field": "num"
			},
			{
				"type": "int64",
				"optional": false,
				"name": "org.apache.kafka.connect.data.Timestamp",
				"version": 1,
				"field": "date_input"
			},
			{
				"type": "int32",
				"optional": false,
				"field": "status"
			}
		],
		"optional": false,
		"name": "source"
	},
	"payload": {
		"sort": 4,
		"date_begin": "1991-11-11",
		"num": "BGzQ",
		"date_input": 1144140647000,
		"status": 1
	}
}
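(As an aside, the "num": "BGzQ" value isn’t garbage: it’s the Connect Decimal logical type, i.e. a base64-encoded big-endian unscaled integer that has to be combined with the "scale" from the schema. A quick sketch of decoding it outside of Connect:

```python
import base64
from decimal import Decimal

# "num" arrives as base64-encoded big-endian two's-complement bytes
raw = base64.b64decode("BGzQ")
unscaled = int.from_bytes(raw, byteorder="big", signed=True)

# the scale comes from the schema ("scale": "10")
value = Decimal(unscaled) / (Decimal(10) ** 10)
print(value)  # 0.000029
```

)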

need to check that myself.
though it doesn’t look that bad, as the data does arrive in the cluster,
as you can see in the payload section.

It only arrives on the initial load :(
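(That matches how timestamp mode behaves: after the initial load it only sees rows whose date_input moves forward, so updates that leave date_input untouched — and all deletes — stay invisible. If you control the schema, one common workaround is a trigger that touches the timestamp column on every update; a sketch, with made-up function/trigger names:

```sql
-- sketch: keep date_input current on every UPDATE so timestamp mode
-- can detect changed rows (names are illustrative)
CREATE OR REPLACE FUNCTION touch_date_input() RETURNS trigger AS $$
BEGIN
    NEW.date_input := now();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER source_touch_date_input
BEFORE UPDATE ON source
FOR EACH ROW EXECUTE FUNCTION touch_date_input();
```

Deletes still wouldn’t be captured; for those you’d need log-based CDC, e.g. Debezium.)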