Kafka Connect JDBC sink with Protobuf converter - how to use null values?

As the title says, I have a protobuf definition that imports “google/protobuf/wrappers.proto”.

When I try to push into MySQL via the Kafka JDBC sink connector, I get this error:

Unsupported source data type: STRUCT.

But I don't have any structs.

The Confluent docs mention a nullable option, but I can't figure out where I need to enable it.

Here are my connector settings:

    {
      "transforms": "TimestampConverter,RenameField",
      "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
      "transforms.TimestampConverter.format": "yyyy-MM-dd'\''T'\''HH:mm:ss",
      "transforms.TimestampConverter.target.type": "Timestamp",
      "transforms.TimestampConverter.field": "timestampUtc,timestampLocal",

      "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.RenameField.renames": "Id:MonitoringEventId,TimestampUtc:Date,EventType:Type,DeviceType:GeneratedBy",
      "transforms.RenameField.blacklist": "Duration,TypeId,TimestampLocal",

      "value.converter.schema.registry.url": "http://localhost:8081",
      "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    }
syntax = "proto3";

import "google/protobuf/wrappers.proto";

message MonitoringEventDAO {
    uint64 Id = 1;
    string TimestampUtc = 2;
    string TimestampLocal = 3;
    uint32 Duration = 4;

    uint32 BaseStationId = 5;
    google.protobuf.Int32Value CameraId = 6;
    uint32 AccountId = 7;
    google.protobuf.Int32Value SubjectId = 8;

    uint32 DeviceType = 9;
    uint32 TypeId = 10;
    string Message = 11;

    string Version = 12;
    uint32 EventType = 13;
    string Hash = 14;

    google.protobuf.StringValue StorageUrl = 15;
    bool PresentationUploaded = 16;
    uint32 DataType = 17;
    google.protobuf.Int32Value Frames = 18;
    google.protobuf.Int32Value Fps = 19;
    google.protobuf.Int32Value SampleRate = 20;
}
The target MySQL table:

    CREATE TABLE `Events` (
      `MonitoringEventId` int NOT NULL AUTO_INCREMENT,
      `BaseStationId` int DEFAULT NULL,
      `CameraId` int DEFAULT NULL,
      `AccountId` int DEFAULT NULL,
      `SubjectId` int DEFAULT NULL,
      `GeneratedBy` int NOT NULL,
      `Date` datetime(6) NOT NULL,
      `Message` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
      `Hash` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
      `Version` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
      `Type` int NOT NULL,
      `ReviewMonitoringEventReviewId` int DEFAULT NULL,
      `BreathingDataUrl` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
      `VideoUrl` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
      `DataType` int NOT NULL,
      `StorageUrl` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
      `Uploaded` tinyint(1) NOT NULL,
      `PresentationStorageUrl` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci,
      `PresentationUploaded` tinyint(1) NOT NULL,
      `Frames` int DEFAULT NULL,
      `Fps` int DEFAULT NULL,
      `SampleRate` int DEFAULT NULL,
      PRIMARY KEY (`MonitoringEventId`),
      KEY `IX_Events_AccountId` (`AccountId`),
      KEY `IX_Events_BaseStationId` (`BaseStationId`),
      KEY `IX_Events_CameraId` (`CameraId`),
      KEY `IX_Events_Date` (`Date`),
      KEY `IX_Events_GeneratedBy` (`GeneratedBy`),
      KEY `IX_Events_Hash` (`Hash`),
      KEY `IX_Events_ReviewMonitoringEventReviewId` (`ReviewMonitoringEventReviewId`),
      KEY `IX_Events_SubjectId` (`SubjectId`),
      KEY `IX_Events_Type` (`Type`),
      CONSTRAINT `FK_Events_Accounts_AccountId` FOREIGN KEY (`AccountId`) REFERENCES `Accounts` (`AccountId`) ON DELETE RESTRICT,
      CONSTRAINT `FK_Events_BaseStations_BaseStationId` FOREIGN KEY (`BaseStationId`) REFERENCES `BaseStations` (`BaseStationId`) ON DELETE RESTRICT,
      CONSTRAINT `FK_Events_Cameras_CameraId` FOREIGN KEY (`CameraId`) REFERENCES `Cameras` (`CameraId`) ON DELETE RESTRICT,
      CONSTRAINT `FK_Events_MonitoringEventReview_ReviewMonitoringEventReviewId` FOREIGN KEY (`ReviewMonitoringEventReviewId`) REFERENCES `MonitoringEventReview` (`MonitoringEventReviewId`) ON DELETE RESTRICT,
      CONSTRAINT `FK_Events_Subjects_SubjectId` FOREIGN KEY (`SubjectId`) REFERENCES `Subjects` (`SubjectId`) ON DELETE RESTRICT
    ) ENGINE=InnoDB AUTO_INCREMENT=162 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

And the full stack trace:
    org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:591)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1578)
    at io.confluent.connect.jdbc.dialect.DatabaseDialect.bindField(DatabaseDialect.java:608)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindField(PreparedStatementBinder.java:186)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindNonKeyFields(PreparedStatementBinder.java:172)
    at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:103)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:184)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:79)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:563)
    ... 10 more

The top-level object MonitoringEventDAO is a Struct to the Connect API. What is the default key converter for your Connect setup?

You are not setting key.converter, so it is using the default for the Connect cluster. Also, what other settings for your connector can you share?

There is no key, so it's the null converter. I have shared all the settings. When I blacklist all the nullable fields, the message works and gets consumed by the connector.
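For illustration, "all the nullable fields" here means extending the existing blacklist to cover every wrapper-typed field in the proto, along these lines (the exact list is inferred from the definition above):

    "transforms.RenameField.blacklist": "Duration,TypeId,TimestampLocal,CameraId,SubjectId,StorageUrl,Frames,Fps,SampleRate"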

Looking at your protobuf more closely, you do have structures: google.protobuf.Int32Value and google.protobuf.StringValue map to objects (structs), not primitives.
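For reference, each wrapper in wrappers.proto is just a one-field message, which is why the converter hands it to Connect as a STRUCT rather than a nullable INT32:

    // From google/protobuf/wrappers.proto
    message Int32Value {
        // The int32 value.
        int32 value = 1;
    }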

I would see if you can use the Flatten SMT to extract the value inside each of those structures and “move up” the contained primitive to the top-level structure, e.g.

flatten SampleRate.value to sampleRate

I am not an expert on protobuf, but I'm guessing that is what is going on (and I'm not 100% sure of the syntax to reach the element inside google.protobuf.Int32Value).
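A rough sketch of what that could look like, chaining Flatten ahead of your existing transforms (the delimiter choice and the resulting field names are assumptions to verify against the actual flattened schema):

    "transforms": "Flatten,TimestampConverter,RenameField",
    "transforms.Flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.Flatten.delimiter": "_",

If that behaves as expected, a wrapper field such as SampleRate should surface as a top-level SampleRate_value, which your existing RenameField transform could then map back with SampleRate_value:SampleRate.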
