JDBC Sink Connector keeps treating some fields as null even though they aren't

The value of the serialized message looks like this (all of the info is fake, generated via Faker):

{
  "order_id": 14,
  "order_date": "2024-02-28T03:33:35",
  "customer_info": {
    "customer_id": 36615,
    "customer_name": "Dr. Walter Evans",
    "company": "National Bottle Corporation of the Philippines",
    "shipping_address": "Unit 2226 Taylor Condominiums Tower 5, 907 Banahaw Street, Basilan",
    "contact_number": "+632-7729-9815",
    "email": "d.evans45@nationalbottle.com.ph"
  },
  "payment_info": {
    "payment_status": true,
    "payment_method": "Bank Transfer",
    "payment_date": "2024-02-29T03:10:17"
  },
  "shipping_info": {
    "shipping_id": 4635715,
    "express_shipping": true,
    "order_status": "Delivered",
    "shipped_date": "2024-03-01T03:10:17",
    "eta": "2024-03-05T18:41:35",
    "finished_date": "2024-03-05T18:31:16"
  },
  "item_info": [
    {
      "sku": 42,
      "unit_price": 1475,
      "quantity": 14,
      "discount_percent": 20,
      "subtotal": 16520
    },
    {
      "sku": 15,
      "unit_price": 775,
      "quantity": 17,
      "discount_percent": 5,
      "subtotal": 12516.25
    },
    {
      "sku": 4,
      "unit_price": 675,
      "quantity": 14,
      "discount_percent": 30,
      "subtotal": 6615
    }
  ],
  "item_variety": 3,
  "shipping_fee": 125,
  "order_total": 35776.25
}

The only SMT I’ve got on the connector is ExtractField$Value, used to pull the fields out of the nested customer_info (the relevant config lines are sketched just after the error headers below). However, the messages aren’t being stored and are instead sent to the connector’s DLQ. I checked the headers of a dead-lettered record, and this is the error they report, which I think means the connector is treating company, shipping_address, and contact_number as null even though they aren’t:

[
  {
    "key": "__connect.errors.topic",
    "value": "finished_orders"
  },
  {
    "key": "__connect.errors.partition",
    "value": "0"
  },
  {
    "key": "__connect.errors.offset",
    "value": "812"
  },
  {
    "key": "__connect.errors.connector.name",
    "value": "lcc-w2dvvj"
  },
  {
    "key": "__connect.errors.task.id",
    "value": "0"
  },
  {
    "key": "__connect.errors.stage",
    "value": "TASK_PUT"
  },
  {
    "key": "__connect.errors.class.name",
    "value": "org.apache.kafka.connect.sink.SinkTask"
  },
  {
    "key": "__connect.errors.exception.class.name",
    "value": "java.sql.SQLException"
  },
  {
    "key": "__connect.errors.exception.message",
    "value": "Exception chain:\njava.sql.BatchUpdateException: Batch entry 0 INSERT INTO \"customer_info\" (\"customer_name\",\"customer_id\",\"email\") VALUES (('Dr. Walter Evans'),('36615'::int8),('d.evans45@nationalbottle.com.ph')) was aborted: ERROR: null value in column \"shipping_address\" of relation \"customer_info\" violates not-null constraint\n  Detail: Failing row contains (36615, Dr. Walter Evans, No Company, null, null, d.evans45@nationalbottle.com.ph).  Call getNextException to see other errors in the batch.\norg.postgresql.util.PSQLException: ERROR: null value in column \"shipping_address\" of relation \"customer_info\" violates not-null constraint\n  Detail: Failing row contains (36615, Dr. Walter Evans, No Company, null, null, d.evans45@nationalbottle.com.ph).\norg.postgresql.util.PSQLException: ERROR: null value in column \"shipping_address\" of relation \"customer_info\" violates not-null constraint\n  Detail: Failing row contains (36615, Dr. Walter Evans, No Company, null, null, d.evans45@nationalbottle.com.ph).\n"
  },
  {
    "key": "__connect.errors.exception.stacktrace",
    "value": "java.sql.SQLException: Exception chain:\njava.sql.BatchUpdateException: Batch entry 0 INSERT INTO \"customer_info\" (\"customer_name\",\"customer_id\",\"email\") VALUES (('Dr. Walter Evans'),('36615'::int8),('d.evans45@nationalbottle.com.ph')) was aborted: ERROR: null value in column \"shipping_address\" of relation \"customer_info\" violates not-null constraint\n  Detail: Failing row contains (36615, Dr. Walter Evans, No Company, null, null, d.evans45@nationalbottle.com.ph).  Call getNextException to see other errors in the batch.\norg.postgresql.util.PSQLException: ERROR: null value in column \"shipping_address\" of relation \"customer_info\" violates not-null constraint\n  Detail: Failing row contains (36615, Dr. Walter Evans, No Company, null, null, d.evans45@nationalbottle.com.ph).\norg.postgresql.util.PSQLException: ERROR: null value in column \"shipping_address\" of relation \"customer_info\" violates not-null constraint\n  Detail: Failing row contains (36615, Dr. Walter Evans, No Company, null, null, d.evans45@nationalbottle.com.ph).\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:165)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.unrollAndRetry(JdbcSinkTask.java:150)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:121)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:607)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:354)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:251)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:220)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n"
  }
]
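
For reference, the SMT lines in my connector config look roughly like this (the transform alias extractCustomer is arbitrary; the rest of the config is just connection and topic settings, so I’ve left it out):

"transforms": "extractCustomer",
"transforms.extractCustomer.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractCustomer.field": "customer_info"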

To confirm, I modified the columns of my Postgres database (hosted on Supabase) to allow nulls and set their default value to blank. After that there were no more errors on the connector, and it now stores the messages from the Kafka topic in the database, but the company, shipping_address, and contact_number columns all end up blank. The customer_name and email, as well as the customer_id primary key, are stored correctly.

Any idea why?

Nevermind, I figured it out, and I’m a dummy. But in case someone else runs into the same problem: quadruple-check your schema. For some reason (most definitely a mistake on my part), the schema I’d been using was missing some parts, and those parts were exactly the fields that kept coming through as null.
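
To make “missing parts” concrete: judging by the columns in the failed INSERT, the schema I’d been using for the nested customer_info record only declared customer_id, customer_name, and email, so the other fields never made it into the Connect record. The record needs every field declared, roughly like this (an Avro-style sketch; the record name and field types are my guesses from the sample message, so adjust to whatever schema format you actually use):

{
  "type": "record",
  "name": "customer_info",
  "fields": [
    { "name": "customer_id", "type": "long" },
    { "name": "customer_name", "type": "string" },
    { "name": "company", "type": "string" },
    { "name": "shipping_address", "type": "string" },
    { "name": "contact_number", "type": "string" },
    { "name": "email", "type": "string" }
  ]
}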
