Is there a producer I can use to send a JSON payload?

What I have working is below:

producer.js

const kafka = require('./client')

/**
 * Sends ten numbered test messages to the 'quickStart' topic and logs progress.
 *
 * Obtains a producer from the shared `kafka` client, connects it, sends the
 * messages one at a time (each send is awaited before the next one starts),
 * disconnects, and finally logs 'done.'.
 *
 * @returns {Promise<void>} Resolves after the producer has disconnected.
 *   Rejects with whatever error connect/send/disconnect rejected with.
 */
async function main() {
    const producer = kafka.producer()

    await producer.connect()

    try {
        for (let i = 0; i < 10; i++) {
            await producer.send({
                topic: 'quickStart',
                messages: [
                    { value: `message ${i}` },
                ],
            })

            // Identifiers are case-sensitive: the loop variable is lowercase `i`,
            // so interpolating `I` here would throw a ReferenceError.
            console.log(`producing message ${i}`)
        }
    } finally {
        // Disconnect even when a send rejects, so the connection opened
        // above is not left behind on the error path.
        await producer.disconnect()
    }

    console.log('done.')
}

main()

and
client.js

const { Kafka } = require('kafkajs')

// Broker addresses handed to the KafkaJS client.
const brokers = ['localhost:9091', 'localhost:9092', 'localhost:9093']

// Single client instance for the app; other scripts require this module
// and call `kafka.producer()` on it.
const kafka = new Kafka({ clientId: 'my-app', brokers })

module.exports = kafka

Is there a way I can send the following JSON to the cluster?

[{key: dateTime, value: {"eifFormVersion":"1","submittedDate":"5/26/2023, 09:18 AM EDT","TRANSLATION_SERVICES":false,"CAREER_INTERESTS_UNSURE":false,"PHONE_CONDITIONAL":false,"APT":"","CAREER_INTERESTS":[2],"CONTACT_METHOD":3,"NEXT_STEPS":5,"CAREER_INTERESTS_COMMENTS":"","PRONOUN":0,"WHO":1,"FIRST":"Jason","PREFERRED_NAME":"TEST","MIDDLE":"B","NO_MIDDLE":false,"LAST":"Smith","DOB":"2000-10-12T04:00:00.000Z","GENDER":"M","STREET_ADDRESS":"111 Turnberry Way","STATE":"NY","CITY":"NYC","ZIP":"45123","PHONE":"1238675309","RESIDENCY_STATUS":false,"SELECT_ADDRESS_A":[1],"UUID":"8bba58d4-9a30-4850-9f7b-853c2f856b4a","email":"json_jason@test.com"}}]

I tried to use one of the shell scripts provided with Kafka distribution, but got errors:

 ./kafka-producer-perf-test.sh  --topic quickStart --producer-props bootstrap.server=kafka2:29092 --throughput -1 --num-records 2 --payload-file payload_2.json
Reading payloads from: /Users/syedahmed/QA/TOOLS/KAFKA/kafka_2.12-3.3.2/bin/payload_2.json
Number of messages read: 1
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:471)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
	at org.apache.kafka.tools.ProducerPerformance.createKafkaProducer(ProducerPerformance.java:164)
	at org.apache.kafka.tools.ProducerPerformance.start(ProducerPerformance.java:85)
	at org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:52)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89)
	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:445)
	... 6 more

Can you please clarify your problem more?

Whether the Kafka producer perf test can resolve the Kafka hostname you’ve provided is unrelated to the data format you’d like to send from JavaScript (which, yes, you can do, starting by replacing `message ${i}` with what you’ve pasted in the question).

Sorry, perhaps I should be more clear. I mixed up 2 solutions I tried:

  1. JavaScript (producer.js) with `message ${i}`
  2. using ./kafka-producer-perf-test.sh

Now, regarding #1: when I try to use JSON in place of `message ${i}`, in producer_json.js:

const kafka = require('./client')

/**
 * Connects a producer from the shared client, attempts to send ten keyed
 * messages to the 'quickStart' topic, then disconnects and logs 'done.'.
 *
 * NOTE(review): as posted, this function rejects on its first loop iteration
 * with "ReferenceError: dateTime is not defined". The lines responsible are
 * flagged inline and intentionally left unchanged, because they are what the
 * reported error output refers to.
 *
 * @returns {Promise<void>} Resolves after the producer has disconnected.
 */
async function main() {
    const producer = kafka.producer()

    await producer.connect()

    for (let i = 0; i < 10; i++) {
        await producer.send({
            topic: 'quickStart',
            messages: [
               //{key: "${i}", value: "${i}@example.com"},
               //{key: dateTime, value: "${i}@example.com"},
               {key: dateTime, value:"${i}@example.com"},
               // ^ `dateTime` is never declared in this script, so evaluating
               //   the entry above throws a ReferenceError.
               // ^ "${i}@example.com" is a plain double-quoted string, so `${i}`
               //   is not interpolated; only backtick template literals do that.
            ],
        })

        console.log(`producing message ${i}`)
    }

    await producer.disconnect()

    console.log('done.')
}

main()

When I run `node producer_json.js`, I get this error:

/Users/syedahmed/QA/TOOLS/KAFKA/LOCAL/kafka-local-cluster/node-kafka-demo/producer_json.js:14
               {key: dateTime, value:"${i}@bah.com"},
                     ^

ReferenceError: dateTime is not defined
    at main (/Users/syedahmed/QA/TOOLS/KAFKA/LOCAL/kafka-local-cluster/node-kafka-demo/producer_json.js:14:22)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)

Node.js v20.2.0

And when I try to replace ${i} with:

{key: dateTime, value: {"eifFormVersion":"1","submittedDate":"5/26/2023, 09:18 AM EDT","TRANSLATION_SERVICES":false,"CAREER_INTERESTS_UNSURE":false,"PHONE_CONDITIONAL":false,"APT":"","CAREER_INTERESTS":[2],"CONTACT_METHOD":3,"NEXT_STEPS":5,"CAREER_INTERESTS_COMMENTS":"","PRONOUN":0,"WHO":1,"FIRST":"Jason","PREFERRED_NAME":"TEST","MIDDLE":"B","NO_MIDDLE":false,"LAST":"Smith","DOB":"2000-10-12T04:00:00.000Z","GENDER":"M","STREET_ADDRESS":"156 Turnberry Way","STATE":"AK","CITY":"NYC","ZIP":"45123","PHONE":"1238675309","RESIDENCY_STATUS":false,"SELECT_ADDRESS_A":[1],"UUID":"8bba58d4-9a30-4850-9f7b-853c2f856b4a","email”:”jane_doe@example.com"}},

I get:

 node producer_json.js
/Users/syedahmed/QA/TOOLS/KAFKA/LOCAL/kafka-local-cluster/node-kafka-demo/producer_json.js:15
               {key: dateTime, value: {"eifFormVersion":"1","submittedDate":"5/26/2023, 09:18 AM EDT","TRANSLATION_SERVICES":false,"CAREER_INTERESTS_UNSURE":false,"PHONE_CONDITIONAL":false,"APT":"","CAREER_INTERESTS":[2],"CONTACT_METHOD":3,"NEXT_STEPS":5,"CAREER_INTERESTS_COMMENTS":"","PRONOUN":0,"WHO":1,"FIRST":"Jason","PREFERRED_NAME":"TEST","MIDDLE":"B","NO_MIDDLE":false,"LAST":"Smith","DOB":"2000-10-12T04:00:00.000Z","GENDER":"M","STREET_ADDRESS":"156 Turnberry Way","STATE":"AK","CITY":"NYC","ZIP":"45123","PHONE":"1238675309","RESIDENCY_STATUS":false,"SELECT_ADDRESS_A":[1],"UUID":"8bba58d4-9a30-4850-9f7b-853c2f856b4a","email”:”jane_doe@example.com"}},
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

SyntaxError: Unexpected string

As for #2:

./kafka-producer-perf-test.sh  --topic quickStart --producer-props bootstrap.server=127.0.0.1:29092 --throughput -1 --num-records 2 --payload-file payload_2.json
Reading payloads from: /Users/syedahmed/QA/TOOLS/KAFKA/kafka_2.12-3.3.2/bin/payload_2.json
Number of messages read: 1
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer

I am trying to send JSON using kafka-producer-perf-test.sh on the same host that is running the Docker images.

I’m still not sure I understand the problem. In both cases, you have a syntax error. Use an IDE that will highlight this for you, rather than just coding in Notepad, for example.

You can also use the JSON.stringify() function, or extract the value payload into a separate variable or function. It does not need to be an inlined value within the messages array.

I don’t personally use the perf test tool, and you don’t need it to produce JSON data.