Joining two JSON docs via ksqlDB

Hi all

Firstly, I’ve been lazy and don’t have a schema… so yeah, shoot me :wink:

I want to join the following 2 docs, which are pushed to the salesbasket and salespayments topics, based on invoiceNumber, and push the joined doc to salescompleted.

{
 "invoiceNumber": "252b6376-6110-4447-b68d-573abdc900ce",
 "saleDateTime": "2024-06-05T15:18:55+02:00",
 "store": {
  "id": "324213410",
  "name": "Milnerton"
 },
 "clerk": {
  "id": "10012",
  "name": "Michael"
 },
 "terminalPoint": "8",
 "basketItems": [
  {
   "id": "000000027",
   "name": "Knorr Spaghetti Bolognaise Dry Cook In Sauce 48g",
   "brand": "Knorr",
   "category": "Food Cupboard",
   "price": 19.99,
   "quantity": 1
  },
  {
   "id": "000000033",
   "name": "PnP 2 Ply White Toilet Paper 18 Pack",
   "brand": "PnP",
   "category": "Cleaning",
   "price": 134.99,
   "quantity": 2
  },
  {
   "id": "000000034",
   "name": "Kellogg's Coco Pops Original 350g",
   "brand": "Kellogg's",
   "category": "Food Cupboard",
   "price": 55.99,
   "quantity": 2
  },
  {
   "id": "000000016",
   "name": "KFC Street Wise 2",
   "brand": "KFC",
   "category": "Food Cupboard",
   "price": 20.99,
   "quantity": 5
  },
  {
   "id": "000000009",
   "name": "Nescafe Salted Caramel 180g 10 Sachets",
   "brand": "Nescafe",
   "category": "Food Cupboard",
   "price": 74,
   "quantity": 2
  },
  {
   "id": "000000021",
   "name": "Stork Country 40% Fat Spread 1kg",
   "brand": "Stork",
   "category": "Food Cupboard",
   "price": 46.99,
   "quantity": 2
  },
  {
   "id": "000000041",
   "name": "Castle Lite NRB 24 x 330ml",
   "brand": "Castle",
   "category": "Beverage",
   "price": 269.99,
   "quantity": 4
  }
 ],
 "nett": 1828.84,
 "vat": 256.04,
 "total": 2084.88
}

and

{
 "invoiceNumber": "252b6376-6110-4447-b68d-573abdc900ce",
 "payDateTime": "2024-06-05T15:23:40+02:00",
 "paid": 2084.88,
 "finTransactionId": "9bf017a7-5a86-43a3-a787-b60e8a429127"
}

What’s your question?

How would I join them into a new topic called salescompleted?

G

What did you try? Kafka Streams? ksqlDB? Flink?

Total noob here.
I’m aware that Flink is currently not included in the docker-compose.yaml file.
So for now I’m thinking Kafka Streams and/or ksqlDB…
When I did try Kafka Streams, it complained about JSON and schema… so I guess I need help with that as a start.
G

Maybe some background…
Assume I am getting this data pushed to me… 2 topics, and they are JSON based.
Best would be to ask the producer to create a schema…
Let’s now assume I’m the producer: I have a Golang app creating the topic on the fly and pushing data.
I now need to create a schema… how?
It seems both Kafka Streams and ksqlDB depend on schemas as a start, before I can do a select/join (on invoiceNumber, and if I follow Robin’s video I would key it on invoiceNumber) to push the joined data to a new topic.

G
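
For illustration, here is a minimal Go sketch of the "key it on invoiceNumber" idea from the producer side: it pulls invoiceNumber out of each JSON payload so the same value can be used as the record key on both topics. The function name invoiceKey is hypothetical, and the actual produce call is left out because it depends on whichever Kafka client library the Go app uses.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// invoiceKey (hypothetical helper) extracts invoiceNumber from a JSON
// payload so it can be used as the Kafka record key.
func invoiceKey(payload []byte) (string, error) {
	var doc struct {
		InvoiceNumber string `json:"invoiceNumber"`
	}
	if err := json.Unmarshal(payload, &doc); err != nil {
		return "", err
	}
	if doc.InvoiceNumber == "" {
		return "", errors.New("payload has no invoiceNumber")
	}
	return doc.InvoiceNumber, nil
}

func main() {
	// Trimmed-down versions of the two sample documents.
	basket := []byte(`{"invoiceNumber":"252b6376-6110-4447-b68d-573abdc900ce","total":2084.88}`)
	payment := []byte(`{"invoiceNumber":"252b6376-6110-4447-b68d-573abdc900ce","paid":2084.88}`)

	basketKey, err := invoiceKey(basket)
	if err != nil {
		panic(err)
	}
	paymentKey, err := invoiceKey(payment)
	if err != nil {
		panic(err)
	}
	// Both records would carry the same key on their respective topics.
	fmt.Println(basketKey == paymentKey)
}
```

With both topics keyed this way, a join on invoiceNumber becomes a join on the record key, which is the setup the join tutorials generally assume.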

Building on this…
I loaded the following 2 schemas… CC accepted them. I don’t know how to specify the primary key field/value though.

Salespayment

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "properties": {
    "FinTransactionID": {
      "type": "string"
    },
    "InvoiceNumber": {
      "type": "string"
    },
    "Paid": {
      "type": "number"
    },
    "PayDateTime": {
      "type": "string"
    }
  },
  "required": [
    "InvoiceNumber",
    "PayDateTime",
    "Paid",
    "FinTransactionID"
  ],
  "type": "object"
}

Salesbasket

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "properties": {
    "BasketItems": {
      "items": [
        {
          "properties": {
            "Brand": {
              "type": "string"
            },
            "Catergory": {
              "type": "string"
            },
            "Id": {
              "type": "string"
            },
            "Name": {
              "type": "string"
            },
            "Price": {
              "type": "number"
            },
            "Quantity": {
              "type": "integer"
            }
          },
          "required": [
            "Id",
            "Name",
            "Brand",
            "Catergory",
            "Price",
            "Quantity"
          ],
          "type": "object"
        },
        {
          "properties": {
            "Brand": {
              "type": "string"
            },
            "Catergory": {
              "type": "string"
            },
            "Id": {
              "type": "string"
            },
            "Name": {
              "type": "string"
            },
            "Price": {
              "type": "number"
            },
            "Quantity": {
              "type": "integer"
            }
          },
          "required": [
            "Id",
            "Name",
            "Brand",
            "Catergory",
            "Price",
            "Quantity"
          ],
          "type": "object"
        },
        {
          "properties": {
            "Brand": {
              "type": "string"
            },
            "Catergory": {
              "type": "string"
            },
            "Id": {
              "type": "string"
            },
            "Name": {
              "type": "string"
            },
            "Price": {
              "type": "number"
            },
            "Quantity": {
              "type": "integer"
            }
          },
          "required": [
            "Id",
            "Name",
            "Brand",
            "Catergory",
            "Price",
            "Quantity"
          ],
          "type": "object"
        },
        {
          "properties": {
            "Brand": {
              "type": "string"
            },
            "Catergory": {
              "type": "string"
            },
            "Id": {
              "type": "string"
            },
            "Name": {
              "type": "string"
            },
            "Price": {
              "type": "number"
            },
            "Quantity": {
              "type": "integer"
            }
          },
          "required": [
            "Id",
            "Name",
            "Brand",
            "Catergory",
            "Price",
            "Quantity"
          ],
          "type": "object"
        },
        {
          "properties": {
            "Brand": {
              "type": "string"
            },
            "Catergory": {
              "type": "string"
            },
            "Id": {
              "type": "string"
            },
            "Name": {
              "type": "string"
            },
            "Price": {
              "type": "number"
            },
            "Quantity": {
              "type": "integer"
            }
          },
          "required": [
            "Id",
            "Name",
            "Brand",
            "Catergory",
            "Price",
            "Quantity"
          ],
          "type": "object"
        }
      ],
      "type": "array"
    },
    "Clerk": {
      "properties": {
        "Id": {
          "type": "string"
        },
        "Name": {
          "type": "string"
        }
      },
      "required": [
        "Id",
        "Name"
      ],
      "type": "object"
    },
    "InvoiceNumber": {
      "type": "string"
    },
    "Net": {
      "type": "number"
    },
    "SaleDateTime": {
      "type": "string"
    },
    "Store": {
      "properties": {
        "Id": {
          "type": "string"
        },
        "Name": {
          "type": "string"
        }
      },
      "required": [
        "Id",
        "Name"
      ],
      "type": "object"
    },
    "TerminalPoint": {
      "type": "string"
    },
    "Total": {
      "type": "number"
    },
    "VAT": {
      "type": "number"
    }
  },
  "required": [
    "InvoiceNumber",
    "SaleDateTime",
    "Store",
    "Clerk",
    "TerminalPoint",
    "BasketItems",
    "Net",
    "VAT",
    "Total"
  ],
  "type": "object"
}

Found the following site that generates a JSON schema from a given JSON document
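
One thing that may be worth checking: the property names in the schemas above (InvoiceNumber, Catergory, Net, FinTransactionID) do not match the names in the sample documents at the top (invoiceNumber, category, nett, finTransactionId), and JSON property names are case-sensitive. The Go sketch below is only a rough check of the "required" list against a payload, not a full JSON Schema validator; the function name missingRequired is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// missingRequired (hypothetical helper) returns the names in required
// that are not top-level properties of the JSON document. The lookup is
// an exact, case-sensitive match on the property name.
func missingRequired(payload []byte, required []string) ([]string, error) {
	var doc map[string]json.RawMessage
	if err := json.Unmarshal(payload, &doc); err != nil {
		return nil, err
	}
	var missing []string
	for _, name := range required {
		if _, ok := doc[name]; !ok {
			missing = append(missing, name)
		}
	}
	return missing, nil
}

func main() {
	// The sample payment document from the first post.
	payment := []byte(`{
		"invoiceNumber": "252b6376-6110-4447-b68d-573abdc900ce",
		"payDateTime": "2024-06-05T15:23:40+02:00",
		"paid": 2084.88,
		"finTransactionId": "9bf017a7-5a86-43a3-a787-b60e8a429127"
	}`)
	// The "required" list copied from the Salespayment schema.
	required := []string{"InvoiceNumber", "PayDateTime", "Paid", "FinTransactionID"}

	missing, err := missingRequired(payment, required)
	if err != nil {
		panic(err)
	}
	fmt.Println("missing required properties:", missing)
}
```

Run against the sample payment document, all four required names are reported as missing, so either the schema or the payload field names would need to change before the two line up.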

Maybe you should get started with some tutorials: Apache Kafka® Tutorials and Recipes by Confluent

You don’t have a specific question, and a forum is not a good way to bootstrap yourself by asking overly generic questions.

Can’t see how my question is too generic.
I want to join 2 messages on 2 topics based on a common field… and push the new combined value to a 3rd topic.
G
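
To make the goal concrete, here is a rough Go sketch of what "join on a common field" means for a single pair of messages: if both documents carry the same invoiceNumber, their fields are merged into one document that could go to the third topic. This is only a conceptual model with a hypothetical function name (joinOnInvoice); it is not how ksqlDB or Kafka Streams implement joins, which also have to deal with buffering, time windows, and records arriving out of order.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// joinOnInvoice (hypothetical helper) merges a basket document and a
// payment document into one JSON object, but only when both carry the
// same invoiceNumber.
func joinOnInvoice(basket, payment []byte) ([]byte, error) {
	var b, p map[string]interface{}
	if err := json.Unmarshal(basket, &b); err != nil {
		return nil, err
	}
	if err := json.Unmarshal(payment, &p); err != nil {
		return nil, err
	}
	bi, bok := b["invoiceNumber"].(string)
	pi, pok := p["invoiceNumber"].(string)
	if !bok || !pok || bi != pi {
		return nil, errors.New("invoiceNumber missing or not equal")
	}
	// Copy the payment fields into the basket document.
	for k, v := range p {
		b[k] = v
	}
	return json.Marshal(b)
}

func main() {
	// Trimmed-down versions of the two sample documents.
	basket := []byte(`{"invoiceNumber":"252b6376-6110-4447-b68d-573abdc900ce","total":2084.88}`)
	payment := []byte(`{"invoiceNumber":"252b6376-6110-4447-b68d-573abdc900ce","paid":2084.88}`)

	joined, err := joinOnInvoice(basket, payment)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(joined))
}
```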

You did not even say what tool you want to use.

Later you said maybe Kafka Streams. So just try to write some code:

StreamsBuilder builder = new StreamsBuilder();

builder.stream(...).join(builder.stream(...), ...).to(...);

A generic (not very useful?) answer to a generic question? I would like to help, but I’m not sure how. It’s not a very specific question, so I can only give generic answers and send links to tutorials and docs…

Maybe because I don’t know how… I just know I’ve got 2 incoming topics with those messages on them… Help me: explain the options to me, and why one would be better than another. Most of the written pieces out there present whatever that writer considers best. I also haven’t just sat back and done nothing: look at my first question, and then at how I’ve updated it. Realising I need a schema, I’ve actually gone and created the schemas. So I’m also trying, reading…

G