Kafka JDBC Source Connector - UPDATE

Hi,

I am new to Kafka JDBC Connect and am working on a Sales_Order database. Inserts picked up through the query are working fine, but I also need to capture updates to existing records, and that is not working.

I am using an Oracle database for this.

For insertion, I have used the file below.

{
  "name": "my-jdbc-connector2",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:@localhost:1521/orcl",
    "connection.user": "kafka",
    "connection.password": "check",
    "tasks.max": "5",
    "topic.prefix": "kafkaop1",
    "mode": "timestamp+incrementing",
    "query": "SELECT APPLICATION_NUMBER, CUSTOMER_ID, PURCHASE_ORDER_NO, MATERIAL_NO, ORDER_TYPE, SUPPLIER, UNIT, SALESDATE, ADDRESS, MOBILENO, EMAIL_ID FROM SALESORDER",
    "timestamp.column.name": "SALESDATE",
    "incrementing.column.name": "APPLICATION_NUMBER",
    "poll.interval.ms": "1000",
    "numeric.mapping": "best_fit",
    "errors.tolerance": "all",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "dialect.name": "OracleDatabaseDialect"
  }
}

For insertion, the SELECT statement is working, but I need the same properties file to cover insert, update, and delete.

Please let me know whether I need to change the query for updates.

Can I write two queries in the same properties file?

Do I need to change any config in the properties file to make this work?

For updates, it is not sending any data to the topic…

So, please help me understand: will the Kafka JDBC source connector work for this?

Hi,

Please help me understand how we can capture update events through the Kafka source connector. I am stuck on this issue.

I need to handle both inserts and updates with a single query.

Insertion is working fine, but I am not getting any update events, so I need to check the update case. Please help here.

I am using only the Kafka JDBC source connector. Instead of this, do I need to use any other connector?

Thanks and Regards,
A.

hey @Anil

have a look at the "mode" setting:

https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html#mode

just to get you on the same page, here is a quick deep dive into how it works.
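roughly speaking, in timestamp+incrementing mode the connector wraps your query and polls with a predicate along these lines (a sketch, not the exact generated SQL; column names taken from your config, `:last_timestamp` and `:last_id` stand for the stored offsets):

```sql
-- A row is only emitted again if its timestamp column has advanced past
-- the stored offset, or ties on the timestamp with a higher incrementing
-- value. Columns elided for brevity.
SELECT * FROM (
  SELECT APPLICATION_NUMBER, SALESDATE /* , ... other columns ... */
  FROM SALESORDER
)
WHERE SALESDATE < CURRENT_TIMESTAMP
  AND (
        (SALESDATE = :last_timestamp AND APPLICATION_NUMBER > :last_id)
     OR  SALESDATE > :last_timestamp
      )
ORDER BY SALESDATE, APPLICATION_NUMBER ASC
```

so an UPDATE is only detected if it also moves the timestamp column forward; changing other columns alone will not re-emit the row.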

Hi mmuehlbeyer,

Thanks for sharing the information.

I need some information about the update case.

I have taken timestamp+incrementing mode. New row insertion is working fine; it is only updated rows that I am not seeing.


mode= timestamp+incrementing

query=SELECT SALES_ORDER, CAST(SO_ITEM AS NUMBER(5,0)) AS SO_ITEM, MATERIAL, CAST(QUANTITY AS NUMBER(5,2)) AS QUANTITY, UNIT_OF_MEASURE, REQUIRED_DATE, SOLD_TO_PARTY_NAME,SHIP_TO_PARTY_NAME, SALES_ORGANIZATION, DISTRIBUTION_CHANNEL, PURCHASE_ORDER_NUMBER, PLANT,DESTINATION_CITY, ORDER_CREATION_DATE, MODE_OF_TRANSPORT, CAST(STATUS_FLAG AS NUMERIC(1,0)) AS STATUS_FLAG, ERROR_DESCRIPTION, DATE_CREATED,DATE_MODIFIED, PI_PROCESS_STATUS, CAST(SO_OVERDELIVERY_TOL AS NUMBER(5,0)) AS SO_OVERDELIVERY_TOL, CAST(SO_UNDERDELIVERY_TOL AS NUMBER(5,0)) AS SO_UNDERDELIVERY_TOL, ROUTE,ROUTE_DESCRIPTION,UNLOADING_POINT,RECEIVING_POINT,ROUTE_TREE,PROCESS_PATH,MATERIAL_TREE, YIELD_STRING,RELEASE_DATE, PROPOSED_DELIVERY_DATE, COMMITTED_DATE FROM VJNR_MES_SO_HEADER

timestamp.column.name=REQUIRED_DATE
incrementing.column.name=SO_ITEM


We are updating rows as below.

UPDATE SOPOOB.VJNR_MES_SO_HEADER
SET SALES_ORDER = 'pqrs'
WHERE DATE_MODIFIED = TO_DATE('23/04/24', 'DD/MM/YY');

UPDATE SOPOOB.VJNR_MES_SO_HEADER
SET SALES_ORDER = 'pqrs'
WHERE SO_ITEM = 24;

We have not installed any trigger; only the config properties are used through the JDBC source connector.

In Oracle I can see the rows updating properly, but I cannot see any data arriving in the topic.

On insertion, I can see the commit log below:

[2024-07-05 12:12:25,960] INFO [oracle-kafka|task-0|offsets] WorkerSourceTask{id=oracle-kafka-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)

and the data arrives in the topic.

When updating a record, I do not see any commit log at all.
Kindly help me understand more about the update case.

For insertion I used the SELECT query; for updates, do I need to write another query?

Do we need to create a trigger for update/insert? Please let me know.

Kindly help me understand whether I am missing anything here.

Thanks and Regards,
Anilkumar. S

my Table

CREATE TABLE "SOPOOB"."VJNR_MES_SO_HEADER"
( "SALES_ORDER" VARCHAR2(30),
  "SO_ITEM" NUMBER(10,0),
  "MATERIAL" VARCHAR2(30),
  "QUANTITY" NUMBER(38,3),
  "UNIT_OF_MEASURE" VARCHAR2(30),
  "REQUIRED_DATE" DATE,
  "SOLD_TO_PARTY_NAME" VARCHAR2(40),
  "SHIP_TO_PARTY_NAME" VARCHAR2(40),
  "SALES_ORGANIZATION" VARCHAR2(40),
  "DISTRIBUTION_CHANNEL" VARCHAR2(30),
  "PURCHASE_ORDER_NUMBER" VARCHAR2(35),
  "PLANT" VARCHAR2(30),
  "DESTINATION_CITY" VARCHAR2(30),
  "ORDER_CREATION_DATE" DATE,
  "MODE_OF_TRANSPORT" VARCHAR2(40),
  "STATUS_FLAG" NUMBER(38,0) NOT NULL ENABLE,
  "ERROR_DESCRIPTION" VARCHAR2(32),
  "DATE_CREATED" DATE,
  "DATE_MODIFIED" DATE,
  "PI_PROCESS_STATUS" VARCHAR2(1),
  "SO_OVERDELIVERY_TOL" NUMBER,
  "SO_UNDERDELIVERY_TOL" NUMBER,
  "ROUTE" VARCHAR2(6),
  "ROUTE_DESCRIPTION" VARCHAR2(50),
  "UNLOADING_POINT" VARCHAR2(50),
  "RECEIVING_POINT" VARCHAR2(50),
  "ROUTE_TREE" VARCHAR2(300) DEFAULT 'NA' NOT NULL ENABLE,
  "PROCESS_PATH" VARCHAR2(300) DEFAULT 'NA' NOT NULL ENABLE,
  "MATERIAL_TREE" VARCHAR2(300) DEFAULT 'NA' NOT NULL ENABLE,
  "YIELD_STRING" VARCHAR2(300),
  "RELEASE_DATE" DATE,
  "PROPOSED_DELIVERY_DATE" DATE,
  "COMMITTED_DATE" DATE
);

Source connector

name=oracle-kafka
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@localhost:1521/<>
connection.user=XXXXX
connection.password=XXXXXXXX
tasks.max=1
topic.prefix=orakafkapost8
mode=timestamp+incrementing

query=SELECT SALES_ORDER, CAST(SO_ITEM AS NUMBER(5,0)) AS SO_ITEM, MATERIAL, CAST(QUANTITY AS NUMBER(5,2)) AS QUANTITY, UNIT_OF_MEASURE, REQUIRED_DATE, SOLD_TO_PARTY_NAME,SHIP_TO_PARTY_NAME, SALES_ORGANIZATION, DISTRIBUTION_CHANNEL, PURCHASE_ORDER_NUMBER, PLANT,DESTINATION_CITY, ORDER_CREATION_DATE, MODE_OF_TRANSPORT, CAST(STATUS_FLAG AS NUMERIC(1,0)) AS STATUS_FLAG, ERROR_DESCRIPTION, DATE_CREATED,DATE_MODIFIED, PI_PROCESS_STATUS, CAST(SO_OVERDELIVERY_TOL AS NUMBER(5,0)) AS SO_OVERDELIVERY_TOL, CAST(SO_UNDERDELIVERY_TOL AS NUMBER(5,0)) AS SO_UNDERDELIVERY_TOL, ROUTE,ROUTE_DESCRIPTION,UNLOADING_POINT,RECEIVING_POINT,ROUTE_TREE,PROCESS_PATH,MATERIAL_TREE, YIELD_STRING,RELEASE_DATE, PROPOSED_DELIVERY_DATE, COMMITTED_DATE FROM VJNR_MES_SO_HEADER

timestamp.column.name=REQUIRED_DATE
incrementing.column.name=SO_ITEM

validate.non.null=true

poll.interval.ms=1000
numeric.mapping=best_fit
errors.tolerance=all
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
dialect.name=OracleDatabaseDialect

numeric.precision.mapping=true

transforms= REQUIRED_DATE_CONVERTER,ORDER_CREATION_DATE_CONVERTER,DATE_CREATED_CONVERTER,DATE_MODIFIED_CONVERTER,RELEASE_DATE_CONVERTER,PROPOSED_DELIVERY_DATE_CONVERTER, COMMITTED_DATE_CONVERTER
transforms.REQUIRED_DATE_CONVERTER.type= org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.REQUIRED_DATE_CONVERTER.format= yyyy-MM-dd
transforms.REQUIRED_DATE_CONVERTER.target.type= string
transforms.REQUIRED_DATE_CONVERTER.field= REQUIRED_DATE

transforms.ORDER_CREATION_DATE_CONVERTER.type= org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ORDER_CREATION_DATE_CONVERTER.format= yyyy-MM-dd
transforms.ORDER_CREATION_DATE_CONVERTER.target.type= string
transforms.ORDER_CREATION_DATE_CONVERTER.field= ORDER_CREATION_DATE

transforms.DATE_CREATED_CONVERTER.type= org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.DATE_CREATED_CONVERTER.format= yyyy-MM-dd
transforms.DATE_CREATED_CONVERTER.target.type= string
transforms.DATE_CREATED_CONVERTER.field= DATE_CREATED

transforms.DATE_MODIFIED_CONVERTER.type= org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.DATE_MODIFIED_CONVERTER.format= yyyy-MM-dd
transforms.DATE_MODIFIED_CONVERTER.target.type= string
transforms.DATE_MODIFIED_CONVERTER.field= DATE_MODIFIED

transforms.RELEASE_DATE_CONVERTER.type= org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.RELEASE_DATE_CONVERTER.format= yyyy-MM-dd
transforms.RELEASE_DATE_CONVERTER.target.type= string
transforms.RELEASE_DATE_CONVERTER.field= RELEASE_DATE

transforms.PROPOSED_DELIVERY_DATE_CONVERTER.type= org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.PROPOSED_DELIVERY_DATE_CONVERTER.format= yyyy-MM-dd
transforms.PROPOSED_DELIVERY_DATE_CONVERTER.target.type= string
transforms.PROPOSED_DELIVERY_DATE_CONVERTER.field= PROPOSED_DELIVERY_DATE

transforms.COMMITTED_DATE_CONVERTER.type= org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.COMMITTED_DATE_CONVERTER.format= yyyy-MM-dd
transforms.COMMITTED_DATE_CONVERTER.target.type= string
transforms.COMMITTED_DATE_CONVERTER.field= COMMITTED_DATE

hey,

thanks for the update.
have a look at the mode setting for what you would like to do.

is the query needed? might it be enough to run the connector without the query config?
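without the query, a minimal config could look something like this (just a sketch; `table.whitelist` is the usual way to point the connector at specific tables, and the topic prefix here is a placeholder):

```properties
# Minimal sketch: let the connector build its own SELECT instead of
# using a hand-written query.
name=oracle-kafka
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@localhost:1521/orcl
connection.user=XXXXX
connection.password=XXXXXXXX
tasks.max=1
topic.prefix=orakafka-
table.whitelist=VJNR_MES_SO_HEADER
mode=timestamp+incrementing
# For updates to be picked up, this column must change on every UPDATE.
# REQUIRED_DATE rarely changes on update, so a last-modified column such
# as DATE_MODIFIED is usually the better choice, provided your updates set it.
timestamp.column.name=DATE_MODIFIED
incrementing.column.name=SO_ITEM
poll.interval.ms=1000
```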

best,
michael

Hi Michael,

for oracle table,

CREATE TABLE "KAFKA"."EMP"
( "EMPNO" NUMBER(5,0),
  "ENAME" VARCHAR2(15),
  "JOININGDATE" DATE,
  "RELEASEDATE" DATE,
  PRIMARY KEY ("EMPNO")
);
for Postgres,

CREATE TABLE public.emp (
  empno numeric(5) NOT NULL,
  ename varchar(15) NULL,
  joiningdate timestamp NULL,
  releasedate timestamp NULL,
  CONSTRAINT emp_pkey PRIMARY KEY (empno)
);

JDBC Source Connector:- Oracle

{
  "name": "my-jdbc-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:@localhost:1521/orcl",
    "connection.user": "kafka",
    "connection.password": "check",
    "tasks.max": "1",
    "topic.prefix": "opkcheck125",
    "mode": "timestamp+incrementing",
    "query": "SELECT EMPNO, ENAME, JOININGDATE, RELEASEDATE FROM EMP",
    "timestamp.column.name": "RELEASEDATE",
    "incrementing.column.name": "EMPNO",
    "poll.interval.ms": "1000",
    "numeric.mapping": "best_fit",
    "errors.tolerance": "all",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "dialect.name": "OracleDatabaseDialect"
  }
}

JDBC Sink Connector - Postgres

{
  "name": "postgres-jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "opkcheck125",
    "connection.url": "jdbc:postgresql://localhost:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "admin",
    "insert.mode": "upsert",
    "table.types": "TABLE",
    "table.name.format": "public.emp",
    "auto.create": "true",
    "auto.evolve": "true",
    "pk.mode": "record_value",
    "pk.fields": "empno",
    "transforms": "RenameField",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameField.renames": "EMPNO:empno,ENAME:ename,JOININGDATE:joiningdate,RELEASEDATE:releasedate",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "dialect.name": "PostgreSqlDatabaseDialect",
    "db.timezone": "Asia/Kolkata"
  }
}

On the Oracle side I am running:

INSERT INTO emp VALUES (11, 'venkat', TO_DATE('23/04/22', 'DD/MM/YY'), TO_DATE('23/04/24', 'DD/MM/YY'));

SELECT * FROM emp;

UPDATE emp
SET ENAME = 'Rajesh'
WHERE EMPNO = 11;

I am still facing the issue for updates. I have taken timestamp+incrementing, so am I doing anything wrong? Please help to check and let me know.

Is there any action I need to take?

what happens if you leave the query setting empty?
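also worth a quick test: with RELEASEDATE as your timestamp column, an update is only picked up if RELEASEDATE itself moves forward. something like this (just a sketch) should appear in the topic on the next poll, while your original UPDATE will not:

```sql
-- The connector's offset is based on (RELEASEDATE, EMPNO), so an update
-- must advance the timestamp column to be re-read on the next poll.
UPDATE emp
SET ENAME = 'Rajesh',
    RELEASEDATE = SYSDATE
WHERE EMPNO = 11;
COMMIT;
```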