Sink connector not able to post records into Postgres DB

Hi,

Can anyone please advise on this error?

ERROR: column "EMPNO" of relation "emp" does not exist

Oracle table

Name          Null?     Type
------------- --------- -------------
EMPNO         NOT NULL  NUMBER(5)
ENAME                   VARCHAR2(15)
JOININGDATE             TIMESTAMP(6)
RELEASEDATE             TIMESTAMP(6)

Postgres DB Table

create table EMP(
EMPNO numeric(5) primary key,
ENAME varchar(15) NULL,
JOININGDATE TIMESTAMP NULL,
RELEASEDATE TIMESTAMP NULL);

I am sending data from Oracle to Postgres. The source connector is working fine, but I am getting issues in the sink connector.

java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "emp" ("EMPNO","ENAME","JOININGDATE","RELEASEDATE") VALUES (56,'Anvit','2021-09-17 05:30:00+05:30','2024-01-21 05:30:00+05:30') ON CONFLICT ("EMPNO") DO UPDATE SET "ENAME"=EXCLUDED."ENAME","JOININGDATE"=EXCLUDED."JOININGDATE","RELEASEDATE"=EXCLUDED."RELEASEDATE" was aborted: ERROR: column "EMPNO" of relation "emp" does not exist.

{
  name=jdbc_sink_connector_oracle_01
  config={
    name=Kafka-sink-postgres1
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    tasks.max=1
    topics=opk

    # JDBC sink connector specific properties
    connection.url=jdbc:postgresql://localhost:5432/Kafka_PostDB
    connection.user=postgres
    connection.password=postgres

    insert.mode=upsert

    table.types=TABLE
    table.name.format=emp

    auto.create=false
    auto.evolve=true

    pk.mode=record_value
    pk.fields=EMPNO

    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    dialect.name=PostgreSqlDatabaseDialect
    db.timezone=Asia/Kolkata
  }
}

Possibly a case sensitivity issue? (EMP in the create table, "emp" in the error). What does \d emp output at the psql prompt?
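
For example (the database name here is just taken from the connector's connection.url), you could connect and check with:

psql -U postgres -d Kafka_PostDB
\dt
\d emp

\dt lists the tables in the current schema, and \d emp shows the column names exactly as Postgres stored them.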

Hi dtroiano,

I can't find the EMP table from the Windows command prompt, whereas I can see it in Postgres GUI mode.


C:\Program Files\PostgreSQL\16\bin>psql -U postgres
Password for user postgres:
psql (16.1)
WARNING: Console code page (437) differs from Windows code page (1252)
         8-bit characters might not work correctly. See psql reference
         page "Notes for Windows users" for details.


The tables are visible in GUI mode under pgAdmin.

I can see 7 tables in total in pgAdmin.

Small correction: I have created 2 DBs. Under one DB I have created 2 tables, and under the other I have created 5 tables.

I am sending to my emp table.

Caused by: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "emp" ("EMPNO","ENAME","JOININGDATE","RELEASEDATE") VALUES (56,'Anvit','2021-09-17 05:30:00+05:30','2024-01-21 05:30:00+05:30') ON CONFLICT ("EMPNO") DO UPDATE SET "ENAME"=EXCLUDED."ENAME","JOININGDATE"=EXCLUDED."JOININGDATE","RELEASEDATE"=EXCLUDED."RELEASEDATE" was aborted: ERROR: column "EMPNO" of relation "emp" does not exist.

Values are coming from the topic, but they are not getting inserted into Postgres.

This does look like a case mismatch, but on the column names rather than the table name. Postgres has columns empno, ename, joiningdate, and releasedate, while the connector is attempting all-caps column names. As a quick test / proof point you could try recreating the Postgres table with all-caps field names. If that works, you could either go with that as the solution, or consider lowercasing upstream, e.g. if the all-caps field names are coming from Oracle, use the ReplaceField SMT to lowercase them (docs here).
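
A minimal sketch of that quick test, assuming you keep the lowercase emp table name (matching table.name.format=emp) and only quote the column names so they stay upper case:

-- Unquoted identifiers are folded to lowercase by Postgres;
-- quoting them preserves the upper case the sink connector is generating.
DROP TABLE IF EXISTS emp;
CREATE TABLE emp (
    "EMPNO"       numeric(5) PRIMARY KEY,
    "ENAME"       varchar(15) NULL,
    "JOININGDATE" timestamp NULL,
    "RELEASEDATE" timestamp NULL
);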

Hi,

My data is coming from Oracle into the topic as below.

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"EMPNO"},{"type":"string","optional":true,"field":"ENAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"JOININGDATE"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"RELEASEDATE"}],"optional":false},"payload":{"EMPNO":11,"ENAME":"Ramana","JOININGDATE":1631836800000,"RELEASEDATE":1705795200000}}

The data is coming in the payload because I have the below converter configuration:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
dialect.name=PostgreSqlDatabaseDialect

I have recreated the table in the PostgreSQL DB with capital letters.
Here I have observed that even when I create identifiers in small letters, Oracle takes them as caps, whereas in Postgres I created them in caps but it takes them as small letters.
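
One quick way to confirm what Postgres actually stored (assuming the lowercase emp table name; use 'EMP' in the filter instead if the table itself was created with a quoted upper-case name):

-- Column names appear here exactly as stored in the catalog;
-- unquoted identifiers show up in lowercase.
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'emp';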

So, I have used the ReplaceField SMT on the value and renamed the fields as below.

transforms = RenameField
transforms.RenameField.type = org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames = EMPNO:empno,ENAME:ename,JOININGDATE:joiningdate,RELEASEDATE:releasedate

Still, my issue was not resolved; data was not getting pushed into Postgres. Any clue?

I can see the below in the connector logs.

[2024-02-10 23:13:55,289] INFO [Kafka-sink-postgres |task-0] Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:176)
[2024-02-10 23:13:55,296] INFO [Kafka-sink-postgres |task-0] [Consumer clientId=connector-consumer-Kafka-sink-postgres -0, groupId=connect-Kafka-sink-postgres] Revoke previously assigned partitions hellokafka-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:343)
[2024-02-10 23:13:55,296] INFO [Kafka-sink-postgres |task-0] [Consumer clientId=connector-consumer-Kafka-sink-postgres -0, groupId=connect-Kafka-sink-postgres] Member connector-consumer-Kafka-sink-postgres -0-ace35691-da90-4fd5-92c2-f2ecaadcfa17 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1133)
[2024-02-10 23:13:55,298] INFO [Kafka-sink-postgres |task-0] [Consumer clientId=connector-consumer-Kafka-sink-postgres -0, groupId=connect-Kafka-sink-postgres] Resetting generation and member id due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1025)
[2024-02-10 23:13:55,298] INFO [Kafka-sink-postgres |task-0] [Consumer clientId=connector-consumer-Kafka-sink-postgres -0, groupId=connect-Kafka-sink-postgres] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1072)
[2024-02-10 23:13:55,794] INFO [Kafka-sink-postgres |task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:693)
[2024-02-10 23:13:55,795] INFO [Kafka-sink-postgres |task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:697)
[2024-02-10 23:13:55,796] INFO [Kafka-sink-postgres |task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:703)
[2024-02-10 23:13:55,798] INFO [Kafka-sink-postgres |task-0] App info kafka.consumer for connector-consumer-Kafka-sink-postgres -0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2024-02-10 23:22:55,081] INFO [oracle-kafka|task-0] [Producer clientId=connector-producer-oracle-kafka-0] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:977)

Hi, in the source connector the data is coming with the Oracle field names; in the sink connector I have changed the below properties to match the Postgres side, and now it is working fine.

table.types=TABLE
table.name.format=emp   [in Postgres these are small letters; case sensitive]
pk.mode=record_value
pk.fields=empno
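
Putting it together, the sink-side settings that changed end up roughly as below (just a summary of what is already in this thread; all other properties stay as in the original config):

# Postgres folded the unquoted identifiers to lowercase, so the sink
# has to reference the lowercase table and column names.
table.types=TABLE
table.name.format=emp
pk.mode=record_value
pk.fields=empno

# Rename the upper-case Oracle field names to the lowercase Postgres columns.
transforms=RenameField
transforms.RenameField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames=EMPNO:empno,ENAME:ename,JOININGDATE:joiningdate,RELEASEDATE:releasedate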

Thanks a lot for your help and support.

Thanks for sharing these insights, I found them very useful and informative.