
Nowadays, IT automation is a must to accelerate delivery and provide value in a secure, tested, and repeatable way. Ansible has become the go-to tool for IT teams due to its simplicity, versatility, and powerful automation capabilities. With its agentless architecture, Ansible allows for seamless deployment, configuration management, and orchestration across a wide range of systems and platforms.

Apache Kafka has emerged as the preferred tool for handling real-time data streams. Its distributed architecture, fault tolerance, and scalability make it a robust choice for building event-driven architecture (EDA). Red Hat AMQ streams is Red Hat's enterprise distribution of this platform.

Both amazing tools meet together in the Ansible Middleware community, providing the Ansible collection for Red Hat AMQ streams. This collection includes a set of automation capabilities and features to manage and operate Apache Kafka clusters on top of Red Hat Enterprise Linux.

In this article, you will learn how to easily deploy an Apache Kafka cluster based on AMQ streams using the Ansible collection for Red Hat AMQ streams.

Install the collection

Note: To follow this tutorial, you need a Red Hat Enterprise Linux or Fedora system, along with Ansible version 2.12 or higher (preferably the latest version).
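
You can quickly confirm which Ansible version is installed on your control node:

$ ansible --version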

The very first step, of course, is to install the collection itself so that Ansible can use its content inside playbooks:

$ ansible-galaxy collection install middleware_automation.amq_streams
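
If you prefer to declare the collection as a dependency (for example, for CI pipelines or repeatable setups), you can also list it in a requirements.yml file. A minimal example looks like this:

collections:
  - name: middleware_automation.amq_streams

Then install everything declared in the file with:

$ ansible-galaxy collection install -r requirements.yml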

Before going further, you should check that the collection has been installed successfully. To do so, run the following ansible-galaxy command, which lists all installed collections:

$ ansible-galaxy collection list 
Collection                          Version
----------------------------------- -------
ansible.posix                       1.5.4 
middleware_automation.amq_streams   0.0.5  

Note: As this collection is under active development, the version you download might differ from the one displayed above.
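
If you need to reproduce a specific release on another control node, you can pin the version at install time. For example, to install the exact version listed above:

$ ansible-galaxy collection install middleware_automation.amq_streams:==0.0.5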

Now that you've installed the collection and its dependencies, you can use them to automate the installation of AMQ streams.

Deploy AMQ streams with Ansible

Thanks to the dedicated Ansible collection for AMQ streams, automating the installation and configuration of Apache Kafka is easy. However, before using the collection in a playbook, it is worth recapping what installing and configuring Apache Kafka actually means here. This task encompasses quite a few operations on the target system (sketched as plain Ansible tasks after this list):

  • Creating appropriate OS user and group accounts.
  • Downloading the installation archive from the Kafka website.
  • Unarchiving the contents while ensuring that all the files are associated with the appropriate user and groups along with the correct permissions.
  • Ensuring that the required version of the Java Virtual Machine (JVM) is installed.
  • Integrating the software into the host's service manager; in our case, the Linux systemd daemon.
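
As a rough illustration only, these operations map to ordinary Ansible tasks similar to the following sketch. The roles in the collection implement and maintain this logic for you; the user name, package name, Kafka version, paths, and unit template below are assumptions made for the example:

- name: "Create a dedicated group and user for Kafka (illustrative names)"
  ansible.builtin.group:
    name: kafka
    system: true

- name: "Create the kafka user"
  ansible.builtin.user:
    name: kafka
    group: kafka
    system: true

- name: "Ensure a supported JVM is present (package name is an assumption)"
  ansible.builtin.package:
    name: java-17-openjdk-headless
    state: present

- name: "Download the Kafka archive (version matches the one used in this article)"
  ansible.builtin.get_url:
    url: "https://archive.apache.org/dist/kafka/3.3.2/kafka_2.13-3.3.2.tgz"
    dest: /opt/kafka_2.13-3.3.2.tgz

- name: "Unarchive it with the correct ownership and permissions"
  ansible.builtin.unarchive:
    src: /opt/kafka_2.13-3.3.2.tgz
    dest: /opt
    remote_src: true
    owner: kafka
    group: kafka

- name: "Install a systemd unit for the broker (template not shown here)"
  ansible.builtin.template:
    src: kafka.service.j2
    dest: /etc/systemd/system/kafka.service
    mode: "0644"

- name: "Enable and start the service"
  ansible.builtin.systemd:
    name: kafka
    enabled: true
    state: started
    daemon_reload: true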

Apache Kafka is a distributed ecosystem of different components with multiple deployment options and capabilities. The Ansible collection for AMQ streams helps automate most common deployment and configuration topologies. This article shows one of these cases for the purposes of demonstrating the capabilities of the collection. For further information, please refer to the Ansible collection for AMQ streams documentation.

Figure 1 illustrates our use case: a distributed cluster formed by a ZooKeeper ensemble of three nodes and three Kafka brokers managed by Ansible.

Figure 1: AMQ Streams deployment managed by Ansible

All of this is fully automated by the following playbook, which combines several roles included in the Ansible collection for AMQ streams.

Create a playbook at the path playbooks/my-amq_streams_distributed.yml containing the following content:

---
- name: "Ansible Playbook to install a Zookeeper ensemble and Kafka Broker Authenticated"
  hosts: all
  vars:    
    # Enabling Zookeeper Authentication
    amq_streams_zookeeper_auth_enabled: true
    amq_streams_zookeeper_auth_user: zkadmin
    amq_streams_zookeeper_auth_pass: p@ssw0rd

    # Enabling Kafka broker listeners
    amq_streams_broker_listeners:
      - AUTHENTICATED://:{{ amq_streams_broker_listener_port }} # Authenticated
      - REPLICATION://:{{ amq_streams_broker_listener_internal_port }} # Inter broker communication

    # Listener for inter-broker communications
    amq_streams_broker_inter_broker_listener: REPLICATION

    # Enabling Kafka Broker Authentication
    amq_streams_broker_auth_enabled: true
    amq_streams_broker_auth_scram_enabled: true
    amq_streams_broker_auth_listeners:
      - AUTHENTICATED:SASL_PLAINTEXT
      - REPLICATION:SASL_PLAINTEXT

    amq_streams_broker_auth_sasl_mechanisms:
      - PLAIN
      - SCRAM-SHA-512

    # Kafka Plain Users
    amq_streams_broker_auth_plain_users:
      - username: admin
        password: p@ssw0rd
      - username: kafkauser
        password: p@ssw0rd

    # Setting Kafka user for inter-broker communication
    amq_streams_broker_inter_broker_auth_sasl_mechanisms: PLAIN
    amq_streams_broker_inter_broker_auth_broker_username: interbroker
    amq_streams_broker_inter_broker_auth_broker_password: p@ssw0rd

    # Enabling Broker replication
    amq_streams_broker_offsets_topic_replication_factor: 3
    amq_streams_broker_transaction_state_log_replication_factor: 3
    amq_streams_broker_transaction_state_log_min_isr: 2
  roles:
    - role: amq_streams_zookeeper
  tasks:
    - name: "Ensure Zookeeper is running and available."
      ansible.builtin.include_role:
        name: amq_streams_zookeeper

    - name: "Ensure AMQ Streams Broker is running and available."
      ansible.builtin.include_role:
        name: amq_streams_broker

  post_tasks:
    - name: "Display numbers of Zookeeper instances managed by Ansible."
      ansible.builtin.debug:
        msg: "Numbers of Zookeeper instances: {{ amq_streams_zookeeper_instance_count }}."
      when:
        - amq_streams_zookeeper_instance_count_enabled is defined and amq_streams_zookeeper_instance_count_enabled
     
    - name: "Display numbers of broker instances managed by Ansible."
      ansible.builtin.debug:
        msg: "Numbers of broker instances: {{ amq_streams_broker_instance_count }}."
      when:
        - amq_streams_broker_instance_count_enabled is defined and amq_streams_broker_instance_count_enabled

    - name: "Validate that Zookeeper deployment is functional."
      ansible.builtin.include_role:
        name: amq_streams_zookeeper
        tasks_from: validate.yml

    - name: "Validate that Broker deployment is functional."
      ansible.builtin.include_role:
        name: amq_streams_broker
        tasks_from: validate.yml

The playbook includes a set of different users and their credentials. Because these variables contain sensitive data, they should be secured using Ansible Vault or some other secrets management system. A full treatment of that topic is beyond the scope of this article, but the sketch below shows one quick option.
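
For example, you could encrypt an individual value with Ansible Vault and paste the resulting encrypted block into the playbook in place of the clear-text password (a minimal sketch; the variable name matches the playbook above):

$ ansible-vault encrypt_string 'p@ssw0rd' --name 'amq_streams_zookeeper_auth_pass'

The playbook would then be run with --ask-vault-pass (or --vault-password-file) so Ansible can decrypt the value at runtime.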

The inventory of hosts to deploy the desired topology looks similar to the following:

[all]
rhel9mw01
rhel9mw02
rhel9mw03

[zookeepers]
rhel9mw01
rhel9mw02
rhel9mw03

[brokers]
rhel9mw01
rhel9mw02
rhel9mw03

Create a new file called inventory with the contents above.

The zookeepers group identifies the hosts to deploy the ZooKeeper component, and the brokers group identifies the hosts to deploy the Kafka brokers.

Run this playbook as follows:

$ ansible-playbook -i inventory playbooks/my-amq_streams_distributed.yml

Note: In order for this playbook to perform the installation outlined here, Ansible must have sudo or root privileges on the target hosts.
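
If you connect to the target hosts as an unprivileged user, you can enable privilege escalation from the command line; for example (the remote user name is a placeholder):

$ ansible-playbook -i inventory playbooks/my-amq_streams_distributed.yml -u <remote_user> -b -K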

Check for successful installation

Once the playbook finishes its execution, you can confirm that the ZooKeeper and Broker services are now running by verifying their status.
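
A quick way to check every node at once is an ad hoc Ansible command against the same inventory (a small convenience sketch; the service names match the systemd units shown below):

$ ansible -i inventory zookeepers -b -m ansible.builtin.command -a "systemctl is-active amq_streams_zookeeper"
$ ansible -i inventory brokers -b -m ansible.builtin.command -a "systemctl is-active amq_streams_broker"

You can also log in to each host and inspect the services individually, as shown in the following sections.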

ZooKeeper

Check the ZooKeeper service:

[rhmw@rhel9mw01 logs]$ sudo systemctl status amq_streams_zookeeper.service 
● amq_streams_zookeeper.service - amq_streams_zookeeper
     Loaded: loaded (/usr/lib/systemd/system/amq_streams_zookeeper.service; enabled; preset: disabled)
    Drop-In: /usr/lib/systemd/system/service.d
             └─10-timeout-abort.conf
     Active: active (running) since Wed 2023-06-28 15:16:03 CEST; 3min 59s ago
   Main PID: 4873 (java)
      Tasks: 52 (limit: 2298)
     Memory: 125.1M
        CPU: 3.567s
     CGroup: /system.slice/amq_streams_zookeeper.service
             └─4873 java -Xmx256M -Xms256M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true "-Xlog>

Jun 28 15:16:36 rhel9mw01 zookeeper-server-start.sh[4873]: [2023-06-28 15:16:36,050] INFO Committing global session 0x300000721bc0000 (org.apache.zookeeper.server.quorum.LearnerSessionTracker)
Jun 28 15:16:36 rhel9mw01 zookeeper-server-start.sh[4873]: [2023-06-28 15:16:36,320] INFO Committing global session 0x10000064a600000 (org.apache.zookeeper.server.quorum.LearnerSessionTracker)
Jun 28 15:16:36 rhel9mw01 zookeeper-server-start.sh[4873]: [2023-06-28 15:16:36,390] INFO Successfully authenticated client: authenticationID=zkadmin;  authorizationID=zkadmin. (org.apache.zookeeper.server.auth.Sas>
Jun 28 15:16:36 rhel9mw01 zookeeper-server-start.sh[4873]: [2023-06-28 15:16:36,455] INFO Setting authorizedID: zkadmin (org.apache.zookeeper.server.auth.SaslServerCallbackHandler)
Jun 28 15:16:36 rhel9mw01 zookeeper-server-start.sh[4873]: [2023-06-28 15:16:36,458] INFO adding SASL authorization for authorizationID: zkadmin (org.apache.zookeeper.server.ZooKeeperServer)
Jun 28 15:16:36 rhel9mw01 zookeeper-server-start.sh[4873]: [2023-06-28 15:16:36,543] INFO Committing global session 0x300000721bc0001 (org.apache.zookeeper.server.quorum.LearnerSessionTracker)
Jun 28 15:16:48 rhel9mw01 zookeeper-server-start.sh[4873]: [2023-06-28 15:16:48,086] INFO Submitting global closeSession request for session 0x10000064a600000 (org.apache.zookeeper.server.ZooKeeperServer)
Jun 28 15:16:50 rhel9mw01 zookeeper-server-start.sh[4873]: [2023-06-28 15:16:50,524] INFO Committing global session 0x300000721bc0002 (org.apache.zookeeper.server.quorum.LearnerSessionTracker)
Jun 28 15:16:50 rhel9mw01 zookeeper-server-start.sh[4873]: [2023-06-28 15:16:50,650] INFO Committing global session 0x200000647ab0000 (org.apache.zookeeper.server.quorum.LearnerSessionTracker)
Jun 28 15:16:50 rhel9mw01 zookeeper-server-start.sh[4873]: [2023-06-28 15:16:50,816] INFO Committing global session 0x300000721bc0003 (org.apache.zookeeper.server.quorum.LearnerSessionTracker)

Kafka broker

Check the Kafka broker service:

[rhmw@rhel9mw01 logs]$ sudo systemctl status amq_streams_broker.service 
● amq_streams_broker.service - amq_streams_broker
     Loaded: loaded (/usr/lib/systemd/system/amq_streams_broker.service; enabled; preset: disabled)
    Drop-In: /usr/lib/systemd/system/service.d
             └─10-timeout-abort.conf
     Active: active (running) since Wed 2023-06-28 15:16:48 CEST; 5min ago
   Main PID: 11241 (java)
      Tasks: 78 (limit: 2298)
     Memory: 319.1M
        CPU: 8.601s
     CGroup: /system.slice/amq_streams_broker.service
             └─11241 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true "-Xlog:gc>

Jun 28 15:16:52 rhel9mw01 kafka-server-start.sh[11241]: [2023-06-28 15:16:52,104] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
Jun 28 15:16:52 rhel9mw01 kafka-server-start.sh[11241]: [2023-06-28 15:16:52,208] INFO [ExpirationReaper-0-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
Jun 28 15:16:52 rhel9mw01 kafka-server-start.sh[11241]: [2023-06-28 15:16:52,240] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
Jun 28 15:16:52 rhel9mw01 kafka-server-start.sh[11241]: [2023-06-28 15:16:52,251] INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Enabling request processing. (kafka.network.SocketServer)
Jun 28 15:16:52 rhel9mw01 kafka-server-start.sh[11241]: [2023-06-28 15:16:52,257] INFO Kafka version: 3.3.2 (org.apache.kafka.common.utils.AppInfoParser)
Jun 28 15:16:52 rhel9mw01 kafka-server-start.sh[11241]: [2023-06-28 15:16:52,257] INFO Kafka commitId: b66af662e61082cb (org.apache.kafka.common.utils.AppInfoParser)
Jun 28 15:16:52 rhel9mw01 kafka-server-start.sh[11241]: [2023-06-28 15:16:52,257] INFO Kafka startTimeMs: 1687958212256 (org.apache.kafka.common.utils.AppInfoParser)
Jun 28 15:16:52 rhel9mw01 kafka-server-start.sh[11241]: [2023-06-28 15:16:52,258] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
Jun 28 15:16:52 rhel9mw01 kafka-server-start.sh[11241]: [2023-06-28 15:16:52,335] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use node rhel9mw01:9091 (i>
Jun 28 15:16:52 rhel9mw01 kafka-server-start.sh[11241]: [2023-06-28 15:16:52,383] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use node rhel9mw01:909

Testing the Kafka cluster

So, our Apache Kafka cluster is up and running, ready to handle streaming events. Let's test it by creating a simple topic, producing some messages, and consuming them.

You can accomplish this test with the Kafka CLI scripts shipped with the broker installation. The commands below are simple examples and can be extended for more complex actions in your environment.

Because our Kafka cluster requires authentication, we need to create a properties file containing the user credentials and authentication mechanism to reference when invoking the CLI. In our case, this file uses the kafkauser user created by the playbook:

$ cat <<EOF > /tmp/kafka-cli.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafkauser" password="p@ssw0rd";
EOF

This command will create the sample-topic:

$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config /tmp/kafka-cli.properties  --create --topic sample-topic --partitions 10 --replication-factor 3
Created topic sample-topic.
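
You can also verify the partition and replica layout with the --describe option, using the same authentication file as before:

$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config /tmp/kafka-cli.properties --describe --topic sample-topic

Each of the 10 partitions should report three replicas, matching the replication factor used at creation time.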

The kafka-console-producer.sh script sends messages to a topic. The following command publishes a couple of messages to it:

$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sample-topic --producer.config /tmp/kafka-cli.properties
>Hello Ansible Collection for AMQ Streams!!!!
>Another message!!

With a few messages published, use the kafka-console-consumer.sh script to consume them from the beginning of the topic:

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sample-topic --consumer.config /tmp/kafka-cli.properties --from-beginning --timeout-ms 10000
Hello Ansible Collection for AMQ Streams!!!!
Another message!!
[2023-06-28 16:04:24,896] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 2 messages

Cool! Your Apache Kafka cluster is ready and fully automated by Ansible.

Summary

By using Ansible and the Ansible collection for AMQ streams as outlined in this article, you can fully automate the deployment of an event streaming platform in any RHEL-based environment without manual intervention. Ansible performed all the heavy lifting (downloading the software, preparing the OS, creating users and groups, deploying the binaries and configuration, setting up the service, and more) and even configured the cluster with a set of users.

The Ansible collection for AMQ streams allows you to streamline the installation and configuration of Apache Kafka, thus enabling you to scale deployments as necessary and ensure repeatability across them all.
