Skip to main content
Redhat Developers  Logo
  • AI

    Get started with AI

    • Red Hat AI
      Accelerate the development and deployment of enterprise AI solutions.
    • AI learning hub
      Explore learning materials and tools, organized by task.
    • AI interactive demos
      Click through scenarios with Red Hat AI, including training LLMs and more.
    • AI/ML learning paths
      Expand your OpenShift AI knowledge using these learning resources.
    • AI quickstarts
      Focused AI use cases designed for fast deployment on Red Hat AI platforms.
    • No-cost AI training
      Foundational Red Hat AI training.

    Featured resources

    • OpenShift AI learning
    • Open source AI for developers
    • AI product application development
    • Open source-powered AI/ML for hybrid cloud
    • AI and Node.js cheat sheet

    Red Hat AI Factory with NVIDIA

    • Red Hat AI Factory with NVIDIA is a co-engineered, enterprise-grade AI solution for building, deploying, and managing AI at scale across hybrid cloud environments.
    • Explore the solution
  • Learn

    Self-guided

    • Documentation
      Find answers, get step-by-step guidance, and learn how to use Red Hat products.
    • Learning paths
      Explore curated walkthroughs for common development tasks.
    • Guided learning
      Receive custom learning paths powered by our AI assistant.
    • See all learning

    Hands-on

    • Developer Sandbox
      Spin up Red Hat's products and technologies without setup or configuration.
    • Interactive labs
      Learn by doing in these hands-on, browser-based experiences.
    • Interactive demos
      Click through product features in these guided tours.

    Browse by topic

    • AI/ML
    • Automation
    • Java
    • Kubernetes
    • Linux
    • See all topics

    Training & certifications

    • Courses and exams
    • Certifications
    • Skills assessments
    • Red Hat Academy
    • Learning subscription
    • Explore training
  • Build

    Get started

    • Red Hat build of Podman Desktop
      A downloadable, local development hub to experiment with our products and builds.
    • Developer Sandbox
      Spin up Red Hat's products and technologies without setup or configuration.

    Download products

    • Access product downloads to start building and testing right away.
    • Red Hat Enterprise Linux
    • Red Hat AI
    • Red Hat OpenShift
    • Red Hat Ansible Automation Platform
    • See all products

    Featured

    • Red Hat build of OpenJDK
    • Red Hat JBoss Enterprise Application Platform
    • Red Hat OpenShift Dev Spaces
    • Red Hat Developer Toolset

    References

    • E-books
    • Documentation
    • Cheat sheets
    • Architecture center
  • Community

    Get involved

    • Events
    • Live AI events
    • Red Hat Summit
    • Red Hat Accelerators
    • Community discussions

    Follow along

    • Articles & blogs
    • Developer newsletter
    • Videos
    • Github

    Get help

    • Customer service
    • Customer support
    • Regional contacts
    • Find a partner

    Join the Red Hat Developer program

    • Download Red Hat products and project builds, access support documentation, learning content, and more.
    • Explore the benefits

Building Apache Kafka Streams applications using Red Hat AMQ Streams: Part 1

June 17, 2019
Adam Cattermole
Related topics:
Event-drivenKubernetes
Related products:
Streams for Apache Kafka

    The Apache Kafka project includes a Streams Domain-Specific Language (DSL) built on top of the lower-level Stream Processor API. This DSL provides developers with simple abstractions for performing data processing operations. However, how to build a stream processing pipeline in a containerized environment with Kafka isn't clear. This two-part article series describes the steps required to build your own Apache Kafka Streams application using Red Hat AMQ Streams.

    During this first part of this series, we provide a simple example to use as building blocks for part two. First things first, to set up AMQ Streams, follow the instructions in the AMQ Streams getting started documentation to deploy a Kafka cluster. The rest of this documentation assumes you followed these steps.

    Create a producer

    To get started, we need a flow of data streaming into Kafka. This data can come from a variety of different sources, but for the purposes of this example, let's generate sample data using Strings sent with a delay.

    Next, we need to configure the Kafka producer so that it talks to the Kafka brokers (see this article for a more in-depth explanation), as well as provides the topic name to write to and other information. As our application will be containerized, we can abstract this process away from the internal logic and read from environment variables. This reading could be done in a separate configuration class, but for this simple example, the following is sufficient:

    private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
    private static final int DEFAULT_DELAY = 1000;
    private static final String DEFAULT_TOPIC = "source-topic";
    
    private static final String BOOTSTRAP_SERVERS = "BOOTSTRAP_SERVERS";
    private static final String DELAY = "DELAY";
    private static final String TOPIC = "TOPIC";
    
    private static final String ACKS = "1";
    
    String bootstrapServers = System.getenv().getOrDefault(BOOTSTRAP_SERVERS, DEFAULT_BOOTSTRAP_SERVERS);
    long delay = Long.parseLong(System.getenv().getOrDefault(DELAY, String.valueOf(DEFAULT_DELAY)));
    String topic = System.getenv().getOrDefault(TOPIC, DEFAULT_TOPIC);
    

    We can now create appropriate properties for our Kafka producer using the environment variables (or the defaults we set):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.ACKS_CONFIG, ACKS);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    

    Next, we create the producer with the configuration properties:

    KafkaProducer<String,String> producer = new KafkaProducer<>(props);
    

    We can now stream the data to the provided topic, assigning the required delay between each message:

    int i = 0;
    while (true) {
        String value = String.format("hello world %d", i);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
        log.info("Sending record {}", record);
        producer.send(record);
        i++;
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            // sleep interrupted, continue
        }
    }
    

    Now that our application is complete, we need to package it into a Docker image and push to Docker Hub. We can then create a new deployment in our Kubernetes cluster using this image and pass the environment variables via YAML:

    apiVersion: extensions/v1beta1
    kind: Deployment
    metadata:
      labels:
        app: basic-example
      name: basic-producer
    spec:
      replicas: 1
      template:
        metadata:
          labels:
            app: basic-producer
        spec:
          containers:
          - name: basic-producer
            image: <docker-user>/basic-producer:latest
            env:
              - name: BOOTSTRAP_SERVERS
                value: my-cluster-kafka-bootstrap:9092
              - name: DELAY
                value: 1000
              - name: TOPIC
                value: source-topic
    

    We can check that data is arriving at our Kafka topic by consuming from it:

    $ oc run kafka-consumer -ti --image=registry.access.redhat.com/amq7/amq-streams-kafka:1.1.0-kafka-2.1.1 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic source-topic --from-beginning
    

    The output should look like this:

      hello world 0
      hello world 1
      hello world 2
      ...
    

    Create a Streams application

    A Kafka Streams application typically reads/sources data from one or more input topics, and writes/sends data to one or more output topics, acting as a stream processor. We can set up the properties and configuration the same way as before, but this time we need to specify a SOURCE_TOPIC and a SINK_TOPIC.

    To start, we create the source stream:

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source = builder.stream(sourceTopic);
    

    We can now perform an operation on this source stream. For example, we can create a new stream, called filtered, which only contains records with an even-numbered index:

    KStream<String, String> filtered = source
            .filter((key, value) -> {
                int i = Integer.parseInt(value.split(" ")[2]);
                return (i % 2) == 0;
            });
    

    This new stream can then output to the sinkTopic:

    filtered.to(sinkTopic);
    

    We have now created the topology defining our stream application's operations, but we do not have it running yet. Getting it running requires creating the streams object, setting up a shutdown handler, and starting the stream:

    final KafkaStreams streams = new KafkaStreams(builder.build(), props);
    final CountDownLatch latch = new CountDownLatch(1);
    
    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });
    
    try {
        streams.start();
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);
    

    There you go. It's as simple as this to get your Streams application running. Build the application into a Docker image, deploy in a similar way to the producer, and you can watch the SINK_TOPIC for the output.

    Read more

    Building Apache Kafka Streams applications using Red Hat AMQ Streams: Part 2 

    Last updated: September 3, 2019

    Recent Posts

    • Protect data offloaded to GPU-accelerated environments with OpenShift sandboxed containers

    • Case study: Measuring energy efficiency on the x64 platform

    • How to prevent AI inference stack silent failures

    • Preventing GPU waste: A guide to JIT checkpointing with Kubeflow Trainer on OpenShift AI

    • How to manage TLS certificates used by OpenShift GitOps operator

    Red Hat Developers logo LinkedIn YouTube Twitter Facebook

    Platforms

    • Red Hat AI
    • Red Hat Enterprise Linux
    • Red Hat OpenShift
    • Red Hat Ansible Automation Platform
    • See all products

    Build

    • Developer Sandbox
    • Developer tools
    • Interactive tutorials
    • API catalog

    Quicklinks

    • Learning resources
    • E-books
    • Cheat sheets
    • Blog
    • Events
    • Newsletter

    Communicate

    • About us
    • Contact sales
    • Find a partner
    • Report a website issue
    • Site status dashboard
    • Report a security problem

    RED HAT DEVELOPER

    Build here. Go anywhere.

    We serve the builders. The problem solvers who create careers with code.

    Join us if you’re a developer, software engineer, web designer, front-end designer, UX designer, computer scientist, architect, tester, product manager, project manager or team lead.

    Sign me up

    Red Hat legal and privacy links

    • About Red Hat
    • Jobs
    • Events
    • Locations
    • Contact Red Hat
    • Red Hat Blog
    • Inclusion at Red Hat
    • Cool Stuff Store
    • Red Hat Summit
    © 2026 Red Hat

    Red Hat legal and privacy links

    • Privacy statement
    • Terms of use
    • All policies and guidelines
    • Digital accessibility

    Chat Support

    Please log in with your Red Hat account to access chat support.