Install AMQ Streams Custom Resource Definitions

Download and extract

  1. Download the latest version of the Red Hat AMQ Streams installation and example resources. The latest version at the time of this writing is Red Hat AMQ Streams 1.6.

  2. Unzip Red Hat AMQ Streams installation and example resources to any destination.

    • On Windows or Mac, you can extract the contents of the ZIP archive by double-clicking on the ZIP file.

    • On Red Hat Enterprise Linux, open a terminal window on the target machine and navigate to the directory where the ZIP file was downloaded. Extract the ZIP file by executing the following command:

      # unzip amq-streams-1.6.0-ocp-install-examples.zip
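
The commands in the following sections use relative paths such as install/cluster-operator/ and install/strimzi-admin/, so they need to run from the directory that holds the extracted resources. As a minimal sketch, the hypothetical helper below (check_install_layout is not part of AMQ Streams) checks for those two directories before you continue:

```shell
# Hypothetical helper: confirm the current directory holds the extracted resources.
check_install_layout() {
  # These are the two relative paths used by the commands later in this guide.
  for dir in install/cluster-operator install/strimzi-admin; do
    if [ ! -d "${dir}" ]; then
      echo "missing ${dir}: change to the directory where you extracted the ZIP" >&2
      return 1
    fi
  done
  echo "installation resources found"
}
```

Call check_install_layout from the directory you plan to work in. A non-zero exit status means at least one of the expected directories was not found there.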

Install Custom Resource Definitions (CRDs)


  1. Log in to the OpenShift cluster with cluster-admin privileges, for example:

    # oc login -u system:admin

  2. By default, the installation files reference the myproject namespace. Update them to use the namespace where you will install the AMQ Streams Kafka Cluster Operator, for example kafka.

    • On Linux, use:

      # sed -i 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml

    • On Mac:

      # sed -i '' 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml

  3. Deploy the Custom Resource Definitions (CRDs) and role-based access control (RBAC) resources to manage the CRDs.

    # oc new-project kafka
    # oc apply -f install/cluster-operator/ 

  4. Create the project where you want to deploy your Kafka cluster, for example my-kafka-project.

    # oc new-project my-kafka-project

  5. Give your non-admin user developer admin access to that project.

    # oc adm policy add-role-to-user admin developer -n my-kafka-project

  6. Enable the Cluster Operator to watch that namespace.

    # oc set env deploy/strimzi-cluster-operator STRIMZI_NAMESPACE=kafka,my-kafka-project -n kafka
    # oc apply -f install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml -n my-kafka-project
    # oc apply -f install/cluster-operator/032-RoleBinding-strimzi-cluster-operator-topic-operator-delegation.yaml -n my-kafka-project
    # oc apply -f install/cluster-operator/031-RoleBinding-strimzi-cluster-operator-entity-operator-delegation.yaml -n my-kafka-project

  7. Create the new Cluster Role strimzi-admin.

    # oc apply -f install/strimzi-admin/

  8. Add the role to the non-admin user developer.

    # oc adm policy add-cluster-role-to-user strimzi-admin developer
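
To see what the sed command in step 2 changes before running it on the real files, you can try it on a throwaway copy. The sketch below is only an illustration: the sample file is a simplified, hypothetical stand-in for one of the *RoleBinding*.yaml files, and it uses the -i.bak form of sed, which both GNU sed (Linux) and BSD sed (Mac) accept.

```shell
# Work in a temporary directory so the real installation files are untouched.
workdir="$(mktemp -d)"
sample="${workdir}/020-RoleBinding-sample.yaml"

# Simplified, hypothetical stand-in for one of the *RoleBinding*.yaml files.
cat > "${sample}" << 'EOF'
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: strimzi-cluster-operator
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
EOF

# Same substitution as in step 2; -i.bak edits in place and keeps a backup copy.
sed -i.bak 's/namespace: .*/namespace: kafka/' "${sample}"

# Capture and print the rewritten line.
rewritten="$(grep 'namespace:' "${sample}")"
echo "${rewritten}"

# Clean up the temporary directory.
rm -rf "${workdir}"
```

The printed line should now reference the kafka namespace instead of myproject. Running the same grep against install/cluster-operator/*RoleBinding*.yaml is a quick way to confirm the real files were updated.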

Create your first Apache Kafka Cluster

Create Cluster and Topic resources

The Cluster Operator will now listen for new Kafka resources.

  1. Log in as a regular user, for example:

    # oc login -u developer

    # oc project my-kafka-project

  2. Create the new my-cluster Kafka cluster with 3 ZooKeeper nodes and 3 broker nodes, using ephemeral storage and exposing the Kafka cluster outside of the OpenShift cluster through Routes:

    # cat << EOF | oc create -f -
    apiVersion: kafka.strimzi.io/v1beta1
    kind: Kafka
    metadata: 
      name: my-cluster
    spec:
      kafka:
        replicas: 3
        listeners:
          - name: external
            port: 9092
            type: route
            tls: true
        storage:
          type: ephemeral
      zookeeper:
        replicas: 3
        storage:
          type: ephemeral
      entityOperator:
        topicOperator: {}
    EOF

  3. Once the cluster is running, create a topic to publish and subscribe to from the external client. Create the following my-topic KafkaTopic custom resource with 3 replicas and 3 partitions in the my-cluster Kafka cluster:

    # cat << EOF | oc create -f -
    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaTopic
    metadata:
      name: my-topic
      labels:
        strimzi.io/cluster: my-cluster 
    spec:
      partitions: 3
      replicas: 3
    EOF
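
The steps above assume the cluster has finished starting before the topic is used. One way to check, sketched here under the assumption that the Kafka resource reports a Ready condition in its status, is to block on that condition with oc wait. The function name wait_for_kafka and the 300-second timeout are illustrative choices, not part of the product:

```shell
# Hypothetical helper; the defaults match the example names used in this guide.
wait_for_kafka() {
  cluster="${1:-my-cluster}"
  namespace="${2:-my-kafka-project}"
  # Block until the Kafka custom resource reports Ready, or give up after 300 seconds.
  oc wait "kafka/${cluster}" --for=condition=Ready --timeout=300s -n "${namespace}"
}
```

Run wait_for_kafka with no arguments for the names used in this guide, or pass a cluster name and namespace of your own. You can also list the pods with oc get pods -n my-kafka-project to watch the ZooKeeper and broker pods start.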

You are now ready to start sending and receiving messages.

Start sending and receiving from a Topic

Test the cluster using an external Red Hat Fuse application. You need Maven and a Java 8 JDK installed.

  1. Clone this git repo to test the access to your new Kafka cluster:

    $ git clone https://github.com/hguerrero/amq-examples.git

  2. Switch to the camel-kafka-demo folder.

    $ cd amq-examples/camel-kafka-demo/

  3. Because we are using Routes for external access to the cluster, the client needs the cluster CA certificate to enable TLS. Extract the public certificate of the broker certificate authority.

    $ oc extract secret/my-cluster-cluster-ca-cert --keys=ca.crt --to=- > src/main/resources/ca.crt

  4. Import the trusted certificate into a keystore.

    $ keytool -import -trustcacerts -alias root -file src/main/resources/ca.crt -keystore src/main/resources/keystore.jks -storepass password -noprompt

  5. Run the Red Hat Fuse application to send and receive messages through the Kafka cluster with the following Maven command (this step currently requires Java 8):

    $ mvn -Drun.jvmArguments="-Dbootstrap.server=`oc get routes my-cluster-kafka-external-bootstrap -o=jsonpath='{.status.ingress[0].host}{"\n"}'`:443" clean package spring-boot:run

    After the clean and package phases finish, the Spring Boot application starts and creates a producer and a consumer that send and receive messages on the “my-topic” Kafka topic.

    14:36:18.170 [main] INFO  com.redhat.kafkademo.Application - Started Application in 12.051 seconds (JVM running for 12.917)
    14:36:18.490 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=6de87ffa-c7cf-441b-b1f8-e55daabc8d12] Discovered coordinator my-cluster-kafka-1-myproject.192.168.99.100.nip.io:443 (id: 2147483646 rack: null)
    14:36:18.498 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=6de87ffa-c7cf-441b-b1f8-e55daabc8d12] Revoking previously assigned partitions []
    14:36:18.498 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=6de87ffa-c7cf-441b-b1f8-e55daabc8d12] (Re-)joining group
    14:36:19.070 [Camel (MyCamel) thread #3 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-hguerrer-osx-1540578972584-0-2
    14:36:19.987 [Camel (MyCamel) thread #4 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-hguerrer-osx-1540578972584-0-4
    14:36:20.982 [Camel (MyCamel) thread #5 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-hguerrer-osx-1540578972584-0-6
    14:36:21.620 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=6de87ffa-c7cf-441b-b1f8-e55daabc8d12] Successfully joined group with generation 1
    14:36:21.621 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=6de87ffa-c7cf-441b-b1f8-e55daabc8d12] Setting newly assigned partitions [my-topic-0, my-topic-1, my-topic-2]
    14:36:21.959 [Camel (MyCamel) thread #6 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-hguerrer-osx-1540578972584-0-8
    14:36:21.973 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-hguerrer-osx-1540578972584-0-8
    14:36:22.970 [Camel (MyCamel) thread #7 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-hguerrer-osx-1540578972584-0-11
    14:36:22.975 [Camel (MyCamel) thread #1 - KafkaConsumer[my-topic]] INFO  consumer-route - consumer >>> Hello World from camel-context.xml with ID ID-hguerrer-osx-1540578972584-0-11
    14:36:23.968 [Camel (MyCamel) thread #8 - KafkaProducer[my-topic]] INFO  producer-route - producer >>> Hello World from camel-context.xml with ID ID-hguerrer-osx-1540578972584-0-14
    
    

    You’re done! Press Ctrl + C to stop the running program.
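
The bootstrap address passed to Maven in step 5 is the host of the my-cluster-kafka-external-bootstrap route followed by port 443. If the inline backtick expression is hard to read or debug, the same value can be built in a separate step. This is a sketch only, and the function name bootstrap_address is hypothetical:

```shell
# Hypothetical helper: print "<route host>:443" for the external bootstrap route.
bootstrap_address() {
  route="${1:-my-cluster-kafka-external-bootstrap}"
  # Same jsonpath query as in step 5, without the trailing newline.
  host="$(oc get routes "${route}" -o=jsonpath='{.status.ingress[0].host}')"
  printf '%s:443\n' "${host}"
}
```

You could then store the result with BOOTSTRAP="$(bootstrap_address)" and pass -Dbootstrap.server=${BOOTSTRAP} inside -Drun.jvmArguments. Printing the value first makes it easier to spot an empty host, for example when the route has not been admitted yet.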

Last updated: April 1, 2021