DESCRIBE throws Failed to deserialise object error

Hello, everyone.
I am currently trying to ingest some .csv files containing medical data into my Kafka topics.
I was successful in getting the rows into my topics with the FilePulse source connector, but when I try to create a stream on one of these topics and then use the DESCRIBE command, I get this error:

Failed to deserialise object
Caused by: Cannot construct instance of
        `io.confluent.ksql.rest.entity.SourceDescription`, problem:
        queryOffsetSummaries
 at [Source:
        (byte[  ])"[{"@type":"sourceDescription","statementText":"describe
        ECG_STREAM;","sourceDescription":{"name":"ECG_STREAM","windowType":null,"readQue
        ries":[  ],"writeQueries":[  ],"fields":[{"name":"TIMESTAMP","schema":{"type":"BIGIN
        T","fields":null,"memberSchema":null}},{"name":"INX","schema":{"type":"STRING","
        fields":null,"memberSchema":null}},{"name":"SAMPLENUM","schema":{"type":"STRING"
        ,"fields":null,"memberSchema":null}},{"name":"FREQUENCY","schema":{"type":"INTEG
        ER","fields":null,"memberSchema":null}},{"na"[truncated 760 bytes]; line: 1,
        column: 1244] (through reference chain:
        io.confluent.ksql.rest.entity.KsqlEntityList[0]->io.confluent.ksql.rest.entity.S
        ourceDescriptionEntity["sourceDescription"])
Caused by: queryOffsetSummaries

My connector configuration is pretty simple:

  • name = BR_Ingestion
  • connector.class = io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
  • tasks.max = 1
  • topic = file_BR
  • offset.strategy = name
  • filters = ParseLine, brIntervalToInt, breathRate, TimestampToInt
  • internal.kafka.reporter.bootstrap.servers = kafka1:19092
  • fs.scan.filters =
  • fs.scan.directory.path = /tmp/input_files/BR/
  • fs.cleanup.policy.class = io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
  • filters.brIntervalToInt.field = brInterval
  • skip.headers = 1
  • filters.ParseLine.extractColumnName = headers
  • filters.ParseLine.type = io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter
  • filters.brIntervalToInt.to = INTEGER
  • filters.brIntervalToInt.type = io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter
  • filters.ParseLine.separator = ;
  • filters.ParseLine.trimColumn = true
  • filters.breathRate.type = io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter
  • filters.breathRate.field = breathRate
  • filters.breathRate.to = INTEGER
  • filters.TimestampToInt.type = io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter
  • filters.TimestampToInt.field = timestamp
  • filters.TimestampToInt.to = LONG

I also tried setting the key and value converters to the Avro converter class, and explicitly setting key/value.converter.schema.registry.url as an additional property, but that hasn’t solved the issue.
Researching the error also didn’t turn up any solution.
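For completeness, the converter settings actually in effect for the connector can be double-checked through the Kafka Connect REST API; the localhost:8083 address below is an assumption based on the default Connect listener:

curl -s http://localhost:8083/connectors/BR_Ingestion/config
# If the converters were set at the connector level they will appear in the
# returned JSON (key.converter, value.converter, *.schema.registry.url);
# otherwise the worker-level defaults apply.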

Am I missing something elementary in my configuration?
If someone could help me, I would really be thankful.
Best regards,

Hi @Steno, what version of ksqlDB server and CLI are you running? How have you deployed them?

Hello, @rmoff , thank you very much for answering.

Both my ksqlDB Server and CLI are version 6.0.1 and are deployed as Docker containers.
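For reference, the server version can also be confirmed directly over its REST API; the localhost:8088 address below assumes the default listener:

curl -s http://localhost:8088/info
# Returns KsqlServerInfo, including the server's version string.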
I do not know if it is useful, but here is one of the messages from the topics I am trying to base my streams on:

rowtime: 2021/02/24 07:23:57.857 Z, key: , value: {"timestamp": 1579604593, "inx": "212", "sampleNum": "773", "Frequency": 128, "HeartRate": 112, "SensorStatus": "255", "RRLength": 531, "Samples": "-301"}

A key value and key schema have not been set; I have left them untouched.

The value I used for the schema registry URL should be correct, as Landoop’s API also shows me the schemas correctly.
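A similar check can be made directly against the Schema Registry REST API (localhost:8081 is an assumed address):

curl -s http://localhost:8081/subjects
# With only a value schema registered, a subject such as file_ECG-value
# should be listed, but no file_ECG-key.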

Is the problem the key?

EDIT: I also tried to set the key at ingestion-time and manually create its schema, but the error remains the same.

Best regards,

Could you share:

  1. The full output of PRINT your_topic FROM BEGINNING LIMIT 1;
  2. The full CREATE STREAM statement that you’re running

Hello,

Absolutely, here they are:

ksql> print file_ECG from beginning limit 1;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 2021/02/24 07:23:56.412 Z, key: <null>, value: {"timestamp": 1579604534, "inx": "20", "sampleNum": "715", "Frequency": 128, "HeartRate": 108, "SensorStatus": "255", "RRLength": 554, "Samples": "67"}
ksql> CREATE STREAM ECG_STREAM WITH(KAFKA_TOPIC='file_ECG',VALUE_FORMAT='AVRO');

Best regards,

Hmm that kinda looks OK. Is it only the DESCRIBE that fails? Does it work if you run this:

SET 'auto.offset.reset' = 'earliest';

SELECT * FROM ECG_STREAM EMIT CHANGES;

Hello,

So, in order:

  1. Running the SET command and then the SELECT command worked properly.
    Here is a sample of the output:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'.
ksql> SELECT * FROM ECG_STREAM EMIT CHANGES;
|TIMESTAMP               |INX                     |SAMPLENUM               |FREQUENCY               |HEARTRATE               |SENSORSTATUS              |RRLENGTH                |SAMPLES                 |
|1579604973              |328                     |1151                    |128                     |163                     |255                     |367                     |-34                     |
|1579604973              |332                     |1151                    |128                     |163                     |255                     |367                     |-249                    |                               

(I cleaned the output up a bit, or it would have been unreadable.)

  2. Running the SET command and then trying the DESCRIBE command again still ends with the usual error.
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'.
ksql> DESCRIBE ECG_STREAM;
Failed to deserialise object
Caused by: Cannot construct instance of
        `io.confluent.ksql.rest.entity.SourceDescription`, problem:
        queryOffsetSummaries
 at [Source:
        (byte[  ])"[{"@type":"sourceDescription","statementText":"DESCRIBE
        ECG_STREAM;","sourceDescription":{"name":"ECG_STREAM","windowType":null,"readQue
        ries":[  ],"writeQueries":[  ],"fields":[{"name":"TIMESTAMP","schema":{"type":"BIGIN
        T","fields":null,"memberSchema":null}},{"name":"INX","schema":{"type":"STRING","
        fields":null,"memberSchema":null}},{"name":"SAMPLENUM","schema":{"type":"STRING"
        ,"fields":null,"memberSchema":null}},{"name":"FREQUENCY","schema":{"type":"INTEG
        ER","fields":null,"memberSchema":null}},{"na"[truncated 760 bytes]; line: 1,
        column: 1244] (through reference chain:
        io.confluent.ksql.rest.entity.KsqlEntityList[0]->io.confluent.ksql.rest.entity.S
        ourceDescriptionEntity["sourceDescription"])
Caused by: queryOffsetSummaries

So it looks like the DESCRIBE command is the one with issues, and it fails for every stream I have created, even ones based on different topics.

Best regards,

Do you have access to Docker to give ksqlDB 0.15 a try?

And/or are you able to share a sample of the CSV file so that I can try to reproduce the issue from here?
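As a rough sketch, standing up a 0.15 server with a matching CLI in Docker could look like this; the network name, broker address, and Schema Registry URL are placeholders that need to match your existing setup:

docker run -d --name ksqldb-server --network my-kafka-net \
    -e KSQL_BOOTSTRAP_SERVERS=kafka1:19092 \
    -e KSQL_KSQL_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
    -e KSQL_LISTENERS=http://0.0.0.0:8088 \
    -p 8088:8088 \
    confluentinc/ksqldb-server:0.15.0

# Run a CLI container on the same version as the server, then attach to it.
docker run -dit --name ksqldb-cli --network my-kafka-net \
    --entrypoint /bin/sh confluentinc/ksqldb-cli:0.15.0
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088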

Hello,

I can test ksqlDB 0.15 in the next few days to see if anything changes, and I will report back with the results of the attempt as soon as I have them.
In the meantime, as you asked, here is one of the files I am working with. As the data inside is completely anonymized, there was no problem uploading the entire file as-is:

Input file: [Gofile link]

Also, in case it helps in reproducing the issue, I have uploaded the configuration file for the FilePulse source connector I used to ingest the file into my topic:

FilePulse connector config: [Gofile link]

Thank you.
Best regards,

  • The bad news: I couldn’t reproduce your error :confused:
  • The good news: using that file and config I could ingest the data and get it to work just fine using ksqlDB 0.15 :slight_smile:

I used this Docker Compose to spin up Kafka Connect etc., and then this connector config, which is the same as yours except that I fiddled with fs.scan.filters to get it to pick up the file:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ECG_Ingestion/config \
    -d '{
        "connector.class"                          : "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.cleanup.policy.class"                  : "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "fs.scan.directory.path"                   : "/data/",
        "fs.scan.filters"                          : "io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern"                : "file_ECG.csv",
        "offset.strategy"                          : "name",
        "skip.headers"                             : "1",
        "tasks.max"                                : "1",
        "topic"                                    : "file_ECG",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "value.converter.schema.registry.url"      : "http://schema-registry:8081/",
        "value.converter"                          : "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url"        : "http://schema-registry:8081/",
        "key.converter"                            : "io.confluent.connect.avro.AvroConverter",
        "filters"                                  : "ParseLine, HeartRateToInt, FrequencyToInt, RRLengthToInt, TimestampToInt, SetKey, KeyConversion",
        "filters.ParseLine.extractColumnName"      : "headers",
        "filters.ParseLine.separator"              : ";",
        "filters.ParseLine.trimColumn"             : "true",
        "filters.ParseLine.type"                   : "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
        "filters.SetKey.field"                     : "$key",
        "filters.SetKey.type"                      : "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
        "filters.SetKey.value"                     : "{{ uppercase($value.sampleNum)}}",
        "filters.FrequencyToInt.field"             : "Frequency",
        "filters.FrequencyToInt.to"                : "INTEGER",
        "filters.FrequencyToInt.type"              : "io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter",
        "filters.HeartRateToInt.field"             : "HeartRate",
        "filters.HeartRateToInt.to"                : "INTEGER",
        "filters.HeartRateToInt.type"              : "io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter",
        "filters.KeyConversion.field"              : "$key",
        "filters.KeyConversion.to"                 : "STRING",
        "filters.KeyConversion.type"               : "io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter",
        "filters.RRLengthToInt.field"              : "RRLength",
        "filters.RRLengthToInt.to"                 : "INTEGER",
        "filters.RRLengthToInt.type"               : "io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter",
        "filters.TimestampToInt.field"             : "timestamp",
        "filters.TimestampToInt.to"                : "LONG",
        "filters.TimestampToInt.type"              : "io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter"
        }' 

From there, I could then see the data in the Kafka topic from ksqlDB:

ksql> SHOW TOPICS;

 Kafka Topic                           | Partitions | Partition Replicas
-------------------------------------------------------------------------
 confluent_rmoff_01ksql_processing_log | 1          | 1
 connect-file-pulse-status             | 1          | 1
 docker-connect-configs                | 1          | 1
 docker-connect-offsets                | 25         | 1
 docker-connect-status                 | 5          | 1
 file_ECG                              | 1          | 1
-------------------------------------------------------------------------
ksql> PRINT file_ECG FROM BEGINNING LIMIT 5;
Key format: AVRO or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO
rowtime: 2021/02/24 21:16:47.681 Z, key: 715, value: {"timestamp": 1579604534, "inx": "20", "sampleNum": "715", "Frequency": 128, "HeartRate": 108, "SensorStatus": "255", "RRLength": 554, "Samples": "67"}, partition: 0
rowtime: 2021/02/24 21:16:47.681 Z, key: 715, value: {"timestamp": 1579604534, "inx": "24", "sampleNum": "715", "Frequency": 128, "HeartRate": 108, "SensorStatus": "255", "RRLength": 554, "Samples": "134"}, partition: 0
rowtime: 2021/02/24 21:16:47.681 Z, key: 715, value: {"timestamp": 1579604534, "inx": "28", "sampleNum": "715", "Frequency": 128, "HeartRate": 108, "SensorStatus": "255", "RRLength": 554, "Samples": "214"}, partition: 0
rowtime: 2021/02/24 21:16:47.681 Z, key: 715, value: {"timestamp": 1579604534, "inx": "32", "sampleNum": "715", "Frequency": 128, "HeartRate": 108, "SensorStatus": "255", "RRLength": 554, "Samples": "315"}, partition: 0
rowtime: 2021/02/24 21:16:47.681 Z, key: 715, value: {"timestamp": 1579604534, "inx": "36", "sampleNum": "715", "Frequency": 128, "HeartRate": 108, "SensorStatus": "255", "RRLength": 554, "Samples": "449"}, partition: 0
Topic printing ceased
ksql>

and then create the stream, describe it, and query it:

ksql> CREATE STREAM ECG_STREAM WITH(KAFKA_TOPIC='file_ECG',FORMAT='AVRO');

 Message
----------------
 Stream created
----------------

ksql> DESCRIBE ECG_STREAM;

Name                 : ECG_STREAM
 Field        | Type
---------------------------------------
 ROWKEY       | VARCHAR(STRING)  (key)
 TIMESTAMP    | BIGINT
 INX          | VARCHAR(STRING)
 SAMPLENUM    | VARCHAR(STRING)
 FREQUENCY    | INTEGER
 HEARTRATE    | INTEGER
 SENSORSTATUS | VARCHAR(STRING)
 RRLENGTH     | INTEGER
 SAMPLES      | VARCHAR(STRING)
---------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

ksql> SET 'auto.offset.reset' = 'earliest';
>
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'.

ksql> SELECT * FROM ECG_STREAM EMIT CHANGES LIMIT 5;
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|ROWKEY            |TIMESTAMP         |INX               |SAMPLENUM         |FREQUENCY         |HEARTRATE         |SENSORSTATUS      |RRLENGTH          |SAMPLES           |
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|715               |1579604534        |20                |715               |128               |108               |255               |554               |67                |
|715               |1579604534        |24                |715               |128               |108               |255               |554               |134               |
|715               |1579604534        |28                |715               |128               |108               |255               |554               |214               |
|715               |1579604534        |32                |715               |128               |108               |255               |554               |315               |
|715               |1579604534        |36                |715               |128               |108               |255               |554               |449               |
Limit Reached
Query terminated

One thing I noticed when comparing my output with yours: the PRINT output shows your keys as null, whereas the connector should be setting them. Perhaps something strange happened at ingest, or there are messages on the topic from earlier attempts.
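If earlier attempts did leave null-keyed records behind, one option is to drop the topic and re-ingest the file; the container name and broker address below are illustrative:

docker exec broker kafka-topics --bootstrap-server broker:29092 \
    --delete --topic file_ECG
# Then re-run the ingest; depending on offset.strategy, the connector may need
# its committed offsets cleared or the file renamed before it picks it up again.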

Hello,

Thank you very much, I will definitely try the 0.15 version.
Would you recommend it as a more stable version in general?
Best regards,

The issue you reported is not one I’ve seen before - so either it’s a weird bug in the version you’ve got, or something’s up with your install. Either way, trying the new version should definitely help :slight_smile:

I understand, I will definitely try that version then; what I was using before was confluentinc/cp-ksqldb-server:6.0.1.
Thank you very much for your help, I really appreciated it!
Best regards,

One more thing - could you post the splash screen from when you launch the CLI?

Hello,

So, I have managed to run a few more tests and got hold of the 0.15.0 version.
With the same .yml file, these were the results:

  • With the original cp-ksqldb-server:6.0.1 image, the error is as I reported.
  • With the ksqldb-server:0.15.0 image, that specific error disappears (I get a different error about the construction of the Kafka consumer, but I want to look into it myself).

It does seem like that specific version is causing some issue, but it could also be that my installation is messed up, as I am not very experienced yet.

As you have asked, here is the splash screen of the CLI:

Best regards,

Ah yes, this is the problem:

[screenshot: CLI splash screen showing mismatched CLI and server versions]

It looks like you’ve mixed CLI and server versions, and that will cause the kind of behaviour you’ve been seeing.

Can you reproduce the DESCRIBE problem if you also use the 6.0.1 CLI (not 0.15)?

Hello,

Yes, this solved the issue. I did not think it was related to a difference between the server and CLI versions.
Thank you very much for your help @rmoff , you have been extremely helpful and kind!

