Kafka JDBC connector

Hi,

I am working on a data migration from Oracle to PostgreSQL using Kafka.

Oracle table:

Name                 Null?     Type
-------------------- --------- ------------
CHECKNO                        NUMBER(5)
APPLICATION_NUMBER   NOT NULL  VARCHAR2(15)
CUSTOMERID           NOT NULL  VARCHAR2(10)
PURCHASE_ORDERNO     NOT NULL  VARCHAR2(15)
MATERIALNO           NOT NULL  VARCHAR2(15)
ORDER_TYPE           NOT NULL  VARCHAR2(6)
SUPPLIER             NOT NULL  VARCHAR2(14)
UNIT                 NOT NULL  NUMBER(6)
DATETIME                       TIMESTAMP(6)
ADDRESS                        VARCHAR2(15)
MOBILENO             NOT NULL  NUMBER(10)
EMAIL_ADDRESS                  VARCHAR2(15)

I am running the Kafka JDBC source connector in standalone mode and getting the error below.

[2024-02-05 20:17:33,624] ERROR [Oracle-Source-Kafka|task-0] Non-transient SQL exception while running query for table: TimestampIncrementingTableQuerier{table=null, query='SELECT * FROM (select APPLICATION_NUMBER,CUSTOMERID,PURCHASE_ORDERNO,MATERIALNO,ORDER_TYPE,SUPPLIER,UNIT,DATETIME,ADDRESS,MOBILENO,EMAIL_ADDRESS from salesorder) AS SALESORDER', topicPrefix='', incrementingColumn='checkno', timestampColumns=[DATETIME]} (io.confluent.connect.jdbc.source.JdbcSourceTask:470)
java.sql.SQLSyntaxErrorException: ORA-00933: SQL command not properly ended

    at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:630)
    at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:564)
    at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1231)
    at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:772)
    at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:299)
    at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:512)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:163)
    at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:1010)
    at oracle.jdbc.driver.OracleStatement.prepareDefineBufferAndExecute(OracleStatement.java:1271)
    at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:1149)
    at oracle.jdbc.driver.OracleStatement.executeSQLSelect(OracleStatement.java:1661)
    at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1470)
    at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3761)
    at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3936)
    at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1102)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:213)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:164)
    at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:436)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)

Caused by: Error : 933, Position : 162, Sql = SELECT * FROM (select APPLICATION_NUMBER,CUSTOMERID,PURCHASE_ORDERNO,MATERIALNO,ORDER_TYPE,SUPPLIER,UNIT,DATETIME,ADDRESS,MOBILENO,EMAIL_ADDRESS from salesorder) AS SALESORDER WHERE "DATETIME" < :1 AND (("DATETIME" = :2 AND "checkno" > :3 ) OR "DATETIME" > :4 ) ORDER BY "DATETIME","checkno" ASC, OriginalSql = SELECT * FROM (select APPLICATION_NUMBER,CUSTOMERID,PURCHASE_ORDERNO,MATERIALNO,ORDER_TYPE,SUPPLIER,UNIT,DATETIME,ADDRESS,MOBILENO,EMAIL_ADDRESS from salesorder) AS SALESORDER WHERE "DATETIME" < ? AND (("DATETIME" = ? AND "checkno" > ?) OR "DATETIME" > ?) ORDER BY "DATETIME","checkno" ASC, Error Msg = ORA-00933: SQL command not properly ended
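
To narrow down where the parser gave up, the "Position : 162" value in the error above can be checked against the statement text. The snippet below is only a rough sketch: it assumes the position is a zero-based character offset into the SQL, and the helper name fragment_at is made up for illustration.

```python
# The configured query text, copied from the connector config (straight quotes).
ORIGINAL_SQL = (
    "SELECT * FROM (select APPLICATION_NUMBER,CUSTOMERID,PURCHASE_ORDERNO,"
    "MATERIALNO,ORDER_TYPE,SUPPLIER,UNIT,DATETIME,ADDRESS,MOBILENO,"
    "EMAIL_ADDRESS from salesorder) AS SALESORDER"
)


def fragment_at(sql: str, position: int, width: int = 13) -> str:
    """Return `width` characters of `sql` starting at a zero-based offset.

    Hypothetical helper; treating the reported position as zero-based
    is an assumption, not something the error message states.
    """
    return sql[position:position + width]


# Show the text that starts at the reported error position.
print(fragment_at(ORIGINAL_SQL, 162))
```

If the printed fragment starts at the AS keyword, that would be consistent with Oracle not accepting AS in front of a table or subquery alias, so the alias may need to be written without AS.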

I ran the following command:

c:\kafka\bin\windows>connect-standalone.bat …..\config\connect-standalone.properties …..\config\jdbc-source.json

*** Connect standalone property file ***

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets

# Flush much faster than normal, which is useful for testing/debugging

offset.flush.interval.ms=10000
plugin.path=C:\kafka\libs

*** Worker source property file ***

{
name=jdbc_source_connector_oracle_01
config={
name=Oracle-Source-Kafka
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
task.max=1

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

// transforms=createKey,setSchema
// transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
// transforms.createKey.fields=Unit,DATETIME
// transforms.setSchema.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
// transforms.setSchema.schema.name=SalesOrderRecords

//JDBC Source Connectors specific properties

connection.url=jdbc:oracle:thin:@localhost:1521/orcl
connection.user=
connection.password=
dialect.name=OracleDatabaseDialect

mode=timestamp+incrementing
incrementing.column.name=checkno
timestamp.column.name=DATETIME

numeric.precision.mapping=true
numeric.mapping=best_fit

query=SELECT * FROM (select APPLICATION_NUMBER,CUSTOMERID,PURCHASE_ORDERNO,MATERIALNO,ORDER_TYPE,SUPPLIER,UNIT,DATETIME,ADDRESS,MOBILENO,EMAIL_ADDRESS from salesorder) AS SALESORDER
table.type=TABLE

poll.interval=5000
batch.max.rows=2
topics=sorder
db.timezone=Asia/Kolkata
}
}

While executing, the connector appends a WHERE clause to my custom query. I do not understand how the values for the placeholders are supplied. I have some questions, and the issue is still not resolved. Please help.

1). Where do the input values for the "?" placeholders come from? Are they supplied at runtime, and how does the connector obtain them?
2). How can I resolve "ORA-00933: SQL command not properly ended"?
3). I am loading data from one database using one simple query. Can multiple statements be used in a single query?
4). Can multiple queries be used in the same source/sink connector property file?
Please help me resolve the above.
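
To make question 1 concrete, here is a minimal sketch of how "?" placeholders in a WHERE clause of the shape shown in the log are typically bound at execution time. It is not the connector's implementation: it uses Python's sqlite3 with an in-memory table, and the table contents, the function name fetch_new_rows, and the parameter names last_ts, last_id and upper_ts are all assumptions for illustration. My understanding is that the connector keeps the last timestamp and incrementing value it has seen and supplies them on each poll, but I have not confirmed this.

```python
import sqlite3

# In-memory stand-in for the source table (illustrative data only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE salesorder (checkno INTEGER, datetime TEXT, unit INTEGER)")
conn.executemany(
    "INSERT INTO salesorder VALUES (?, ?, ?)",
    [
        (1, "2024-02-05 10:00:00", 5),
        (2, "2024-02-05 10:00:00", 7),
        (3, "2024-02-05 11:00:00", 9),
    ],
)

# Same WHERE shape as in the error log: four placeholders, no literal values.
INCREMENTAL_SQL = (
    "SELECT checkno, datetime, unit FROM salesorder "
    "WHERE datetime < ? AND ((datetime = ? AND checkno > ?) OR datetime > ?) "
    "ORDER BY datetime, checkno ASC"
)


def fetch_new_rows(conn, last_ts, last_id, upper_ts):
    """Bind the remembered offsets to the placeholders and run the query.

    Assumed order: upper bound, last timestamp, last id, last timestamp.
    """
    params = (upper_ts, last_ts, last_id, last_ts)
    return conn.execute(INCREMENTAL_SQL, params).fetchall()


# Pretend row (checkno=1, 10:00) was already read; ask for anything newer.
rows = fetch_new_rows(conn, "2024-02-05 10:00:00", 1, "2024-02-05 12:00:00")
print(rows)
```

The point of the sketch is that the values are never written into the query text in the property file; they are passed separately when the statement runs.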