Hi adhishankarit,
My source connector configuration is given below.
name=kafkasource
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
name=oracle-kafka
connection.url=jdbc:oracle:thin:@XXXXX:1521/XXXXXX
connection.user=XXXXXX
connection.password=XXXXX
task.max=1
topic.prefix=orakafkapost5
mode=timestamp+incrementing
query=SELECT SALES_ORDER,SO_ITEM, MATERIAL, QUANTITY FROM OrderBook
timestamp.column.name=REQUIRED_DATE
incrementing.column.name=SO_ITEM
poll.interval.ms=1000
numeric.mapping=best_fit
errors.tolerance=all
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
dialect.name=OracleDatabaseDialect
numeric.precision.mapping=true
transforms=Cast
transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.Cast.spec=SO_ITEM:int32
My Table:
CREATE TABLE OrderBook
( "SALES_ORDER" VARCHAR2(30),
"SO_ITEM" NUMBER(38,0),
"MATERIAL" VARCHAR2(30),
"QUANTITY" NUMBER(38,0));
I am inserting into the Oracle table as below:
INSERT INTO TABLE_SJ_HEADER
(SALES_ORDER, SO_ITEM, MATERIAL, QUANTITY)
VALUES('cabc', 102, 'cabc', 4)
In my JSON file, I can see:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "SALES_ORDER"
      },
      {
        "type": "bytes",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": {
          "scale": "0",
          "connect.decimal.precision": "38"
        },
        "field": "SO_ITEM"
      },
      {
        "type": "string",
        "optional": false,
        "field": "MATERIAL"
      },
      {
        "type": "bytes",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": {
          "scale": "0",
          "connect.decimal.precision": "38"
        },
        "field": "QUANTITY"
      },
The payload looks like:
  "payload": {
    "SALES_ORDER": "cabc",
    "SO_ITEM": "Zg==",
    "MATERIAL": "cabc",
    "QUANTITY": "BA==",
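For reference, the strings "Zg==" and "BA==" look like base64 text wrapping the raw bytes of a Connect Decimal: as far as I understand, the JSON converter writes the unscaled value as big-endian two's-complement bytes, and the scale comes from the "parameters" block of the schema. The snippet below is only a minimal sketch of decoding them on the consumer side, assuming that encoding; the helper name decode_connect_decimal is made up for illustration and is not part of any Kafka library.

```python
import base64
from decimal import Decimal


def decode_connect_decimal(b64_value: str, scale: int) -> Decimal:
    """Decode a base64 string assumed to hold a Connect Decimal's unscaled value."""
    raw = base64.b64decode(b64_value)
    # Assumption: the bytes are a big-endian two's-complement integer.
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    # Build the Decimal from sign/digits/exponent so no rounding is applied.
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(d) for d in str(abs(unscaled)))
    return Decimal((sign, digits, -scale))


# Values taken from the payload above; scale "0" comes from the schema.
so_item = decode_connect_decimal("Zg==", scale=0)
quantity = decode_connect_decimal("BA==", scale=0)
print(so_item, quantity)  # prints: 102 4
```

Decoded this way, the payload matches the inserted row (SO_ITEM 102, QUANTITY 4), so the data itself seems to arrive intact and only its representation is not the plain number I want.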
I have changed the Cast configuration as below; please check.
transforms=Cast
transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.Cast.spec=SO_ITEM:int32
I am still getting the same values. How can we get these fields as an integer and a float?
"SO_ITEM": "Zg==",
It needs to change to:
SO_ITEM=4
QUANTITY=24.3
If there is any code or snippet for this, please let me know. Please also let me know if anything needs to be changed here.
Thanks and Regards,
Anilkumar. S