Flattening data, pushing out CSVs

I am trying to:

  • consume from Kafka topics
  • each topic contains just one JSON packet type, there can be multiple versions of the same record
  • select only records with the maximum version number
  • at scheduled times:
      • write the flattened output to a CSV file with a particular, dynamically created name & path
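In ksqlDB terms I imagine it looking something like this (a rough sketch only; the topic, stream, and field names are placeholders I've made up):

```sql
-- Rough sketch: keep only the highest Version per record Id.
-- 'my_topic', 'raw_events', and the columns are placeholder names.
CREATE STREAM raw_events (
  Id BIGINT,
  Version INT
  -- ... remaining payload fields ...
) WITH (KAFKA_TOPIC='my_topic', VALUE_FORMAT='JSON');

CREATE TABLE latest_versions AS
  SELECT Id, MAX(Version) AS max_version
  FROM raw_events
  GROUP BY Id;
```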

It looks to me like this is an ideal scenario for ksqlDB; however, I am struggling with the file/CSV sink connector. I just don’t know where to start, or how to configure/modify it to write to a particular file in a particular place.

I’d be very grateful for advice on how I might go about this; I am not good with the (file sink) connectors.

Welcome Paul,

Love the handle! This does sound like a good job for ksqlDB. As for the Connect bit, you can find a lot of help on @rmoff’s site, rmoff's random ramblings, and specifically you may want to check out this video: From Zero to Hero with Kafka Connect.

Check out those resources and if you run into issues, feel free to post back here.

Thanks,
Dave

Thanks for taking the trouble to answer, and I will go & look at the links.

I am thinking of spinning up a ksqlDB container for a play this weekend as I have a meeting about it first thing on Monday morning.

With my best regards, Paul.

I watched the video, read a lot, and spun up Kafka & ksqlDB using this YAML. All good.

I wanted to use the SFTP sink connector, which looks straightforward, so I was following the instructions for giving the containers access to connectors installed by the Confluent Hub client, which I had installed without the entire Confluent Platform.

All good so far. So I followed the straightforward-looking instructions for installing the SFTP connector:

paul@fox:~/confluent-hub-client$ confluent-hub install confluentinc/kafka-connect-sftp:2.3.2

The first thing that happens when you run the hub without the entire Confluent platform is this:

Unable to detect Confluent Platform installation. Specify --component-dir and --worker-configs explicitly.

Error: Invalid options or arguments

Which, when you follow it up, presumes knowledge I simply don’t have yet, and so it ends up a rabbit hole of links. I think it is so obvious to someone with Moffatt-level skills that it slips past their radar, but it stopped me.

So I will have to process the data & throw it out to another topic and forget connectors for now.
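For what it’s worth, the flattening itself is easy enough to prototype outside Kafka first. A rough Python sketch (the record shape and field names here are invented, just to show nested JSON flattened into dotted CSV columns):

```python
import csv
import io

def flatten(record, prefix=""):
    """Flatten nested dicts into dotted column names, e.g. location.geo.latitude."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat

# Invented sample record, loosely shaped like the real packets
record = {
    "Id": 12345678,
    "Version": 4,
    "location": {"geo": {"latitude": "51.5", "longitude": "-0.1"}},
}

row = flatten(record)
out = io.StringIO()
writer = csv.DictWriter(out, fieldnames=sorted(row))
writer.writeheader()
writer.writerow(row)
print(out.getvalue())
```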

I’m not entirely clear where you’re running Kafka Connect (which you need if you want to run a connector). You can run it on its own, or within ksqlDB, and either way you can run it in Docker or locally.

If it’s in Docker then this is a good example of how to install a connector, and this is another way.
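One common pattern (sketched here from memory; adjust the image and versions to match your stack) is to have the Connect container run `confluent-hub install` at startup, before launching the worker:

```yaml
# Sketch only: a Connect service that installs the SFTP connector on startup.
# The image tag and connector version are illustrative.
kafka-connect:
  image: confluentinc/cp-kafka-connect:6.1.0
  # ... broker, converter, and plugin-path environment settings omitted ...
  command:
    - bash
    - -c
    - |
      confluent-hub install --no-prompt confluentinc/kafka-connect-sftp:2.3.2
      /etc/confluent/docker/run
```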

It’d be useful to know which rabbit holes you ended up in, so that we can try to plug some of those gaps & leaks in the docs.


Thank you for your response, and I am so sorry not to come back to you sooner; I have been rushed off my feet.

I have started up a modified stack based on the YAML you pointed me to, with the SFTP file connector installed. I have produced a couple of test records into Kafka and can read them back out with kafkacat, so I guess I am ready for some play/setup. I’ll go through the examples and see what I can achieve. Thanks for the YAML/Kafka Connect pointers.

I have some data in my topic called ‘turnip’, and I have used a free online JSON Schema tool to create a schema and loaded it into Control Center for the topic, for both the JSON key & value. All good so far; I did this so I didn’t have to specify fields when I carried out my CREATE STREAM. However, ksqlDB doesn’t seem to be seeing the schema for my topic:

ksql> CREATE STREAM peskyrodent WITH (
>    KAFKA_TOPIC='turnip',
>    KEY_FORMAT='JSON',
>    VALUE_FORMAT='JSON'
>  );
No columns supplied.

I am getting a response when I query with curl:

curl --silent -X GET http://localhost:8081/subjects/turnip-value/versions/latest | jq .
{
  "subject": "turnip-value",
  "version": 2,
  "id": 2,
  "schemaType": "JSON",
  "schema": "{\"$schema\":\"http://json-schema.org/draft-04/schema#\",
... snip ...
}

I am trying to use Schema Registry integration with confluentinc/ksqldb-server:0.17.0, but the schema information I have created for the topic is not being picked up.

OK, some progress with this:

ksql> CREATE STREAM peskyrodent WITH (
>    KAFKA_TOPIC='turnip',
>    VALUE_FORMAT='JSON_SR'
>  );

returned some useful stuff. I am almost excited.

My excitement has subsided. Borrowing from this Stack Exchange topic, I have added a couple of fields, so the JSON looks like this:

{
  "Id": 12345678,
  "Version": 4,
  "persistTime" : "2020-10-06T13:30:25.373Z",
  "previous" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "registered",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  },
  "current" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "registered",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  }
}

And generated a JSON schema and uploaded it to the Control Center for the events topic:

{
    "$schema": "http://json-schema.org/draft-06/schema#",
    "$ref": "#/definitions/Welcome2",
    "definitions": {
        "Welcome2": {
            "type": "object",
            "additionalProperties": false,
            "properties": {
                "Id": {
                    "type": "integer"
                },
                "Version": {
                    "type": "integer"
                },
                "persistTime": {
                    "type": "string",
                    "format": "date-time"
                },
                "previous": {
                    "$ref": "#/definitions/Current"
                },
                "current": {
                    "$ref": "#/definitions/Current"
                }
            },
            "required": [
                "Id",
                "Version",
                "current",
                "persistTime",
                "previous"
            ],
            "title": "Welcome2"
        },
        "Current": {
            "type": "object",
            "additionalProperties": false,
            "properties": {
                "device": {
                    "type": "string"
                },
                "type": {
                    "type": "string"
                },
                "group": {
                    "type": "string"
                },
                "inventoryState": {
                    "type": "string"
                },
                "managementState": {
                    "type": "string"
                },
                "communicationId": {
                    "type": "string"
                },
                "manufacturer": {
                    "type": "string"
                },
                "description": {
                    "type": "string"
                },
                "model": {
                    "type": "string"
                },
                "location": {
                    "$ref": "#/definitions/Location"
                },
                "tags": {
                    "type": "array",
                    "items": {}
                }
            },
            "required": [
                "communicationId",
                "description",
                "device",
                "group",
                "inventoryState",
                "location",
                "managementState",
                "manufacturer",
                "model",
                "tags",
                "type"
            ],
            "title": "Current"
        },
        "Location": {
            "type": "object",
            "additionalProperties": false,
            "properties": {
                "geo": {
                    "$ref": "#/definitions/Geo"
                },
                "address": {
                    "$ref": "#/definitions/Address"
                },
                "logicalInstallationPoint": {
                    "type": "string"
                }
            },
            "required": [
                "address",
                "geo",
                "logicalInstallationPoint"
            ],
            "title": "Location"
        },
        "Address": {
            "type": "object",
            "additionalProperties": false,
            "properties": {
                "city": {
                    "type": "string"
                },
                "postalCode": {
                    "type": "string"
                },
                "street": {
                    "type": "string"
                },
                "houseNumber": {
                    "type": "string"
                },
                "floor": {
                    "type": "string"
                },
                "company": {
                    "type": "string"
                },
                "country": {
                    "type": "string"
                },
                "reference": {
                    "type": "string"
                },
                "timeZone": {
                    "type": "string"
                },
                "region": {
                    "type": "string"
                },
                "district": {
                    "type": "string"
                }
            },
            "required": [
                "city",
                "company",
                "country",
                "district",
                "floor",
                "houseNumber",
                "postalCode",
                "reference",
                "region",
                "street",
                "timeZone"
            ],
            "title": "Address"
        },
        "Geo": {
            "type": "object",
            "additionalProperties": false,
            "properties": {
                "latitude": {
                    "type": "string"
                },
                "longitude": {
                    "type": "string"
                }
            },
            "required": [
                "latitude",
                "longitude"
            ],
            "title": "Geo"
        }
    }
}

Back in the ksqlDB cli, I run CREATE STREAM as follows:

ksql> CREATE STREAM events_stream WITH (
>    KAFKA_TOPIC='events',
>    VALUE_FORMAT='JSON_SR'
>  );

Unable to verify if the value schema for topic events is compatible with ksqlDB.
Reason: Unsupported schema type org.everit.json.schema.EmptySchema

Please see https://github.com/confluentinc/ksql/issues/ to see if this particular reason is already known.
If not, please log a new issue, including this full error message.
Schema:{"$schema":"http://json-schema.org/draft-06/schema#","$ref":"#/definitions/Welcome2", ... snip (same schema as above) ... }

So it is finding the JSON schema, but not liking it.
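Looking at the error text, my guess is that the empty items schema on tags ("items": {}) is what maps to EmptySchema; giving it a concrete type might get past this (an untested guess):

```json
"tags": {
    "type": "array",
    "items": { "type": "string" }
}
```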
If I try the CREATE STREAM without specifying JSON_SR, I get the following:

ksql> CREATE STREAM events_stream WITH (
>    KAFKA_TOPIC='events',
>    VALUE_FORMAT='JSON'
>  );
No columns supplied.

If I delete the schema from Control Center and re-run the CREATE STREAM statement, in the hope that it will serialise the data in the topic, produce a schema, and upload it to Control Center, I receive the following:

ksql> CREATE STREAM events_stream WITH (
>    KAFKA_TOPIC='events',
>    VALUE_FORMAT='JSON_SR'
>  );
Schema for message values on topic events does not exist in the Schema Registry.Subject: events-value
Possible causes include:
- The topic itself does not exist       -> Use SHOW TOPICS; to check
- Messages on the topic are not serialized using a format Schema Registry supports      -> Use PRINT 'events' FROM BEGINNING; to verify
- Messages on the topic have not been serialized using a Confluent Schema Registry supported serializer -> See https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
- The schema is registered on a different instance of the Schema Registry       -> Use the REST API to list available subjects  https://docs.confluent.io/current/schema-registry/docs/api.html#get--subjects
- You do not have permissions to access the Schema Registry.Subject: events-value       -> See https://docs.confluent.io/current/schema-registry/docs/security.html

So I am still stuck. I think the auto-population of fields in CREATE statements, by either creating a schema or using the Schema Registry, is really neat, but I seem to be doing something wrong.

I guess I’ll have to create the columns manually, which is not quite so cool for the intended audience. I have found an informative GitHub issue (confluentinc/ksql issue 5801) which discusses this problem. I can’t include the extra link as I am a noob.
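For anyone following along, by the manual route I mean declaring the columns by hand with plain JSON, something like this (only a subset of the fields shown, with types guessed from the sample data):

```sql
-- Columns declared by hand (subset only), so no Schema Registry lookup is needed.
CREATE STREAM events_stream (
  Id BIGINT,
  Version INT,
  persistTime VARCHAR,
  current STRUCT<
    device VARCHAR,
    inventoryState VARCHAR,
    location STRUCT<geo STRUCT<latitude VARCHAR, longitude VARCHAR>>
  >
) WITH (
  KAFKA_TOPIC='events',
  VALUE_FORMAT='JSON'
);
```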

I still have some time and am interested in feedback, but I am going to crack on.