Thank you for your response. I am sorry for not coming back to you sooner; I have been rushed off my feet.
I have started up a modified stack based on the YAML you pointed me to, with the SFTP file connector installed. I have produced a couple of test records into Kafka and can read them back out with kafkacat, so I guess I am ready for some play/set up. I’ll go through the examples and see what I can achieve. Thanks for the YAML pointer and the Kafka Connect tip.
I have some data in my topic called ‘turnip’, and I used a free online JSON schema tool to create a schema, which I loaded into Control Center for the topic, for both the JSON key and value. All good so far; I did this so I wouldn’t have to specify fields in my CREATE STREAM. However, ksqlDB doesn’t seem to be seeing the schema for my topic:
ksql> CREATE STREAM peskyrodent WITH (
> KAFKA_TOPIC='turnip',
> KEY_FORMAT='JSON',
> VALUE_FORMAT='JSON'
> );
No columns supplied.
I do get a response when I query the Schema Registry with curl:
curl --silent -X GET http://localhost:8081/subjects/turnip-value/versions/latest | jq .
{
"subject": "turnip-value",
"version": 2,
"id": 2,
"schemaType": "JSON",
"schema": "{\"$schema\":\"http://json-schema.org/draft-04/schema#\",
... snip ...
}
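For anyone who wants to run the same check from a script, here is a rough Python sketch of the curl call above. It assumes the same Schema Registry address (http://localhost:8081) and the default `<topic>-value` / `<topic>-key` subject naming; the helper names are my own and not part of any Confluent client. Note that the "schema" field comes back as a string containing JSON, so it needs a second decode.

```python
import json
import urllib.request

REGISTRY_URL = "http://localhost:8081"  # assumption: same address as the curl call


def subject_for(topic, is_key=False):
    """Default subject naming: '<topic>-key' or '<topic>-value'."""
    return f"{topic}-{'key' if is_key else 'value'}"


def latest_version_url(topic, is_key=False, base=REGISTRY_URL):
    """URL of the latest registered schema version for a topic."""
    return f"{base}/subjects/{subject_for(topic, is_key)}/versions/latest"


def parse_registry_response(body):
    """Decode a registry response; the 'schema' field is itself a JSON string."""
    doc = json.loads(body)
    # The registry omits schemaType for Avro schemas, hence the default.
    return doc.get("schemaType", "AVRO"), json.loads(doc["schema"])


def fetch_latest_schema(topic, is_key=False):
    """Network call: only works against a running Schema Registry."""
    with urllib.request.urlopen(latest_version_url(topic, is_key)) as resp:
        return parse_registry_response(resp.read().decode("utf-8"))
```

Calling `fetch_latest_schema("turnip")` should then return the schema type together with the schema as a Python dict, provided the registry is reachable.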
I am trying to use Schema Registry integration with confluentinc/ksqldb-server:0.17.0, but the schema information I have created for the topic is not being picked up.
OK, some progress with this:
ksql> CREATE STREAM peskyrodent WITH (
> KAFKA_TOPIC='turnip',
> VALUE_FORMAT='JSON_SR'
> );
That returned some useful stuff. I am almost excited.
My excitement has subsided. Borrowing from this StackExchange topic, I have added a couple of fields so the JSON looks like this:
{
"Id": 12345678,
"Version": 4,
"persistTime" : "2020-10-06T13:30:25.373Z",
"previous" : {
"device" : "REDACTED",
"type" : "REDACTED",
"group" : "REDACTED",
"inventoryState" : "unknown",
"managementState" : "registered",
"communicationId" : "REDACTED",
"manufacturer" : "",
"description" : "",
"model" : "",
"location" : {
"geo" : {
"latitude" : "REDACTED",
"longitude" : "REDACTED"
},
"address" : {
"city" : "",
"postalCode" : "",
"street" : "",
"houseNumber" : "",
"floor" : "",
"company" : "",
"country" : "",
"reference" : "",
"timeZone" : "",
"region" : "",
"district" : ""
},
"logicalInstallationPoint" : ""
},
"tags" : [ ]
},
"current" : {
"device" : "REDACTED",
"type" : "REDACTED",
"group" : "REDACTED",
"inventoryState" : "unknown",
"managementState" : "registered",
"communicationId" : "REDACTED",
"manufacturer" : "",
"description" : "",
"model" : "",
"location" : {
"geo" : {
"latitude" : "REDACTED",
"longitude" : "REDACTED"
},
"address" : {
"city" : "",
"postalCode" : "",
"street" : "",
"houseNumber" : "",
"floor" : "",
"company" : "",
"country" : "",
"reference" : "",
"timeZone" : "",
"region" : "",
"district" : ""
},
"logicalInstallationPoint" : ""
},
"tags" : [ ]
}
}
I then generated a JSON schema from it and uploaded it to the Control Center for the events topic:
{
"$schema": "http://json-schema.org/draft-06/schema#",
"$ref": "#/definitions/Welcome2",
"definitions": {
"Welcome2": {
"type": "object",
"additionalProperties": false,
"properties": {
"Id": {
"type": "integer"
},
"Version": {
"type": "integer"
},
"persistTime": {
"type": "string",
"format": "date-time"
},
"previous": {
"$ref": "#/definitions/Current"
},
"current": {
"$ref": "#/definitions/Current"
}
},
"required": [
"Id",
"Version",
"current",
"persistTime",
"previous"
],
"title": "Welcome2"
},
"Current": {
"type": "object",
"additionalProperties": false,
"properties": {
"device": {
"type": "string"
},
"type": {
"type": "string"
},
"group": {
"type": "string"
},
"inventoryState": {
"type": "string"
},
"managementState": {
"type": "string"
},
"communicationId": {
"type": "string"
},
"manufacturer": {
"type": "string"
},
"description": {
"type": "string"
},
"model": {
"type": "string"
},
"location": {
"$ref": "#/definitions/Location"
},
"tags": {
"type": "array",
"items": {}
}
},
"required": [
"communicationId",
"description",
"device",
"group",
"inventoryState",
"location",
"managementState",
"manufacturer",
"model",
"tags",
"type"
],
"title": "Current"
},
"Location": {
"type": "object",
"additionalProperties": false,
"properties": {
"geo": {
"$ref": "#/definitions/Geo"
},
"address": {
"$ref": "#/definitions/Address"
},
"logicalInstallationPoint": {
"type": "string"
}
},
"required": [
"address",
"geo",
"logicalInstallationPoint"
],
"title": "Location"
},
"Address": {
"type": "object",
"additionalProperties": false,
"properties": {
"city": {
"type": "string"
},
"postalCode": {
"type": "string"
},
"street": {
"type": "string"
},
"houseNumber": {
"type": "string"
},
"floor": {
"type": "string"
},
"company": {
"type": "string"
},
"country": {
"type": "string"
},
"reference": {
"type": "string"
},
"timeZone": {
"type": "string"
},
"region": {
"type": "string"
},
"district": {
"type": "string"
}
},
"required": [
"city",
"company",
"country",
"district",
"floor",
"houseNumber",
"postalCode",
"reference",
"region",
"street",
"timeZone"
],
"title": "Address"
},
"Geo": {
"type": "object",
"additionalProperties": false,
"properties": {
"latitude": {
"type": "string"
},
"longitude": {
"type": "string"
}
},
"required": [
"latitude",
"longitude"
],
"title": "Geo"
}
}
}
Back in the ksqlDB CLI, I run CREATE STREAM as follows:
ksql> CREATE STREAM events_stream WITH (
> KAFKA_TOPIC='events',
> VALUE_FORMAT='JSON_SR'
> );
Unable to verify if the value schema for topic events is compatible with ksqlDB.
Reason: Unsupported schema type org.everit.json.schema.EmptySchema
Please see https://github.com/confluentinc/ksql/issues/ to see if this particular reason is already known.
If not, please log a new issue, including this full error message.
Schema:{"$schema":"http://json-schema.org/draft-06/schema#","$ref":"#/definitions/Welcome2","definitions":{"Welcome2":{"type":"object","additionalProperties":false,"properties":{"Id":{"type":"integer"},"Version":{"type":"integer"},"persistTime":{"type":"string","format":"date-time"},"previous":{"$ref":"#/definitions/Current"},"current":{"$ref":"#/definitions/Current"}},"required":["Id","Version","current","persistTime","previous"],"title":"Welcome2"},"Current":{"type":"object","additionalProperties":false,"properties":{"device":{"type":"string"},"type":{"type":"string"},"group":{"type":"string"},"inventoryState":{"type":"string"},"managementState":{"type":"string"},"communicationId":{"type":"string"},"manufacturer":{"type":"string"},"description":{"type":"string"},"model":{"type":"string"},"location":{"$ref":"#/definitions/Location"},"tags":{"type":"array","items":{}}},"required":["communicationId","description","device","group","inventoryState","location","managementState","manufacturer","model","tags","type"],"title":"Current"},"Location":{"type":"object","additionalProperties":false,"properties":{"geo":{"$ref":"#/definitions/Geo"},"address":{"$ref":"#/definitions/Address"},"logicalInstallationPoint":{"type":"string"}},"required":["address","geo","logicalInstallationPoint"],"title":"Location"},"Address":{"type":"object","additionalProperties":false,"properties":{"city":{"type":"string"},"postalCode":{"type":"string"},"street":{"type":"string"},"houseNumber":{"type":"string"},"floor":{"type":"string"},"company":{"type":"string"},"country":{"type":"string"},"reference":{"type":"string"},"timeZone":{"type":"string"},"region":{"type":"string"},"district":{"type":"string"}},"required":["city","company","country","district","floor","houseNumber","postalCode","reference","region","street","timeZone"],"title":"Address"},"Geo":{"type":"object","additionalProperties":false,"properties":{"latitude":{"type":"string"},"longitude":{"type":"string"}},"required":["latitude","longitude"],"title":"Geo"}}}
So it is finding the JSON schema, but not liking it.
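One possible lead, which I have not confirmed: as far as I know, EmptySchema is what the everit library produces for an empty `{}` subschema, and the only one in the schema above is the "items": {} under tags. A small Python sketch to hunt for such spots in any schema follows; the function name and the list of keywords it walks are my own choices and do not cover every JSON Schema keyword.

```python
# Keywords whose value is a single subschema, a map of subschemas,
# or a list of subschemas (deliberately incomplete list).
SINGLE = ("items", "additionalProperties", "additionalItems", "not")
MAPS = ("properties", "definitions", "patternProperties")
LISTS = ("allOf", "anyOf", "oneOf", "items")


def find_empty_schemas(schema, path="#"):
    """Return the paths of all empty ({}) subschemas reachable from schema."""
    if schema == {}:
        return [path]
    if not isinstance(schema, dict):
        return []  # boolean schemas and other non-object values are ignored
    hits = []
    for key in SINGLE:
        if isinstance(schema.get(key), dict):
            hits += find_empty_schemas(schema[key], f"{path}/{key}")
    for key in MAPS:
        if isinstance(schema.get(key), dict):
            for name, sub in schema[key].items():
                hits += find_empty_schemas(sub, f"{path}/{key}/{name}")
    for key in LISTS:
        if isinstance(schema.get(key), list):
            for i, sub in enumerate(schema[key]):
                hits += find_empty_schemas(sub, f"{path}/{key}/{i}")
    return hits
```

If that turns out to be the culprit, giving tags a concrete item type in the registered schema (for example {"type": "string"}) might get past the check. Whether strings are the right type for the real data is an assumption on my part.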
If I try the CREATE STREAM without specifying JSON_SR, I get the following:
ksql> CREATE STREAM events_stream WITH (
> KAFKA_TOPIC='events',
> VALUE_FORMAT='JSON'
> );
No columns supplied.
If I delete the schema from Control Center and re-run the CREATE STREAM statement, in the hope that it will serialise the data in the topic, produce a schema and upload it to Control Center, I receive the following:
ksql> CREATE STREAM events_stream WITH (
> KAFKA_TOPIC='events',
> VALUE_FORMAT='JSON_SR'
> );
Schema for message values on topic events does not exist in the Schema Registry.Subject: events-value
Possible causes include:
- The topic itself does not exist -> Use SHOW TOPICS; to check
- Messages on the topic are not serialized using a format Schema Registry supports -> Use PRINT 'events' FROM BEGINNING; to verify
- Messages on the topic have not been serialized using a Confluent Schema Registry supported serializer -> See https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
- The schema is registered on a different instance of the Schema Registry -> Use the REST API to list available subjects https://docs.confluent.io/current/schema-registry/docs/api.html#get--subjects
- You do not have permissions to access the Schema Registry.Subject: events-value -> See https://docs.confluent.io/current/schema-registry/docs/security.html
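On the serializer bullets: if my test records went in as plain JSON text, they will not carry the header that the Schema Registry serializers write in front of each payload, which is a zero "magic" byte followed by a 4-byte big-endian schema ID. A rough Python sketch to check a raw message value is below; the function name is mine, and how you get hold of the raw bytes (consumer client, kafkacat dump, etc.) is left out.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent wire format


def schema_id_if_framed(raw):
    """Return the schema ID if raw starts with the wire-format header, else None."""
    if len(raw) < 5 or raw[0] != MAGIC_BYTE:
        return None
    # Bytes 1..4 hold the schema ID as an unsigned big-endian integer.
    return struct.unpack(">I", raw[1:5])[0]
```

A plain JSON value starts with the `{` byte rather than zero, so this returns None for it. In that case I would not expect JSON_SR to read the existing records even once a schema is registered.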
So I am still stuck. I think the auto populating of fields in CREATE statements by either creating or using the schema registry is really neat, but I seem to be doing something wrong.
I guess I’ll have to manually create an example, which is not quite so cool for the intended audience. I have found an informative link on GitHub, confluentinc/ksql/issues/5801, which discusses this problem. I can’t include the extra link as I am a noob.
I still have some time and am interested in feedback, but I am going to crack on.
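To take some of the pain out of the manual route, here is a rough Python sketch that turns a JSON schema like the one above into a CREATE STREAM statement with explicit columns for plain VALUE_FORMAT='JSON'. The type mapping is my own guess at sensible defaults (integer to BIGINT, string to VARCHAR, untyped array items treated as strings), it only follows local "#/definitions/..." references, and the function names are made up for this post. Names are wrapped in backticks so that fields such as type and group do not clash with keywords.

```python
SCALARS = {"string": "VARCHAR", "integer": "BIGINT",
           "number": "DOUBLE", "boolean": "BOOLEAN"}


def resolve(schema, root):
    """Follow local '#/...' $ref pointers until a concrete schema is reached."""
    while "$ref" in schema:
        ref = schema["$ref"]
        if not ref.startswith("#/"):
            raise ValueError(f"only local references are handled: {ref}")
        node = root
        for part in ref[2:].split("/"):
            node = node[part]
        schema = node
    return schema


def ksql_type(schema, root):
    """Map a JSON schema node to a ksqlDB type string (assumed mapping)."""
    schema = resolve(schema, root)
    kind = schema.get("type")
    if isinstance(kind, str) and kind in SCALARS:
        return SCALARS[kind]
    if kind == "array":
        # ASSUMPTION: an empty or missing "items" is treated as strings.
        items = schema.get("items") or {"type": "string"}
        return f"ARRAY<{ksql_type(items, root)}>"
    if kind == "object":
        fields = ", ".join(f"`{name}` {ksql_type(sub, root)}"
                           for name, sub in schema.get("properties", {}).items())
        return f"STRUCT<{fields}>"
    raise ValueError(f"no ksqlDB mapping for {schema!r}")


def create_stream_sql(name, topic, schema):
    """Build a CREATE STREAM statement with one column per top-level property."""
    top = resolve(schema, schema)
    cols = ",\n".join(f"  `{col}` {ksql_type(sub, schema)}"
                      for col, sub in top["properties"].items())
    return (f"CREATE STREAM {name} (\n{cols}\n) WITH (\n"
            f"  KAFKA_TOPIC='{topic}',\n  VALUE_FORMAT='JSON'\n);")
```

Printing `create_stream_sql("events_stream", "events", schema)` with the schema loaded as a dict gives a statement to paste into the ksqlDB CLI. I have not run the generated statement against 0.17.0, so treat it as a starting point.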