Flink Demo/Example Failing

Hi there

I'm following the Flink example.

I get the following error when executing the example steps.

Probably user error…

– Error

flink-jobmanager  | 2024-07-01 13:44:34,060 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Values[3] (1/1) (588f9ef6e8f8d79402dc817e221e436b_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from INITIALIZING to RUNNING.
flink-jobmanager  | 2024-07-01 13:44:34,071 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - movie_ticket_sales[4]: Writer -> movie_ticket_sales[4]: Committer (1/1) (588f9ef6e8f8d79402dc817e221e436b_20ba6b65f97481d5570070de90e4e791_0_0) switched from INITIALIZING to RUNNING.
flink-jobmanager  | 2024-07-01 13:44:34,099 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Values[3] (1/1) (588f9ef6e8f8d79402dc817e221e436b_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FINISHED.
flink-jobmanager  | 2024-07-01 13:45:18,540 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - movie_ticket_sales[2]: Writer -> movie_ticket_sales[2]: Committer (1/1) (4362ecf746301dca3ed0bbb3ef4f4749_20ba6b65f97481d5570070de90e4e791_0_0) switched from RUNNING to FAILED on 172.20.0.8:33315-3d958e @ flink-taskmanager.aws-kafka-confluent_default (dataPort=44397).
flink-jobmanager  | org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic movie-ticket-sales not present in metadata after 60000 ms.
flink-jobmanager  | 2024-07-01 13:45:18,545 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 9d1a6c9290b1bd0fa1dca1ea7c2b20a5
flink-jobmanager  | 2024-07-01 13:45:18,547 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job insert-into_default_catalog.default_database.movie_ticket_sales (9d1a6c9290b1bd0fa1dca1ea7c2b20a5) switched from state RUNNING to FAILING.
flink-jobmanager  | org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
flink-jobmanager  | 	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at jdk.internal.reflect.GeneratedMethodAccessor25.invoke(Unknown Source) ~[?:?]
flink-jobmanager  | 	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
flink-jobmanager  | 	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
flink-jobmanager  | 	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
flink-jobmanager  | 	at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
flink-jobmanager  | 	at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
flink-jobmanager  | 	at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
flink-jobmanager  | Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic movie-ticket-sales not present in metadata after 60000 ms.
flink-jobmanager  | 2024-07-01 13:45:18,551 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution 4362ecf746301dca3ed0bbb3ef4f4749_bc764cd8ddf7a0cff126f51c16239658_0_0.
flink-jobmanager  | 2024-07-01 13:45:18,551 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job insert-into_default_catalog.default_database.movie_ticket_sales (9d1a6c9290b1bd0fa1dca1ea7c2b20a5) switched from state FAILING to FAILED.
flink-jobmanager  | org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
flink-jobmanager  | 	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager  | 	at jdk.internal.reflect.GeneratedMethodAccessor25.invoke(Unknown Source) ~[?:?]
flink-jobmanager  | 	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
flink-jobmanager  | 	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager  | 	at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
flink-jobmanager  | 	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
flink-jobmanager  | 	at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
flink-jobmanager  | 	at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
flink-jobmanager  | 	at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
flink-jobmanager  | Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic movie-ticket-sales not present in metadata after 60000 ms.
flink-jobmanager  | 2024-07-01 13:45:18,552 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping checkpoint coordinator for job 9d1a6c9290b1bd0fa1dca1ea7c2b20a5.
flink-jobmanager  | 2024-07-01 13:45:18,555 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 9d1a6c9290b1bd0fa1dca1ea7c2b20a5 reached terminal state FAILED.
flink-jobmanager  | org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
flink-jobmanager  | 	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
flink-jobmanager  | 	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
flink-jobmanager  | 	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
flink-jobmanager  | 	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477)
flink-jobmanager  | 	at jdk.internal.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
flink-jobmanager  | 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
flink-jobmanager  | 	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
flink-jobmanager  | 	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
flink-jobmanager  | 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
flink-jobmanager  | 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
flink-jobmanager  | 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
flink-jobmanager  | 	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
flink-jobmanager  | 	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
flink-jobmanager  | 	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
flink-jobmanager  | 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
flink-jobmanager  | 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
flink-jobmanager  | 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
flink-jobmanager  | 	at akka.actor.Actor.aroundReceive(Actor.scala:537)
flink-jobmanager  | 	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
flink-jobmanager  | 	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
flink-jobmanager  | 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
flink-jobmanager  | 	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
flink-jobmanager  | 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
flink-jobmanager  | 	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
flink-jobmanager  | 	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
flink-jobmanager  | 	at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
flink-jobmanager  | 	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
flink-jobmanager  | 	at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
flink-jobmanager  | 	at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
flink-jobmanager  | 	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
flink-jobmanager  | Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic movie-ticket-sales not present in metadata after 60000 ms.
flink-jobmanager  | 2024-07-01 13:45:18,561 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 9d1a6c9290b1bd0fa1dca1ea7c2b20a5 has been registered for cleanup in the JobResultStore after reaching a terminal state.
flink-jobmanager  | 2024-07-01 13:45:18,567 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Stopping the JobMaster for job 'insert-into_default_catalog.default_database.movie_ticket_sales' (9d1a6c9290b1bd0fa1dca1ea7c2b20a5).
flink-jobmanager  | 2024-07-01 13:45:18,570 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
flink-jobmanager  | 2024-07-01 13:45:18,570 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Disconnect TaskExecutor 172.20.0.8:33315-3d958e because: Stopping JobMaster for job 'insert-into_default_catalog.default_database.movie_ticket_sales' (9d1a6c9290b1bd0fa1dca1ea7c2b20a5).
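The same stack trace is logged three times above, and the only line that really matters is the `Caused by:` at the end of each one. A minimal sketch for pulling those out of a saved log, assuming the log text has been captured somewhere first. The sample lines are copied from the output above, and the function name `root_causes` is made up for illustration:

```python
import re

# A few lines copied from the jobmanager output above; in practice this
# would be the full captured log text.
SAMPLE = (
    "flink-jobmanager  | org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy\n"
    "flink-jobmanager  | \tat org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)\n"
    "flink-jobmanager  | Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic movie-ticket-sales not present in metadata after 60000 ms.\n"
    "flink-jobmanager  | Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic movie-ticket-sales not present in metadata after 60000 ms.\n"
)

# Matches "Caused by: <fully.qualified.Exception>: <message>"
CAUSED_BY = re.compile(r"Caused by: (?P<exc>[\w.$]+): (?P<msg>.*)")


def root_causes(log_text):
    """Return the distinct (exception class, message) pairs, in first-seen order."""
    seen = []
    for line in log_text.splitlines():
        match = CAUSED_BY.search(line)
        if match:
            # Keep only the simple class name, dropping the (shaded) package prefix.
            cause = (match.group("exc").rsplit(".", 1)[-1], match.group("msg"))
            if cause not in seen:
                seen.append(cause)
    return seen


for exc, msg in root_causes(SAMPLE):
    print(f"{exc}: {msg}")
```

For the log above this collapses everything down to a single Kafka `TimeoutException` about the topic metadata, which points at the connection to the broker rather than at Flink itself.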

– create table

CREATE TABLE movie_ticket_sales (
    title STRING,
    sales_ts STRING,
    total_ticket_value INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'movie-ticket-sales',
    'properties.bootstrap.servers' = 'broker:9092',
    'scan.startup.mode' = 'earliest-offset',
    'key.format' = 'raw',
    'key.fields' = 'title',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:8081',
    'value.fields-include' = 'EXCEPT_KEY'
);

– insert records

INSERT INTO movie_ticket_sales VALUES
    ('Aliens', '2019-07-18T10:00:00Z', 10),
    ('Die Hard', '2019-07-18T10:00:00Z', 12),
    ('Die Hard', '2019-07-18T10:01:00Z', 12),
    ('The Godfather', '2019-07-18T10:01:31Z', 12),
    ('Die Hard', '2019-07-18T10:01:36Z', 24),
    ('The Godfather', '2019-07-18T10:02:00Z', 18),
    ('The Big Lebowski', '2019-07-18T11:03:21Z', 12),
    ('The Big Lebowski', '2019-07-18T11:03:50Z', 12),
    ('The Godfather', '2019-07-18T11:40:00Z', 36),
    ('The Godfather', '2019-07-18T11:40:09Z', 18);

– select … group by: returns no data, as expected, since the insert above already failed.

SELECT title,
       COUNT(total_ticket_value) AS tickets_sold
FROM movie_ticket_sales
GROUP BY title;

– I'm including the docker-compose.yaml file I use to spin up my CP Kafka and Flink environment. I can confirm that the topic is created, but no data makes it to the topic…

version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.6.1
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
      - "29092:29092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid" 
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    build:
      context: .
      dockerfile: Dockerfile
    # image: cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0
    image: kafka-connect-custom:1.2
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.6.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    volumes:
      - ./demo-scene/csv-to-kafka/data:/data
      
  control-center:
    image: confluentinc/cp-enterprise-control-center:7.6.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.6.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.6.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    volumes:
      - ./kSQL:/data
      
  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.6.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.6.1
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

  flink-sql-client:
    image: cnfldemos/flink-sql-client-kafka:1.16.0-scala_2.12-java11
    hostname: flink-sql-client
    container_name: flink-sql-client
    depends_on:
      - flink-jobmanager
    environment:
      FLINK_JOBMANAGER_HOST: flink-jobmanager
    volumes:
      - ./settings/:/settings
      
  flink-jobmanager:
    image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
    hostname: flink-jobmanager
    container_name: flink-jobmanager
    ports:
      - 9081:9081
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        rest.bind-port: 9081

  flink-taskmanager:
    image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
    hostname: flink-taskmanager
    container_name: flink-taskmanager
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 10
        
  kafkacat-cli:
    image: confluentinc/cp-kcat:latest
    hostname: kafkacat-cli
    container_name: kafkacat
    depends_on:
      - broker
      - schema-registry
      - connect
    entrypoint: /bin/bash -i
    tty: true

  mysql:
    image: mysql:8.0
    hostname: mysql
    container_name: mysql
    ports:
      - "3306:3306"
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: dumdumb

  postgresql:
    image: postgres:14
    hostname: postgresql
    container_name: postgresql
    ports:
      - "5432:5432"
    environment:
      POSTGRES_PASSWORD: abfr24
  
  mongo:
    image: mongodb/mongodb-atlas-local:latest
    hostname: mongodb
    container_name: mongodb
    ports:
      - "27017:27017" 

Connect Dockerfile

FROM confluentinc/cp-kafka-connect-base:latest

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN echo "Installing Connector Plugins"
RUN confluent-hub install --no-prompt jcustenborder/kafka-connect-spooldir:2.0.43
RUN confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.12.0
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
RUN confluent-hub install --no-prompt markteehan/file-chunk-sink:latest
RUN confluent-hub install --no-prompt tabular/iceberg-kafka-connect:latest
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest

USER root
RUN wget -O /usr/share/java/kafka/mysql-connector-j-8.4.0.jar \
    https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.4.0/mysql-connector-j-8.4.0.jar
USER appuser

# docker build -t kafka-connect-custom:1.0 .

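Strange… issuing the following in the Flink SQL client worked…
now to figure out what the difference is from the above.

Fix found: the broker port!! Inside the Docker network the table has to use broker:29092, not broker:9092. Per KAFKA_ADVERTISED_LISTENERS in the compose file, port 9092 is the PLAINTEXT_HOST listener, which is advertised as localhost:9092 and so is only usable from the host machine.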

– create table (bootstrap server changed to broker:29092)

CREATE TABLE movie_ticket_sales (
    title STRING,
    sales_ts STRING,
    total_ticket_value INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'movie-ticket-sales',
    'properties.bootstrap.servers' = 'broker:29092',
    'scan.startup.mode' = 'earliest-offset',
    'key.format' = 'raw',
    'key.fields' = 'title',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:8081',
    'value.fields-include' = 'EXCEPT_KEY'
);

– insert records

INSERT INTO movie_ticket_sales VALUES
    ('Aliens', '2019-07-18T10:00:00Z', 10),
    ('Die Hard', '2019-07-18T10:00:00Z', 12),
    ('Die Hard', '2019-07-18T10:01:00Z', 12),
    ('The Godfather', '2019-07-18T10:01:31Z', 12),
    ('Die Hard', '2019-07-18T10:01:36Z', 24),
    ('The Godfather', '2019-07-18T10:02:00Z', 18),
    ('The Big Lebowski', '2019-07-18T11:03:21Z', 12),
    ('The Big Lebowski', '2019-07-18T11:03:50Z', 12),
    ('The Godfather', '2019-07-18T11:40:00Z', 36),
    ('The Godfather', '2019-07-18T11:40:09Z', 18);

– select … group by

SELECT title,
       COUNT(total_ticket_value) AS tickets_sold
FROM movie_ticket_sales
GROUP BY title;
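Why the port matters, as far as I understand it: a Kafka client only uses the bootstrap address for the first connection, and after that it talks to whatever address the broker advertises for the listener it connected on. A rough sketch of that lookup, using the two listener strings from the compose file above. The helper names are hypothetical, and this only mimics the mapping; it is not how the broker is actually implemented:

```python
# Values copied from the broker service in docker-compose.yaml.
LISTENERS = "PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092"
ADVERTISED = "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"


def parse_listeners(spec: str) -> dict[str, tuple[str, int]]:
    """Turn 'NAME://host:port,...' into {NAME: (host, port)}."""
    result = {}
    for entry in spec.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        result[name] = (host, int(port))
    return result


def advertised_for_port(bootstrap_port: int) -> tuple[str, int]:
    """Return the address a client is told to use after bootstrapping on this port."""
    listeners = parse_listeners(LISTENERS)
    advertised = parse_listeners(ADVERTISED)
    for name, (_host, port) in listeners.items():
        # The CONTROLLER listener has no advertised entry, so it is skipped here.
        if port == bootstrap_port and name in advertised:
            return advertised[name]
    raise ValueError(f"no client listener bound to port {bootstrap_port}")


print(advertised_for_port(9092))   # ('localhost', 9092)
print(advertised_for_port(29092))  # ('broker', 29092)
```

So with broker:9092 the Flink taskmanager is sent on to localhost:9092, which inside its own container is not the broker, and that would explain the metadata timeout. With broker:29092 it is sent to broker:29092, which resolves fine on the compose network.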
