Kafka JDBC Source Connector

Hi,

I am inserting data into Oracle and reading it through the Kafka JDBC source connector.

My field is:
SO_ITEM NUMBER(38,0), length 22, precision 38, scale 0, NOT NULL.

In the JSON output I can see its schema sent as below:

{
  "type": "bytes",
  "optional": false,
  "name": "org.apache.kafka.connect.data.Decimal",
  "version": 1,
  "parameters": {
    "scale": "0",
    "connect.decimal.precision": "38"
  },
  "field": "SO_ITEM"
},

The value arrives as: "SO_ITEM": "Vw==",

How can I send the values as a normal integer or float? They are being sent in base64 format, not as integers.

Can you please help me work out how to convert the value and send it as an integer?

How can this be done with a transformation in the Kafka JDBC source connector?

By default, the JDBC source connector maps numeric columns to the Connect Decimal type, which the JSON converter serializes as a base64-encoded value. You can cast the numeric columns to the appropriate type: INT32, INT64, FLOAT64, etc. Please refer to the JDBC source connector documentation for casting data types using an SMT.
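To see what the base64 string actually contains: a Connect Decimal is carried as the big-endian two's-complement bytes of the unscaled value, and the JSON converter writes those bytes as base64. Below is a minimal Python sketch that decodes the value quoted above. The function name decode_connect_decimal is only illustrative and is not part of any Kafka library.

```python
import base64
from decimal import Decimal


def decode_connect_decimal(b64_value: str, scale: int) -> Decimal:
    """Decode a base64-encoded Connect Decimal (hypothetical helper).

    b64_value: the string found in the JSON payload, e.g. "Vw=="
    scale:     the "scale" parameter from the field's schema
    """
    # The bytes are the unscaled integer, big-endian, two's complement.
    raw = base64.b64decode(b64_value)
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    # Shift the decimal point left by `scale` digits.
    return Decimal(unscaled).scaleb(-scale)


if __name__ == "__main__":
    print(decode_connect_decimal("Vw==", 0))  # 87
```

So the data is not corrupted: it is the original number in a binary encoding that a consumer has to decode, unless the connector is made to emit a primitive integer or float type instead.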

Hi adhishankarit,

My source connector configuration is given below.

name=kafkasource
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
name=oracle-kafka
connection.url=jdbc:oracle:thin:@XXXXX:1521/XXXXXX
connection.user=XXXXXX
connection.password=XXXXX
tasks.max=1
topic.prefix=orakafkapost5
mode=timestamp+incrementing
query=SELECT SALES_ORDER,SO_ITEM, MATERIAL, QUANTITY FROM OrderBook
timestamp.column.name=REQUIRED_DATE
incrementing.column.name=SO_ITEM
poll.interval.ms=1000
numeric.mapping=best_fit
errors.tolerance=all
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
dialect.name=OracleDatabaseDialect

numeric.precision.mapping = true

transforms=Cast
transforms.Cast.type= org.apache.kafka.connect.transforms.Cast$Value
transforms.Cast.spec= SO_ITEM:int32

My Table:

CREATE TABLE OrderBook
( "SALES_ORDER" VARCHAR2(30),
"SO_ITEM" NUMBER(38,0),
"MATERIAL" VARCHAR2(30),
"QUANTITY" NUMBER(38,0));

I insert into the Oracle table as below:

INSERT INTO TABLE_SJ_HEADER
(SALES_ORDER, SO_ITEM, MATERIAL, QUANTITY)
VALUES('cabc', 102, 'cabc', 4)

In the JSON output, I can see:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "SALES_ORDER"
      },
      {
        "type": "bytes",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": {
          "scale": "0",
          "connect.decimal.precision": "38"
        },
        "field": "SO_ITEM"
      },
      {
        "type": "string",
        "optional": false,
        "field": "MATERIAL"
      },
      {
        "type": "bytes",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": {
          "scale": "0",
          "connect.decimal.precision": "38"
        },
        "field": "QUANTITY"
      },

The payload looks like:

"payload": {
  "SALES_ORDER": "cabc",
  "SO_ITEM": "Zg==",
  "MATERIAL": "cabc",
  "QUANTITY": "BA==",

I have changed the Cast configuration as below, please check:

transforms=Cast
transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.Cast.spec=SO_ITEM:int32

I am still getting the same values. How can we get these as an integer and a float?

"SO_ITEM": "Zg==",

It needs to change to, for example:

SO_ITEM=4
QUANTITY=24.3

If you have any code or snippet for this, please share it, and let me know if anything needs to change here.

Thanks and Regards,
Anilkumar. S

I have a field with the NUMBER(38,0) datatype in my source DB, and when I read it using the source connector, the data arrives in the topic in an encoded format that looks like unicode. How can I cast this? In my target table the column has an integer datatype. The source is Oracle and the target is Postgres.

Finally, I found a solution and the issue is resolved. Thanks.

For example, cast the column to a smaller precision in the query:

CAST(SO_ITEM AS NUMBER(5,0)) AS SO_ITEM
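A likely reason this works together with numeric.mapping=best_fit: as far as I understand the connector documentation, best_fit only picks a primitive type when the column's precision is below 19, and NUMBER(38,0) is too wide, so it stays a Decimal. The Python sketch below models that rule. The function name best_fit_type and the exact thresholds are my assumption from the documented mapping table, so please verify them against the docs for your connector version. Negative scales are not modelled.

```python
def best_fit_type(precision: int, scale: int) -> str:
    """Approximate the Connect type chosen by numeric.mapping=best_fit.

    Hypothetical helper: thresholds are assumed from the connector's
    documented mapping table, not taken from its source code.
    """
    if precision < 19:
        if scale == 0:
            # Integer column: pick the smallest integer type that fits.
            if precision > 9:
                return "INT64"
            if precision > 4:
                return "INT32"
            if precision > 2:
                return "INT16"
            return "INT8"
        if scale > 0:
            # Fractional column: represented as a double.
            return "FLOAT64"
    # Too wide for a primitive type: stays a Decimal (base64 bytes in JSON).
    return "Decimal"


if __name__ == "__main__":
    print(best_fit_type(38, 0))  # the original column definition
    print(best_fit_type(5, 0))   # after CAST(SO_ITEM AS NUMBER(5,0))
```

Under these assumptions NUMBER(38,0) stays a Decimal while NUMBER(5,0) becomes INT32, and a column cast to something like NUMBER(10,2) would come through as FLOAT64, which covers the float case asked about earlier in the thread.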