what I have working is below:
producer.js
const kafka = require('./client')
async function main() {
const producer = kafka.producer()
await producer.connect()
for (let i = 0; i < 10; i++) {
await producer.send({
topic: 'quickStart',
messages: [
{ value: `message ${i}` },
],
})
console.log(`producing message ${I}`)
}
await producer.disconnect()
console.log('done.')
}
main()
and
client.js
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9091', 'localhost:9092', 'localhost:9093']
})
module.exports = kafka
Is there a way I can send the following JSON to the cluster?
[{key: dateTime, value: {"eifFormVersion":"1","submittedDate":"5/26/2023, 09:18 AM EDT","TRANSLATION_SERVICES":false,"CAREER_INTERESTS_UNSURE":false,"PHONE_CONDITIONAL":false,"APT":"","CAREER_INTERESTS":[2],"CONTACT_METHOD":3,"NEXT_STEPS":5,"CAREER_INTERESTS_COMMENTS":"","PRONOUN":0,"WHO":1,"FIRST":"Jason","PREFERRED_NAME":"TEST","MIDDLE":"B","NO_MIDDLE":false,"LAST":"Smith","DOB":"2000-10-12T04:00:00.000Z","GENDER":"M","STREET_ADDRESS":"111 Turnberry Way","STATE":"NY","CITY":"NYC","ZIP":"45123","PHONE":"1238675309","RESIDENCY_STATUS":false,"SELECT_ADDRESS_A":[1],"UUID":"8bba58d4-9a30-4850-9f7b-853c2f856b4a","email":"json_jason@test.com"}}]
I tried to use one of the shell scripts provided with Kafka distribution, but got errors:
./kafka-producer-perf-test.sh --topic quickStart --producer-props bootstrap.server=kafka2:29092 --throughput -1 --num-records 2 --payload-file payload_2.json
Reading payloads from: /Users/syedahmed/QA/TOOLS/KAFKA/kafka_2.12-3.3.2/bin/payload_2.json
Number of messages read: 1
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:471)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
at org.apache.kafka.tools.ProducerPerformance.createKafkaProducer(ProducerPerformance.java:164)
at org.apache.kafka.tools.ProducerPerformance.start(ProducerPerformance.java:85)
at org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:52)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:445)
... 6 more