Connect sink failing JSON serialisation

hi all… the code / curl below used to work when I was json.Marshal-ing my message into bytes to post. I’ve now switched to a Schema Registry client, which also calls for using the associated JSON Schema serializer (it does a schema check before posting the message).

curl
  curl -X POST \
  -H "Content-Type: application/json" \
  --data '
      {"name": "mongo-local-salesbaskets-sink-json",
        "config": {
          "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
          "connection.uri":"mongodb://mbp.local:27017/?directConnection=true",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "value.converter.schemas.enable": true,
          "database":"MongoCom0",
          "collection":"json_salesbaskets",
          "topics":"json_salesbaskets"
          }
      }
      ' \
  http://localhost:8083/connectors -w "\n"

The Connect sink is failing with the following line in the Connect logs:

connect | Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:

How were the events in the json_salesbaskets topic serialized when produced? What does the structure look like? The combination of JsonConverter and value.converter.schemas.enable=true in your connector config requires a specific message format with payload and embedded schema root elements, e.g.:

{
  "schema": {
    "type": "struct", "optional": false, "version": 1, "fields": [
      { "field": "my_string", "type": "string", "optional": true },
      { "field": "my_int", "type": "int64", "optional": true }
    ]
  },
  "payload": {
    "my_string": "hello",
    "my_int": 123456
  }
}

More on this topic in this blog.

Not to make a mess here, can we take this offline…
G

My data is being produced by a Golang app that I wrote to test bits; the main purpose is to build understanding.

With the new Schema Registry client created and the serializer in place (as per below), posting performance has gone from 10,000 txn/sec (when using json.Marshal) down to 500/sec.

Happy to change the curl… and settings in there.

The basic code is in one of the other threads. I can repost it here; I’m just heading out quickly.
G

See: https://forum.confluent.io/t/json-with-schema-registry/10261

Type declarations:

type Tp_basket struct {
	InvoiceNumber string          `json:"invoiceNumber,omitempty"`
	SaleDateTime  string          `json:"saleDateTime,omitempty"`
	SaleTimestamp string          `json:"saleTimestamp,omitempty"`
	Store         TStoreStruct    `json:"store,omitempty"`
	Clerk         TPClerkStruct   `json:"clerk,omitempty"`
	TerminalPoint string          `json:"terminalPoint,omitempty"`
	BasketItems   []Tp_BasketItem `json:"basketItems,omitempty"`
	Nett          float64         `json:"nett,omitempty"`
	VAT           float64         `json:"vat,omitempty"`
	Total         float64         `json:"total,omitempty"`
}

type Tp_payment struct {
	InvoiceNumber    string  `json:"invoiceNumber,omitempty"`
	PayDateTime      string  `json:"payDateTime,omitempty"`
	PayTimestamp     string  `json:"payTimestamp,omitempty"`
	Paid             float64 `json:"paid,omitempty"`
	FinTransactionID string  `json:"finTransactionId,omitempty"`
}

type TPClerkStruct struct {
	Id   string `json:"id,omitempty"`
	Name string `json:"name,omitempty"`
}

type TStoreStruct struct {
	Id   string `json:"id,omitempty"`
	Name string `json:"name,omitempty"`
}

The main Kafka bits:

func main() {
	var p *kafka.Producer
	var client schemaregistry.Client
	var serdes *jsonschema.SerializerConfig
	var serializer *jsonschema.Serializer
	var err error

	// Create a new Schema Registry client
	client, err = schemaregistry.NewClient(schemaregistry.NewConfig(vKafka.SchemaRegistryURL))

	// Configure the JSON Schema serializer: don't auto-register schemas,
	// validate each message against the latest registered version.
	serdes = jsonschema.NewSerializerConfig()
	serdes.AutoRegisterSchemas = false
	serdes.EnableValidation = true
	serdes.UseLatestVersion = true
	serializer, err = jsonschema.NewSerializer(client, serde.ValueSerde, serdes)

	// Build a fake basket and serialize it.
	t_SalesBasket, eventTimestamp, storeName, err := constructFakeBasket()

	valueBytes, err := serializer.Serialize(vKafka.BasketTopicname, &t_SalesBasket)

	kafkaMsg := kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &vKafka.BasketTopicname,
			Partition: kafka.PartitionAny,
		},
		Value:   valueBytes,        // This is the payload/body that's being posted
		Key:     []byte(storeName), // We use this to group the same transactions together in order, i.e. submitting/Debtor Bank.
		Headers: []kafka.Header{{Key: "myBasketTopicnameHeader", Value: []byte("header values are binary")}},
	}

	// Producer creation and the Produce() call are omitted in this snippet; see the sketch below.
}
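
For completeness, a minimal sketch of the produce step that would typically follow, assuming p was created with kafka.NewProducer and your own broker settings (not part of the original snippet):

func produceBasket(p *kafka.Producer, kafkaMsg *kafka.Message) error {
	// Use a dedicated delivery channel so we can wait for the broker's ack.
	deliveryChan := make(chan kafka.Event, 1)
	if err := p.Produce(kafkaMsg, deliveryChan); err != nil {
		return err // enqueueing failed (e.g. local queue full)
	}
	ev := <-deliveryChan // block until the delivery report arrives
	if m, ok := ev.(*kafka.Message); ok && m.TopicPartition.Error != nil {
		return m.TopicPartition.Error // broker rejected the message
	}
	return nil
}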

Basic packaging steps of constructFakeBasket(), skipping some (lots :wink: ) bits of course…

func constructFakeBasket() (t_Basket types.Tp_basket, eventTimestamp time.Time, storeName string, err error) {

	t_Basket = types.Tp_basket{
		InvoiceNumber: txnId,
		SaleDateTime:  eventTime,
		SaleTimestamp: fmt.Sprint(eventTimestamp.UnixMilli()),
		Store:         store,
		Clerk:         clerk,
		TerminalPoint: strconv.Itoa(terminalPoint),
		BasketItems:   arBasketItems,
		Nett:          nett_amount,
		VAT:           vat_amount,
		Total:         total_amount,
	}
	return t_Basket, eventTimestamp, storeName, nil
}

OK, I think the issue is that you’re using JSON Schema, so you should use the io.confluent.connect.json.JsonSchemaConverter converter in your connector config. The converter in your config (org.apache.kafka.connect.json.JsonConverter) plus value.converter.schemas.enable=true is a way to support schemas embedded in the JSON without Schema Registry.
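
Something like the below should work (a sketch, not tested here: the Schema Registry URL of http://localhost:8081 is an assumption, adjust it for your setup; value.converter.schemas.enable is dropped because it only applies to the plain JsonConverter):

  curl -X POST \
  -H "Content-Type: application/json" \
  --data '
      {"name": "mongo-local-salesbaskets-sink-json",
        "config": {
          "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
          "connection.uri":"mongodb://mbp.local:27017/?directConnection=true",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
          "value.converter.schema.registry.url": "http://localhost:8081",
          "database":"MongoCom0",
          "collection":"json_salesbaskets",
          "topics":"json_salesbaskets"
          }
      }
      ' \
  http://localhost:8083/connectors -w "\n"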


Thanks. It seems I thought I was actually doing the non-embedded schema version.

G

hi …

OK, so I have 2 topics I build up, a sales basket and a sales payment, done exactly the same way. Your solution above worked on the sales basket: the sink pushes the payloads into MongoDB. Now the weird one: for sales payments I’m getting the below in the MongoDB collection. I did check the topic, and the data is as it’s supposed to be…

_id: ObjectId('6675d5f6cf58c5310bc9e770')
Paid: null
FinTransactionID: null
PayDateTime: null
InvoiceNumber: null
PayTimetamp: null

Need to log off for the evening; will recheck everything tomorrow and see if I made a mistake somewhere…

G

hi hi, back to this one…

Got the sink to work using your fix above…
Tried to create a ksqlDB stream on the topic and am getting the following… looks very similar to the above.

ksqldb-server | [2024-06-24 07:25:17,763] WARN stream-thread [_confluent-ksql-default_transient_transient_JSON_SALESBASKETS_6638912548585293568_1719213917602-b443d979-8c57-47ec-bee7-7a09150e0568-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[json_salesbaskets] partition=[0] offset=[99] (org.apache.kafka.streams.processor.internals.RecordDeserializer)

Fixed this one… mostly user error… got the sink to Mongo working.

Getting problems on ksqlDB now, and I think it’s similar to the original sink problem above that we fixed by changing the value converter.

OK, @rmoff’s blog article around serialisation cleared it up again: jsonschema depends on the Schema Registry; in other words the schema is not included in the message via a schema tag, only a schema ID prefixed to the bytes.

So the Connect job is working because we specified the Confluent converter… wondering if ksqlDB is using the Apache converter and breaking, and how to tell it to use the Confluent one instead => io.confluent.connect.json.JsonSchemaConverter
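
To make that concrete: a minimal Go sketch (assuming the standard Confluent wire format of a 0x0 magic byte plus a 4-byte schema ID in front of the JSON payload) to peek at what a consumer actually receives:

package main

import (
	"encoding/binary"
	"fmt"
)

// inspectWireFormat prints the schema ID and JSON payload if the value uses the
// Schema Registry wire format, otherwise it prints the raw value.
func inspectWireFormat(value []byte) {
	if len(value) > 5 && value[0] == 0 {
		schemaID := binary.BigEndian.Uint32(value[1:5])
		fmt.Printf("schema ID: %d, JSON payload: %s\n", schemaID, value[5:])
		return
	}
	fmt.Printf("plain value (no wire-format prefix): %s\n", value)
}

func main() {
	// Hypothetical example bytes, shaped like what the jsonschema serializer emits.
	inspectWireFormat(append([]byte{0, 0, 0, 0, 1}, []byte(`{"invoiceNumber":"inv-1"}`)...))
}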

G

checking some logs…

ksqldb-server | [2024-06-24 08:07:59,447] ERROR {"type":0,"deserializationError":{"target":"value","errorMessage":"Error deserializing message from topic: json_salesbaskets","recordB64":null,"cause":["Cannot deserialize type float64 as type int32 for path: ->BASKETITEMS->ARRAY->QUANTITY"],"topic":"json_salesbaskets"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.transient_JSON_SALESBASKETS_6595300725991909842.KsqlTopic.Source.deserializer)

It’s pointing at the QUANTITY field… that’s defined in the Golang as an int; in the schema it’s defined as a number. Not sure why/where it thinks it’s a float64.

Resolved

Changed
VALUE_FORMAT='JSON',
to
VALUE_FORMAT='JSON_SR',

and changed
QUANTITY number
to
QUANTITY double

This last one did not make sense, as it’s just how many items were bought… but it worked, so not going to argue… (Presumably because JSON Schema’s number type allows non-integer values, so the JSON_SR deserializer treats it as a double; declaring the column as an int gives the float64-vs-int32 mismatch above.)
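
For reference, the working stream definition ends up looking roughly like this (a sketch only: just some columns shown, and the basket-item fields besides QUANTITY are guesses since Tp_BasketItem isn’t posted above):

CREATE STREAM JSON_SALESBASKETS (
    INVOICENUMBER VARCHAR,
    SALEDATETIME  VARCHAR,
    SALETIMESTAMP VARCHAR,
    TERMINALPOINT VARCHAR,
    BASKETITEMS   ARRAY<STRUCT<NAME VARCHAR, QUANTITY DOUBLE>>,
    NETT          DOUBLE,
    VAT           DOUBLE,
    TOTAL         DOUBLE
) WITH (
    KAFKA_TOPIC  = 'json_salesbaskets',
    VALUE_FORMAT = 'JSON_SR'
);

With JSON_SR (or any Schema Registry backed format) the column list can also be left out entirely and ksqlDB will pull the schema from the registry, which sidesteps this kind of type mismatch.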
