Hi there
Getting the following error when executing the flink example steps.
Probably user error…
– Error
flink-jobmanager | 2024-07-01 13:44:34,060 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Values[3] (1/1) (588f9ef6e8f8d79402dc817e221e436b_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from INITIALIZING to RUNNING.
flink-jobmanager | 2024-07-01 13:44:34,071 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - movie_ticket_sales[4]: Writer -> movie_ticket_sales[4]: Committer (1/1) (588f9ef6e8f8d79402dc817e221e436b_20ba6b65f97481d5570070de90e4e791_0_0) switched from INITIALIZING to RUNNING.
flink-jobmanager | 2024-07-01 13:44:34,099 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Values[3] (1/1) (588f9ef6e8f8d79402dc817e221e436b_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FINISHED.
flink-jobmanager | 2024-07-01 13:45:18,540 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - movie_ticket_sales[2]: Writer -> movie_ticket_sales[2]: Committer (1/1) (4362ecf746301dca3ed0bbb3ef4f4749_20ba6b65f97481d5570070de90e4e791_0_0) switched from RUNNING to FAILED on 172.20.0.8:33315-3d958e @ flink-taskmanager.aws-kafka-confluent_default (dataPort=44397).
flink-jobmanager | org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic movie-ticket-sales not present in metadata after 60000 ms.
flink-jobmanager | 2024-07-01 13:45:18,545 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 9d1a6c9290b1bd0fa1dca1ea7c2b20a5
flink-jobmanager | 2024-07-01 13:45:18,547 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job insert-into_default_catalog.default_database.movie_ticket_sales (9d1a6c9290b1bd0fa1dca1ea7c2b20a5) switched from state RUNNING to FAILING.
flink-jobmanager | org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
flink-jobmanager | at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at jdk.internal.reflect.GeneratedMethodAccessor25.invoke(Unknown Source) ~[?:?]
flink-jobmanager | at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
flink-jobmanager | at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
flink-jobmanager | at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
flink-jobmanager | at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
flink-jobmanager | at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
flink-jobmanager | at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
flink-jobmanager | Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic movie-ticket-sales not present in metadata after 60000 ms.
flink-jobmanager | 2024-07-01 13:45:18,551 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the results produced by task execution 4362ecf746301dca3ed0bbb3ef4f4749_bc764cd8ddf7a0cff126f51c16239658_0_0.
flink-jobmanager | 2024-07-01 13:45:18,551 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job insert-into_default_catalog.default_database.movie_ticket_sales (9d1a6c9290b1bd0fa1dca1ea7c2b20a5) switched from state FAILING to FAILED.
flink-jobmanager | org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
flink-jobmanager | at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477) ~[flink-dist-1.16.0.jar:1.16.0]
flink-jobmanager | at jdk.internal.reflect.GeneratedMethodAccessor25.invoke(Unknown Source) ~[?:?]
flink-jobmanager | at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
flink-jobmanager | at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_aacbcb3c-0908-4aab-9922-cbd24c1651ff.jar:1.16.0]
flink-jobmanager | at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
flink-jobmanager | at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
flink-jobmanager | at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
flink-jobmanager | at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
flink-jobmanager | at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
flink-jobmanager | Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic movie-ticket-sales not present in metadata after 60000 ms.
flink-jobmanager | 2024-07-01 13:45:18,552 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping checkpoint coordinator for job 9d1a6c9290b1bd0fa1dca1ea7c2b20a5.
flink-jobmanager | 2024-07-01 13:45:18,555 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 9d1a6c9290b1bd0fa1dca1ea7c2b20a5 reached terminal state FAILED.
flink-jobmanager | org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
flink-jobmanager | at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
flink-jobmanager | at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
flink-jobmanager | at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
flink-jobmanager | at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
flink-jobmanager | at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
flink-jobmanager | at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
flink-jobmanager | at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
flink-jobmanager | at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
flink-jobmanager | at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477)
flink-jobmanager | at jdk.internal.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
flink-jobmanager | at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
flink-jobmanager | at java.base/java.lang.reflect.Method.invoke(Unknown Source)
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
flink-jobmanager | at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
flink-jobmanager | at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
flink-jobmanager | at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
flink-jobmanager | at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
flink-jobmanager | at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
flink-jobmanager | at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
flink-jobmanager | at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
flink-jobmanager | at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
flink-jobmanager | at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
flink-jobmanager | at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
flink-jobmanager | at akka.actor.Actor.aroundReceive(Actor.scala:537)
flink-jobmanager | at akka.actor.Actor.aroundReceive$(Actor.scala:535)
flink-jobmanager | at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
flink-jobmanager | at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
flink-jobmanager | at akka.actor.ActorCell.invoke(ActorCell.scala:548)
flink-jobmanager | at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
flink-jobmanager | at akka.dispatch.Mailbox.run(Mailbox.scala:231)
flink-jobmanager | at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
flink-jobmanager | at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
flink-jobmanager | at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
flink-jobmanager | at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
flink-jobmanager | at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
flink-jobmanager | at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
flink-jobmanager | Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic movie-ticket-sales not present in metadata after 60000 ms.
flink-jobmanager | 2024-07-01 13:45:18,561 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 9d1a6c9290b1bd0fa1dca1ea7c2b20a5 has been registered for cleanup in the JobResultStore after reaching a terminal state.
flink-jobmanager | 2024-07-01 13:45:18,567 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job 'insert-into_default_catalog.default_database.movie_ticket_sales' (9d1a6c9290b1bd0fa1dca1ea7c2b20a5).
flink-jobmanager | 2024-07-01 13:45:18,570 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
flink-jobmanager | 2024-07-01 13:45:18,570 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Disconnect TaskExecutor 172.20.0.8:33315-3d958e because: Stopping JobMaster for job 'insert-into_default_catalog.default_database.movie_ticket_sales' (9d1a6c9290b1bd0fa1dca1ea7c2b20a5).
– create table
CREATE TABLE movie_ticket_sales (
title STRING,
sales_ts STRING,
total_ticket_value INT
) WITH (
'connector' = 'kafka',
'topic' = 'movie-ticket-sales',
'properties.bootstrap.servers' = 'broker:9092',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'raw',
'key.fields' = 'title',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://schema-registry:8081',
'value.fields-include' = 'EXCEPT_KEY'
);
– insert records
INSERT INTO movie_ticket_sales VALUES
('Aliens', '2019-07-18T10:00:00Z', 10),
('Die Hard', '2019-07-18T10:00:00Z', 12),
('Die Hard', '2019-07-18T10:01:00Z', 12),
('The Godfather', '2019-07-18T10:01:31Z', 12),
('Die Hard', '2019-07-18T10:01:36Z', 24),
('The Godfather', '2019-07-18T10:02:00Z', 18),
('The Big Lebowski', '2019-07-18T11:03:21Z', 12),
('The Big Lebowski', '2019-07-18T11:03:50Z', 12),
('The Godfather', '2019-07-18T11:40:00Z', 36),
('The Godfather', '2019-07-18T11:40:09Z', 18);
– select … group by. - Return no data, as expected as above failed already.
SELECT title,
COUNT(total_ticket_value) AS tickets_sold
FROM movie_ticket_sales
GROUP BY title;
– I’m including my docker-compose.yaml file used to spin up my cp kafka and flink environment, I can confirm that a topic is created, but no data makes to it…
version: '2'
services:
broker:
image: confluentinc/cp-kafka:7.6.1
hostname: broker
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
- "29092:29092"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
# Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
# See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
schema-registry:
image: confluentinc/cp-schema-registry:7.6.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
connect:
build:
context: .
dockerfile: Dockerfile
# image: cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0
image: kafka-connect-custom:1.2
hostname: connect
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.6.1.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
volumes:
- ./demo-scene/csv-to-kafka/data:/data
control-center:
image: confluentinc/cp-enterprise-control-center:7.6.1
hostname: control-center
container_name: control-center
depends_on:
- broker
- schema-registry
- connect
- ksqldb-server
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.6.1
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:7.6.1
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
entrypoint: /bin/sh
tty: true
volumes:
- ./kSQL:/data
ksql-datagen:
image: confluentinc/ksqldb-examples:7.6.1
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksqldb-server
- broker
- schema-registry
- connect
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b broker:29092 1 40 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 40 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 11 && \
tail -f /dev/null'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: confluentinc/cp-kafka-rest:7.6.1
depends_on:
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
flink-sql-client:
image: cnfldemos/flink-sql-client-kafka:1.16.0-scala_2.12-java11
hostname: flink-sql-client
container_name: flink-sql-client
depends_on:
- flink-jobmanager
environment:
FLINK_JOBMANAGER_HOST: flink-jobmanager
volumes:
- ./settings/:/settings
flink-jobmanager:
image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
hostname: flink-jobmanager
container_name: flink-jobmanager
ports:
- 9081:9081
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
rest.bind-port: 9081
flink-taskmanager:
image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
hostname: flink-taskmanager
container_name: flink-taskmanager
depends_on:
- flink-jobmanager
command: taskmanager
scale: 1
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 10
kafkacat-cli:
image: confluentinc/cp-kcat:latest
hostname: kafkacat-cli
container_name: kafkacat
depends_on:
- broker
- schema-registry
- connect
entrypoint: /bin/bash -i
tty: true
mysql:
image: mysql:8.0
hostname: mysql
container_name: mysql
ports:
- "3306:3306"
restart: always
environment:
MYSQL_ROOT_PASSWORD: dumdumb
postgresql:
image: postgres:14
hostname: postgresql
container_name: postgresql
ports:
- "5432:5432"
environment:
POSTGRES_PASSWORD: abfr24
mongo:
image: mongodb/mongodb-atlas-local:latest
hostname: mongodb
container_name: mongodb
ports:
- "27017:27017"
Connect Dockerfile
FROM confluentinc/cp-kafka-connect-base:latest
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN echo "Installing Connector Plugins"
RUN confluent-hub install --no-prompt jcustenborder/kafka-connect-spooldir:2.0.43
RUN confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.12.0
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
RUN confluent-hub install --no-prompt markteehan/file-chunk-sink:latest
RUN confluent-hub install --no-prompt tabular/iceberg-kafka-connect:latest
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
USER root
RUN wget -O /usr/share/java/kafka/mysql-connector-j-8.4.0.jar \
https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.4.0/mysql-connector-j-8.4.0.jar
USER appuser
# docker build -t kafka-connect-custom:1.0 .