Multiple Primary Keys in Kafka Connect

Hi, my source connector is configured as follows:

```shell
curl -i -X POST http://localhost:8083/connectors -H "Content-Type: application/json" \
  -d '{
    "name": "source_connector",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "tasks.max": "1",
      "connection.url": "jdbc:postgresql://localhost:5432/postgres",
      "connection.user": "postgres",
      "connection.password": "*****",
      "mode": "timestamp+incrementing",
      "timestamp.column.name": "entry_date",
      "incrementing.column.name": "id,id_no",
      "topic.prefix": "psql",
      "table.whitelist": "table_name",
      "key.converter": "",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "key.converter.schema.registry.url": "http://schema-registry:8081",
      "auto.register.schemas": "true",
      "validate.non.null": false,
      "delete.enabled": true
    }
  }'
```

I am getting the following error:

```
Failed to run query for table TimestampIncrementingTableQuerier{table="table_name", query='null', topicPrefix='psql', incrementingColumn='id,id_no', timestampColumns=[entry_date]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:337)
org.postgresql.util.PSQLException: ERROR: column "id,id_no" does not exist
```

Please help me out.

Hi, from this it looks like you’re trying to supply a list of incrementing columns. The JDBC source connector does not accept a list; it takes a single column name. You’re getting the error because the connector is looking for a column literally named “id,id_no”.
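For illustration, here is a sketch of the relevant config change with a single incrementing column. This assumes `id` alone is monotonically increasing and unique, which may not hold for a composite-key table; the other properties are carried over from the original config:

```shell
# Hypothetical fix: give the JDBC source a single incrementing column,
# not a comma-separated list. PUT /connectors/{name}/config updates an
# existing connector's configuration in place.
curl -i -X PUT http://localhost:8083/connectors/source_connector/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "*****",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "entry_date",
    "incrementing.column.name": "id",
    "topic.prefix": "psql",
    "table.whitelist": "table_name"
  }'
```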

Thanks for the reply, @mitchell-h. Can you please tell me what we should write if a table has composite primary keys? The scenario above is exactly that: a composite primary key.

Use log-based CDC. Since you’re reading from Postgres, the Debezium connector will work perfectly for this.
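As a hedged sketch, a minimal Debezium Postgres source connector config might look like the following. The connection details and table name are placeholders taken from the original post, and the property names follow recent Debezium releases (in older 1.x releases, `database.server.name` played the role of `topic.prefix`). Debezium builds the Kafka message key from the table’s primary key, so composite keys are handled natively:

```shell
# Sketch only: register a Debezium Postgres connector. Requires Postgres
# logical replication (wal_level=logical) and the Debezium plugin installed
# on the Connect workers.
curl -i -X POST http://localhost:8083/connectors -H "Content-Type: application/json" \
  -d '{
    "name": "debezium_source",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "localhost",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "*****",
      "database.dbname": "postgres",
      "topic.prefix": "psql",
      "table.include.list": "public.table_name"
    }
  }'
```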


I believe the JDBC source connector does not support composite keys for this use. You’ll need to do as @rmoff suggests and use log-based CDC, or change the data model in the database.
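If changing the data model is an option, one common approach is to add a single auto-incrementing surrogate column that the JDBC connector can use. This is a sketch only; `surrogate_id` is a hypothetical column name, and the actual table and credentials would differ:

```shell
# Hypothetical data-model change: add one auto-incrementing surrogate column,
# then point incrementing.column.name at it instead of the composite key.
psql -U postgres -d postgres -c \
  'ALTER TABLE table_name ADD COLUMN surrogate_id BIGSERIAL;'
```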
