… I think I saw a CP Flink container on Confluent’s Docker Hub. Has anyone downloaded it and connected it to a Confluent environment built using the docker-compose.yaml file?
I’m not sure if this is what you’re referring to, but there is this Docker Compose file (example tutorial using it), where the Flink images are based on the Docker Official Flink image.
thanks, looking at it…
I see the requirement to create a Flink compute pool. Hope this is still possible when you’re running the CP environment locally inside Docker via the CP docker-compose file.
I see the demo Git repo environment includes a docker-compose.yml file…
Going to try and merge my current CP stack YAML and this one… create one super YAML that stands up the entire CP stack and the Flink stack…
The Flink SQL Client CLI section runs the example fully in Docker, and then the Flink compute pool requirement is for the Confluent Cloud section that runs the same tutorial against Confluent Cloud for Apache Flink.
Will give it a go. I don’t just want to run the example/course; I want to connect it to my local CP-in-containers stack and try to make it run against my lab topics…
Hi @dtroiano
I see this is actually your repo and you’re doing updates on it…
Is there a blog article that talks through this example/tutorial?
This might be slightly long… code…
I’m posting my compiled/combined docker-compose.yaml below:
bits from the standard CP docker-compose plus the Flink bits from your example…
Curious… might there be a version incompatibility between the broker and your Flink images?
version: '2'

services:

  broker:
    image: confluentinc/cp-kafka:7.6.1
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
      - "29092:29092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    # image: cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0
    image: kafka-connect-custom:1.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.6.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    volumes:
      - ./demo-scene/csv-to-kafka/data:/data

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.6.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.6.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.6.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.6.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.6.1
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

  flink-sql-client:
    image: cnfldemos/flink-sql-client-kafka:1.16.0-scala_2.12-java11
    hostname: flink-sql-client
    container_name: flink-sql-client
    depends_on:
      - flink-jobmanager
    environment:
      FLINK_JOBMANAGER_HOST: flink-jobmanager
    volumes:
      - ./settings/:/settings

  flink-jobmanager:
    image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
    hostname: flink-jobmanager
    container_name: flink-jobmanager
    ports:
      - 9081:9081
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        rest.bind-port: 9081

  flink-taskmanager:
    image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
    hostname: flink-taskmanager
    container_name: flink-taskmanager
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 10

  kafkacat-cli:
    image: confluentinc/cp-kcat:latest
    hostname: kafkacat-cli
    container_name: kafkacat
    depends_on:
      - broker
      - schema-registry
      - connect
    entrypoint: /bin/bash -i
    tty: true
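
For anyone copying this: standard Docker Compose usage brings it up (assuming the file is saved as docker-compose.yaml in the current directory):

# Start the whole stack detached
docker compose up -d

# Tail the Flink JobManager to confirm the Flink side came up cleanly
docker compose logs -f flink-jobmanager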
The Docker Compose file you shared seems to be working. I used cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0 rather than the custom image, and smoke tested by running this example, and it worked as expected. (Note: you’ll have to change broker:9092 to broker:29092.)
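For reference, the broker address in question lives in the tutorial’s CREATE TABLE statement. A minimal sketch of the shape (hypothetical table and topic names, not the tutorial’s exact DDL):

-- Hypothetical Flink SQL source table; the key setting is
-- properties.bootstrap.servers, which must use the listener that is
-- reachable from other containers on the Docker network.
CREATE TABLE orders (
  order_id STRING,
  amount   DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'broker:29092',  -- not localhost:9092
  'properties.group.id' = 'flink-demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);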
What issue are you running into?
I can create a table, and I can then insert data… and it shows a topic created on Kafka.
A select just hangs/sits there, no data.
On the Kafka side, if I look at the topic, no data shows…
The broker setting above: which container are you referring to, the Flink one?
As you will notice, the broker exposes both 9092 and 29092:
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
Curious, is there a GUI available for Flink that I can add?
Note the warning below, issued on the insert statement:
[INFO] Submitting SQL update statement to the cluster…
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/opt/flink/lib/flink-dist-1.16.0.jar) to field java.lang.Class.ANNOTATION
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 1a4ef18fcabb3b02395532afdc8e9f28
Never mind, you meant the CREATE TABLE. I changed the broker address from 9092 to 29092 and it all worked. Sweeeeeet, thanks for that pointer.
Curious (to help me understand): why did it not work on 9092, since the broker is also exposed on that port?
You’re welcome to add this docker-compose.yaml file to your repo for someone who wants the entire stack…
Port 9092 in this example is for clients on the host machine. The KAFKA_ADVERTISED_LISTENERS environment variable is what gets passed back to clients, and there are two listeners (PLAINTEXT://broker:29092 and PLAINTEXT_HOST://localhost:9092). For Flink to connect to Kafka, it has to use the first one to connect on the Docker network, since localhost would be itself. This blog on listeners and Docker is a helpful reference on this topic.
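
You can see the two perspectives with the kcat container already in your stack (a quick sketch; kafkacat is the container_name from your file):

# From the host machine, the PLAINTEXT_HOST listener answers:
kcat -b localhost:9092 -L

# From a container on the Compose network, use the PLAINTEXT listener:
docker exec -it kafkacat kcat -b broker:29092 -L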
I’m remembering now that there is this example that has Confluent Platform plus Flink. This is pretty much what you’ve got minus kcat.
He he he… now if I’d known this before…
But then, having put this together myself was fun and educational.
Is this hard-coded behaviour? It reaches out to the Kafka cluster based on the first port, so if I swapped the two ports around in KAFKA_ADVERTISED_LISTENERS, then I would have used 9092?
I don’t follow this question. I was just saying that the listener to use in the Flink shell is the one that has a hostname that is resolvable and correct from the perspective of other containers, i.e., the one that refers to broker and not the localhost one. So, you could swap ports to use 9092, keeping in mind that the bootstrap servers configs in the other services would also have to change.
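
For illustration, an untested sketch of that swap in the broker’s environment block; every broker:29092 bootstrap reference in the other services would then need to become broker:9092:

# Swapped: containers now use broker:9092, host clients localhost:29092
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092'
KAFKA_LISTENERS: 'PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092'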
I originally understood it as positional: the first listener specified is the one used…
I missed that one was broker:port and the other localhost:port.