How to return java.util.Date values instead of Long using KafkaJsonSchemaDeserializer with Postgres JDBC source connector?

I have set up Kafka with the JDBC source connector to Postgres, as well as the schema registry. It is configured using io.confluent.connect.jdbc.JdbcSourceConnector and is correctly retrieving the rows with the associated field values.

However, my Postgres fields of type “timestamp with time zone” are being deserialised as “long” values (milliseconds since the Unix epoch) rather than Java “Date” values. I’ve tried various config options from TimestampConverter | Confluent Documentation but I can’t make it return anything more useful.

Is this a limitation of using JSON serialisation, or is there a Kafka option which can address this? Before using the schema registry, an example record from the DB looked like the one below, where you can see the fields are correctly identified as “timestamp” fields but are returned as longs.

In my application I don’t know in advance which fields will be “timestamps”, so I’d prefer them to be deserialised as Date values rather than Long values when I retrieve the Java Map value from ConsumerRecord.value() (a sketch of how the records are being consumed follows the example below).

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": false,
        "field": "quote"
      },
      {
        "type": "string",
        "optional": false,
        "field": "author"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "created_at"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "updated_at"
      }
    ],
    "optional": false,
    "name": "quote"
  },
  "payload": {
    "id": 1,
    "quote": "There are only two kinds of languages: the ones people complain about and the ones nobody uses.",
    "author": "Bjarne Stroustrup",
    "created_at": 1664155159683,
    "updated_at": 1664155159683
  }
}
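For reference, the records are being consumed roughly as follows (a minimal sketch rather than my exact code: the broker and schema registry URLs, group id and topic name are placeholders, and no json.value.type is configured, so the value arrives as a generic Map). The timestamp entries in that Map come back as plain Longs:

import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class QuoteConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "quote-reader");              // placeholder group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonSchemaDeserializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");              // placeholder registry URL

        try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("quotes"));                              // placeholder topic name
            for (ConsumerRecord<String, Object> record : consumer.poll(Duration.ofSeconds(5))) {
                // Without json.value.type, the deserialised value is a generic Map
                @SuppressWarnings("unchecked")
                Map<String, Object> row = (Map<String, Object>) record.value();
                // Prints class java.lang.Long, not java.util.Date
                System.out.println(row.get("created_at").getClass());
            }
        }
    }
}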

JSON doesn’t natively offer a date type, only numbers and strings, unlike Avro or Protobuf.

So, unless you change the serialization format, the only workaround would be to do the conversion at the POJO/deserializer level if you cannot get TimestampConverter to convert into a string which can then be parsed. Either way, you’ll need to know which fields represent timestamps in the tables.
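For example, something along these lines on the consumer side — a rough sketch where the helper name and the set of timestamp field names are purely illustrative, not part of any Kafka API:

import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public final class TimestampFix {

    // Illustrative helper: given the Map returned by ConsumerRecord.value(), replace the
    // named epoch-millisecond entries with java.util.Date values.
    public static Map<String, Object> withDates(Map<String, Object> row, Set<String> timestampFields) {
        Map<String, Object> converted = new LinkedHashMap<>(row);
        for (String field : timestampFields) {
            Object raw = converted.get(field);
            if (raw instanceof Number) {
                converted.put(field, new Date(((Number) raw).longValue()));
            }
        }
        return converted;
    }
}

Used with the example record above, withDates(row, Set.of("created_at", "updated_at")) would hand back the same Map with those two entries as java.util.Date.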

Thanks. I changed to io.confluent.connect.protobuf.ProtobufConverter and, as expected, the Kafka consumer now gets a DynamicMessage with the various fields. I was expecting/hoping the timestamp fields would be represented as google.protobuf.Timestamp objects, but instead each was returned as a DynamicMessage with a single field named google.protobuf.Timestamp.seconds.

Is there some special config required to make this deserialise “properly”? I am using 7.2.1, so I was under the impression this would work based on Post protobuf schema in Schema Registry - #2 by rick.
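In the meantime, something like the following should recover a java.util.Date from that nested message — a sketch assuming the inner DynamicMessage follows the standard google.protobuf.Timestamp layout (seconds/nanos fields); that assumption, and the class name, are mine rather than anything from the converter docs:

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;

import java.time.Instant;
import java.util.Date;

public final class ProtoTimestamps {

    // Sketch: read the "seconds" and "nanos" fields of a google.protobuf.Timestamp that
    // arrives as a DynamicMessage and convert them into a java.util.Date.
    public static Date toDate(DynamicMessage timestampMessage) {
        Descriptors.Descriptor descriptor = timestampMessage.getDescriptorForType();
        long seconds = (Long) timestampMessage.getField(descriptor.findFieldByName("seconds"));
        // Guard against the schema only exposing "seconds", as observed above
        Descriptors.FieldDescriptor nanosField = descriptor.findFieldByName("nanos");
        int nanos = nanosField != null ? (Integer) timestampMessage.getField(nanosField) : 0;
        return Date.from(Instant.ofEpochSecond(seconds, nanos));
    }
}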
