JDBC sink connector to update a single column in a table based on 'riskStatus' in a Postgres DB

Hi,

I have been trying to build a custom JDBC sink connector that updates a single column of an existing record in a Postgres database.

I will get 'riskStatus' with the value 'Approved' in a Kafka message on the topic 'aus-stream-risk-output', and I then need to update a column called STATUS to 'Completed' for that particular transaction.
Please help me form the update query. By that I mean the logic for building the UPDATE statement in the connector code, using Java and the Kafka Connect library.
Here is my code snippet.

protected void write(final String tableNameFormat, final Collection<SinkRecord> records) throws SQLException {
    // tableNameFormat must be processed
    try (Connection connection = dataSource.getConnection()) {
        connection.setAutoCommit(false);

        if (isInsertModeDatabaseLevelEnabled) {
            Statement statement = connection.createStatement();
            records.forEach(e -> {
                final String fieldsAsString = arrayFieldNamesSplitAsString(e.valueSchema().fields());
                final String valuesAsString = arrayFieldValuesSplitAsString((Struct) e.value(), e.valueSchema());
                // Only records carrying riskStatus = 'Approved' should trigger an update
                if (fieldsAsString.contains("riskStatus") && valuesAsString.contains("Approved")) {
                    try {
                        final String finalQuery = String.format(UPDATE_STATEMENT, tableNameFormat, "riskStatus", "Completed");
                        log.info("Final prepared statement: '{}' //", finalQuery);
                        statement.addBatch(finalQuery);
                    } catch (SQLException ex1) {
                        ex1.printStackTrace();
                    }
                }
            });
            // Execute the batched updates and commit the transaction
            statement.executeBatch();
            connection.commit();
        }
    }
}

And the update query should look like the following.

update EVENT_STATUS set STATUS='Completed' where AUTHORISATION_ID='someNumber'
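
What I have in mind is roughly the sketch below; it is not working code, just my assumption of how the statement could be built. It presumes that the (flattened) record value still carries the transaction identifier in a field I am calling AUTHORISATION_ID, and the class and method names are placeholders. The values are bound through a PreparedStatement rather than formatted into the SQL string.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

public class StatusUpdateBuilder {

    // Placeholder template; the table name is substituted in, the values are bound as parameters
    private static final String UPDATE_STATEMENT =
            "UPDATE %s SET STATUS = ? WHERE AUTHORISATION_ID = ?";

    // Builds and executes the UPDATE for one record whose riskStatus is 'Approved'
    void updateStatus(final Connection connection, final String tableName, final SinkRecord record)
            throws SQLException {
        final Struct value = (Struct) record.value();

        // Skip records that do not carry riskStatus = 'Approved'
        if (value.schema().field("riskStatus") == null
                || !"Approved".equals(value.getString("riskStatus"))) {
            return;
        }

        // Assumed field name for the transaction identifier after flattening
        final String authorisationId = value.getString("AUTHORISATION_ID");

        final String sql = String.format(UPDATE_STATEMENT, tableName);
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.setString(1, "Completed");
            statement.setString(2, authorisationId);
            statement.executeUpdate();
        }
    }
}

I chose a PreparedStatement mainly so the values do not need to be quoted or escaped by hand, but I am not sure this is the idiomatic approach inside a sink task.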

Here is my config file.

sink-connector-config.json:
{
"name":"event-status-jdbc-sink-connector-v1",
"config": {

"connector.class":"com.connect.jdbc.PostgreSQLSinkConnector",
"task.max":"1",
"topics":"aus-stream-risk-output",

"key.converter":"io.confluent.connect.avro.AvroConnector",
"value.converter":"io.confluent.connect.avro.AvroConnector",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter.schema.registry.url":"http://localhost:8081",

"transforms":"Flatten,RenameFields",
"transforms.Flatten.type":"org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.Flatten.delimiter":"_",

"transforms.RenameFields.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameFields.renames":"riskStatus:STATUS",

"connection.url":"postgresql://localhost:5432/postgres",
"connection.user":"postgres",
"connection.password":"admin",

"insert.mode": "upsert",

"batch.size": "2",

"table.name.format": "postgres.CONFIG.EVENT_STATUS",

"pk.mode": "record_value",
"pk.fields": "EVENT_STATUS_ID",

"db.timezone": "Asia/Kolkata"

}
}
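
One assumption I am making with this config: because RenameFields maps riskStatus to STATUS, the value Struct that actually reaches the sink task should expose the field under its new name. Reading it would then look roughly like this sketch (the helper class is only for illustration):

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

final class RenamedFieldReader {

    // After Flatten and RenameFields have run, the value should carry STATUS rather than riskStatus
    static String readStatus(final SinkRecord record) {
        final Struct value = (Struct) record.value();
        return value.schema().field("STATUS") != null ? value.getString("STATUS") : null;
    }
}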
