- The bad news: I couldn't reproduce your error
- The good news: using that file and config, I could ingest the data and get it working just fine with ksqlDB 0.15
I used this Docker Compose to spin up Kafka Connect etc., and then this connector config, which is the same as yours except that I fiddled with fs.scan.filters to get it to pick up the file:
curl -i -X PUT -H "Accept:application/json" \
     -H "Content-Type:application/json" \
     http://localhost:8083/connectors/ECG_Ingestion/config \
     -d '{
"connector.class" : "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.cleanup.policy.class" : "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"fs.scan.directory.path" : "/data/",
"fs.scan.filters" : "io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern" : "file_ECG.csv",
"offset.strategy" : "name",
"skip.headers" : "1",
"tasks.max" : "1",
"topic" : "file_ECG",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"value.converter.schema.registry.url" : "http://schema-registry:8081/",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url" : "http://schema-registry:8081/",
"key.converter" : "io.confluent.connect.avro.AvroConverter",
"filters" : "ParseLine, HeartRateToInt, FrequencyToInt, RRLengthToInt, TimestampToInt, SetKey, KeyConversion",
"filters.ParseLine.extractColumnName" : "headers",
"filters.ParseLine.separator" : ";",
"filters.ParseLine.trimColumn" : "true",
"filters.ParseLine.type" : "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
"filters.SetKey.field" : "$key",
"filters.SetKey.type" : "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.SetKey.value" : "{{ uppercase($value.sampleNum)}}",
"filters.FrequencyToInt.field" : "Frequency",
"filters.FrequencyToInt.to" : "INTEGER",
"filters.FrequencyToInt.type" : "io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter",
"filters.HeartRateToInt.field" : "HeartRate",
"filters.HeartRateToInt.to" : "INTEGER",
"filters.HeartRateToInt.type" : "io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter",
"filters.KeyConversion.field" : "$key",
"filters.KeyConversion.to" : "STRING",
"filters.KeyConversion.type" : "io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter",
"filters.RRLengthToInt.field" : "RRLength",
"filters.RRLengthToInt.to" : "INTEGER",
"filters.RRLengthToInt.type" : "io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter",
"filters.TimestampToInt.field" : "timestamp",
"filters.TimestampToInt.to" : "LONG",
"filters.TimestampToInt.type" : "io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter"
}'
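If the connector doesn't pick the file up for you, it's worth checking its state via the Kafka Connect REST API first (this is the standard Connect status endpoint; piping through jq is optional and assumes you have it installed):

curl -s http://localhost:8083/connectors/ECG_Ingestion/status | jq '.'

You want to see RUNNING for both the connector and its task; a FAILED task will include a trace field telling you why.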
From there, I could see the data in the Kafka topic from ksqlDB:
ksql> SHOW TOPICS;
Kafka Topic | Partitions | Partition Replicas
-------------------------------------------------------------------------
confluent_rmoff_01ksql_processing_log | 1 | 1
connect-file-pulse-status | 1 | 1
docker-connect-configs | 1 | 1
docker-connect-offsets | 25 | 1
docker-connect-status | 5 | 1
file_ECG | 1 | 1
-------------------------------------------------------------------------
ksql> PRINT file_ECG FROM BEGINNING LIMIT 5;
Key format: AVRO or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO
rowtime: 2021/02/24 21:16:47.681 Z, key: 715, value: {"timestamp": 1579604534, "inx": "20", "sampleNum": "715", "Frequency": 128, "HeartRate": 108, "SensorStatus": "255", "RRLength": 554, "Samples": "67"}, partition: 0
rowtime: 2021/02/24 21:16:47.681 Z, key: 715, value: {"timestamp": 1579604534, "inx": "24", "sampleNum": "715", "Frequency": 128, "HeartRate": 108, "SensorStatus": "255", "RRLength": 554, "Samples": "134"}, partition: 0
rowtime: 2021/02/24 21:16:47.681 Z, key: 715, value: {"timestamp": 1579604534, "inx": "28", "sampleNum": "715", "Frequency": 128, "HeartRate": 108, "SensorStatus": "255", "RRLength": 554, "Samples": "214"}, partition: 0
rowtime: 2021/02/24 21:16:47.681 Z, key: 715, value: {"timestamp": 1579604534, "inx": "32", "sampleNum": "715", "Frequency": 128, "HeartRate": 108, "SensorStatus": "255", "RRLength": 554, "Samples": "315"}, partition: 0
rowtime: 2021/02/24 21:16:47.681 Z, key: 715, value: {"timestamp": 1579604534, "inx": "36", "sampleNum": "715", "Frequency": 128, "HeartRate": 108, "SensorStatus": "255", "RRLength": 554, "Samples": "449"}, partition: 0
Topic printing ceased
ksql>
and then create the stream, describe it, and query it:
ksql> CREATE STREAM ECG_STREAM WITH(KAFKA_TOPIC='file_ECG',FORMAT='AVRO');
Message
----------------
Stream created
----------------
ksql> DESCRIBE ECG_STREAM;
Name : ECG_STREAM
Field | Type
---------------------------------------
ROWKEY | VARCHAR(STRING) (key)
TIMESTAMP | BIGINT
INX | VARCHAR(STRING)
SAMPLENUM | VARCHAR(STRING)
FREQUENCY | INTEGER
HEARTRATE | INTEGER
SENSORSTATUS | VARCHAR(STRING)
RRLENGTH | INTEGER
SAMPLES | VARCHAR(STRING)
---------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> SET 'auto.offset.reset' = 'earliest';
>
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'.
ksql> SELECT * FROM ECG_STREAM EMIT CHANGES LIMIT 5;
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|ROWKEY |TIMESTAMP |INX |SAMPLENUM |FREQUENCY |HEARTRATE |SENSORSTATUS |RRLENGTH |SAMPLES |
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|715 |1579604534 |20 |715 |128 |108 |255 |554 |67 |
|715 |1579604534 |24 |715 |128 |108 |255 |554 |134 |
|715 |1579604534 |28 |715 |128 |108 |255 |554 |214 |
|715 |1579604534 |32 |715 |128 |108 |255 |554 |315 |
|715 |1579604534 |36 |715 |128 |108 |255 |554 |449 |
Limit Reached
Query terminated
One thing I noticed when comparing my output with yours: your PRINT output shows the keys as null, whereas the connector should be setting them. Perhaps something strange happened at ingest, or there are messages left on the topic from earlier attempts.
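If you want to inspect the keys directly rather than through ksqlDB, kcat is handy. A minimal sketch, assuming kcat is built with Avro/Schema Registry support and that the broker and Schema Registry are exposed on localhost:9092 and localhost:8081 respectively:

kcat -b localhost:9092 -t file_ECG -C -e \
     -s key=avro -s value=avro -r http://localhost:8081 \
     -f 'key: %k\tvalue: %s\n'

If the keys come back null there too, I'd suspect leftover messages from earlier attempts; deleting and recreating the topic (or pointing the connector at a fresh topic name) before re-ingesting would rule that out.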