When deploying applications on Kubernetes, certain platform characteristics strongly influence the application's architecture. In a greenfield setting, it's all about harnessing the ephemeral nature of stateless applications. Applications are built to run in scenarios where there is an expectation of high availability via horizontal scaling. Not only can the application scale out, but Kubernetes' orchestration characteristics emphasize that no individual pod is safe from destruction. Kubernetes is the epitome of the old U.S. Navy SEAL saying: "Two is one, and one is none."

Workloads on Kubernetes don't always fit this model, however. Some workloads are singular in nature, and parallelization isn't an option. For example, suppose an application connects to an external service and receives information asynchronously via a TCP socket or websocket. As part of this process, the application receives data, transforms the structure, and publishes that data into an Apache Kafka topic. In this case, only a single connection can be active at one time, because a second active connection would publish duplicate data (see Figure 1).

Figure 1: Only one process can be active at a time.

The quick solution to this problem is actually a fundamental characteristic of Kubernetes. If the deployment is created with the replicas set to 1, then when the controller detects the pod is no longer running, it will attempt to create a new one. However, real-world situations can be more complicated. Some applications require a long startup time due to cache warming needs. When you combine slow startup times (minutes) for the pod with business requirements to minimize downtime, the default solution becomes unsuitable. In this situation, we will want to have multiple instances up and ready to take over the consumption as quickly as possible.

This article shows you how to implement leader election in Kubernetes using Apache Camel. To follow along with the examples, see the demo code available on GitHub.

Hot-warm with leader election

To run an application as hot-warm means to have multiple instances of the application running and ready to serve requests, but only one instance actually doing the work. Within Kubernetes, this means having multiple pods ready at all times, but only one pod active for a particular process. In this scenario, the pods negotiate among themselves which one is active.

Apache Camel has a component (called master) that is built exactly for this scenario. As the docs explain, the Camel-Master endpoint lets us ensure only a single consumer in a cluster consumes from a given endpoint, with automatic failover if that Java virtual machine (JVM) dies. To achieve this goal, the endpoint requires a shared resource and locking. The component has multiple implementations for the locking mechanism, including the following:

  • camel-atomix
  • camel-consul
  • camel-file
  • camel-infinispan
  • camel-jgroups-raft
  • camel-jgroups
  • camel-kubernetes
  • camel-zookeeper

Within the component's configuration, the developer provides a namespace to designate the shared resource. All processes that use the same namespace for the locking will ensure that only one process at a time obtains the lock. When a process has the lock, it is the leader, and the process will run. If it loses the lock for any reason, the component will stop the process, as well.
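The locking idea can be sketched in a few lines of plain Java. This is only an in-memory illustration of "one holder per namespace," not how the master component is implemented; the NamespaceLocks class and the pod-a and pod-b member IDs are made up for the example.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in for a shared lock store; not Camel's implementation.
class NamespaceLocks {

    // Maps each lock namespace to the ID of the member that currently holds it.
    private final ConcurrentMap<String, String> holders = new ConcurrentHashMap<>();

    // Returns true if memberId holds the lock for the namespace after this call.
    boolean tryAcquire(String namespace, String memberId) {
        String current = holders.putIfAbsent(namespace, memberId);
        return current == null || current.equals(memberId);
    }

    // Releases the lock only if memberId is the current holder.
    boolean release(String namespace, String memberId) {
        return holders.remove(namespace, memberId);
    }

    // The current leader for the namespace, if any.
    Optional<String> leader(String namespace) {
        return Optional.ofNullable(holders.get(namespace));
    }

    public static void main(String[] args) {
        NamespaceLocks locks = new NamespaceLocks();
        System.out.println(locks.tryAcquire("timer-logger-ns", "pod-a")); // first caller wins
        System.out.println(locks.tryAcquire("timer-logger-ns", "pod-b")); // second caller is refused
        locks.release("timer-logger-ns", "pod-a");
        System.out.println(locks.tryAcquire("timer-logger-ns", "pod-b")); // lock is free again
    }
}
```

A real implementation has to keep this state somewhere all instances can reach, which is what the file, Kubernetes, and other cluster services listed above provide.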

Leader election with a Camel route

The first example uses the traditional Camel route domain-specific language (DSL), where we incorporate the master component along with the timer component. For this example, we are using Quarkus and Quarkus Camel extensions to implement the functionality:

import org.apache.camel.builder.RouteBuilder;
import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class TimerLoggerClusteredRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("master:timer-logger-ns:timer://current-leader-check-timer?fixedRate=true&period=500")
                .log("Current Leader");
    }

}

The first thing you should notice is that the master DSL is prepended to the timer DSL. The master endpoint designates a namespace of timer-logger-ns, so every instance of this application checks the lock at regular intervals to see whether it is available. The details of how the lock is created and managed are hidden from the application; the namespace is simply a logical name for the lock.
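To make the structure of that endpoint URI concrete, here is a small sketch that splits a string of the form master:namespace:delegateUri into its two parts. It is an illustration only and not Camel's own parser; the MasterUriParts class is a hypothetical helper.

```java
// Illustration only: splits a URI of the form master:namespace:delegateUri.
// This is a hypothetical helper, not Camel's own parsing code.
class MasterUriParts {

    final String namespace;
    final String delegateUri;

    MasterUriParts(String namespace, String delegateUri) {
        this.namespace = namespace;
        this.delegateUri = delegateUri;
    }

    static MasterUriParts parse(String uri) {
        String prefix = "master:";
        if (!uri.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a master URI: " + uri);
        }
        String rest = uri.substring(prefix.length());
        // The namespace ends at the first colon; everything after it is the wrapped endpoint.
        int separator = rest.indexOf(':');
        if (separator <= 0 || separator == rest.length() - 1) {
            throw new IllegalArgumentException("Expected master:namespace:delegateUri but got: " + uri);
        }
        return new MasterUriParts(rest.substring(0, separator), rest.substring(separator + 1));
    }

    public static void main(String[] args) {
        MasterUriParts parts = parse(
                "master:timer-logger-ns:timer://current-leader-check-timer?fixedRate=true&period=500");
        System.out.println(parts.namespace);
        System.out.println(parts.delegateUri);
    }
}
```

For the route above, the namespace part is timer-logger-ns and the delegate is the full timer URI, including its query parameters.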

Testing the example locally

Although we will ultimately deploy this application out to a Kubernetes cluster, we want to demonstrate and test this functionality in our local environment. When doing so, we won't have access to the Kubernetes leases, so we will implement the locking using the camel-file component. Because of the environmental differences, we will leverage Quarkus profiles to produce the correct CamelClusterService implementation for our environment:

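import java.util.Optional;
import java.util.concurrent.TimeUnit;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;

import io.quarkus.arc.profile.IfBuildProfile;
import io.quarkus.arc.profile.UnlessBuildProfile;
import org.apache.camel.CamelContext;
import org.apache.camel.cluster.CamelClusterService;
import org.apache.camel.component.file.cluster.FileLockClusterService;
import org.apache.camel.component.kubernetes.cluster.KubernetesClusterService;
import org.eclipse.microprofile.config.inject.ConfigProperty;

@ApplicationScoped
public class ClusterLockProducer {

    // Kubernetes namespace that holds the leases; only needed in the prod profile.
    @ConfigProperty(name = "namespace")
    Optional<String> namespace;

    // Any profile other than prod (dev, test): lock on local files.
    @Produces
    @UnlessBuildProfile("prod")
    public CamelClusterService fileLockClusterService(CamelContext camelContext) throws Exception {
        FileLockClusterService service = new FileLockClusterService();
        service.setRoot("cluster");
        service.setAcquireLockDelay(1, TimeUnit.SECONDS);
        service.setAcquireLockInterval(1, TimeUnit.SECONDS);
        return service;
    }

    // prod profile: lock on Kubernetes leases.
    @Produces
    @IfBuildProfile("prod")
    public CamelClusterService kubernetesClusterService(CamelContext camelContext) {
        KubernetesClusterService service = new KubernetesClusterService();
        namespace.ifPresent(service::setKubernetesNamespace);
        return service;
    }

}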

For the local environment, we set up the FileLockClusterService. The setRoot allows us to designate the location of the files used for the lock, relative or absolute. After the process starts, the files are created and used as locks to designate the current leader, as shown in Figure 2.

When the process is started with the current configuration in our local development environment, it places the locks into a “cluster” folder set up in the application's working directory.
Figure 2: The target folder that contains the files for locking.

If a new instance of the application is started locally, then the newly started application will not be able to obtain the locks and therefore will not run the timer component. If you kill the leader, the other application will check the lock, see that it's not locked, and subsequently obtain the lock and start processing. Additionally, we can apply settings to designate how often we want the service to check the locks and acquire the lock.
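The failover behavior described above can be approximated with the JDK's own file locking. The following is a minimal sketch, assuming an exclusive java.nio FileLock on a single file stands in for the lock; it is not the FileLockClusterService implementation, and the FileLeaderCandidate class is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of file-based leadership; not Camel's FileLockClusterService.
class FileLeaderCandidate implements AutoCloseable {

    private final FileChannel channel;
    private FileLock lock;

    FileLeaderCandidate(Path lockFile) {
        try {
            this.channel = FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Tries to take the exclusive lock; returns true if this candidate is now the leader.
    boolean tryBecomeLeader() {
        if (lock != null && lock.isValid()) {
            return true;
        }
        try {
            // Returns null when another process holds the lock.
            lock = channel.tryLock();
        } catch (OverlappingFileLockException e) {
            // The lock is held by another channel in this same JVM.
            lock = null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lock != null;
    }

    // Gives up leadership so that another candidate can acquire the lock.
    void resign() {
        try {
            if (lock != null) {
                lock.release();
                lock = null;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        resign();
        try {
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A standby instance would call tryBecomeLeader() on a fixed interval, which is the role the acquire-lock delay and interval settings play in the configuration shown earlier. When the leader's process exits, the operating system drops its lock and the next attempt by a standby succeeds.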

Leader election without Camel routes

The Camel route example in the previous section shows how easy it is to add the additional DSL to create and manage singleton processes. However, there are times when the processes to be managed do not fit well into a Camel route, or the processes might already be implemented and we don't necessarily want to have to rewrite the code. Luckily the mechanisms supplied by the master component are available outside of the DSL.

Let's say I have a regular service that can be started and stopped. In this example, the application-scoped bean will instantiate a single-threaded scheduled executor that logs messages at a certain fixed rate.

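import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import javax.enterprise.context.ApplicationScoped;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class BeanLoggerService {

    private static final Logger LOGGER = LoggerFactory.getLogger(BeanLoggerService.class);
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> scheduledFuture;

    // Starts the periodic logging task; does nothing if it is already running.
    public synchronized void start() {
        if (scheduledFuture == null) {
            scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> {
                LOGGER.info("{}", System.currentTimeMillis());
            }, 1, 1, TimeUnit.SECONDS);
        }
    }

    // Cancels the periodic logging task; does nothing if it is not running.
    public synchronized void stop() {
        if (scheduledFuture != null) {
            this.scheduledFuture.cancel(true);
            this.scheduledFuture = null;
        }
    }

}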

I want to ensure that this service is always running as a singleton among the JVMs. I can accomplish this by obtaining the CamelClusterService directly and adding an event listener to start and stop the service when leadership changes are detected:

import javax.enterprise.event.Observes;

import io.quarkus.runtime.Startup;
import io.quarkus.runtime.StartupEvent;
import org.apache.camel.CamelContext;
import org.apache.camel.cluster.CamelClusterEventListener;
import org.apache.camel.cluster.CamelClusterService;
import org.apache.camel.component.kubernetes.cluster.KubernetesClusterService;
import org.apache.camel.support.cluster.ClusterServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Startup
public class BeanLoggerLeadershipListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(BeanLoggerLeadershipListener.class);

    private final CamelContext camelContext;
    private final BeanLoggerService beanLoggerService;

    public BeanLoggerLeadershipListener(CamelContext camelContext, BeanLoggerService beanLoggerService) {
        this.camelContext = camelContext;
        this.beanLoggerService = beanLoggerService;
    }

    void onStart(@Observes StartupEvent ev) throws Exception {
        CamelClusterService camelClusterService = ClusterServiceHelper.lookupService(camelContext)
                .orElseThrow(() -> new RuntimeException("Unable to lookupService for CamelClusterService"));
        // Extra logging to show which lease settings are in effect when running on OpenShift.
        if (camelClusterService instanceof KubernetesClusterService) {
            KubernetesClusterService kcs = (KubernetesClusterService) camelClusterService;
            LOGGER.info("KubernetesClusterService: LeaseResourceType={}, KubernetesNamespace={}, KubernetesResourceName={}, MasterUrl={}",
                    kcs.getLeaseResourceType().name(), kcs.getKubernetesNamespace(), kcs.getKubernetesResourceName(), kcs.getMasterUrl());
        }
        // Start the service when this JVM becomes the leader and stop it otherwise.
        camelClusterService.getView("bean-logger-ns").addEventListener((CamelClusterEventListener.Leadership) (view, leader) -> {
            LOGGER.info("LeadershipEvent[bean-logger-ns]: {}", leader);
            boolean weAreLeader = leader.isPresent() && leader.get().isLocal();
            if (weAreLeader) {
                beanLoggerService.start();
            } else {
                beanLoggerService.stop();
            }
        });
    }

}

In this example, we are using Quarkus's lifecycle hooks to run code to register the leadership event listener needed to start and stop the BeanLoggerService. Note that the additional logging code in there will be used to better demonstrate the scenario running in Red Hat OpenShift.

Running Camel-Master on OpenShift

In a local environment, we used the FileLockClusterService. Now that we are ready to deploy this application on OpenShift, we will switch the implementation from using files to using Kubernetes leases. To start, let’s take a look at the deployment manifest for the application.

Note: I used Kustomize to manage all the manifests, so you will notice the absence of the namespace. That's managed in my main kustomization.yaml file:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: quarkus-camel-master-demo-deployment
  labels:
    app: quarkus-camel-master-demo
spec:
  replicas: 2
  selector:
    matchLabels:
      app: quarkus-camel-master-demo
  template:
    metadata:
      labels:
        app: quarkus-camel-master-demo
    spec:
      serviceAccountName: "camel-leader-election"
      containers:
        - name: quarkus-camel-master-demo-container
          image: quay.io/stephennimmo/quarkus-camel-master-demo:0.0.1-SNAPSHOT
          imagePullPolicy: Always
          env:
            - name: QUARKUS_LOG_LEVEL
              value: "DEBUG"
            - name: NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          ports:
            - containerPort: 8080

In this deployment, we have two replicas, but we want only one pod to run the processes. The configuration contains two items of note:

  • Notice the serviceAccountName in the deployment config. For this deployment, we need to set up a service account that specifically has access to the Kubernetes leases.
  • We are passing in the namespace as a configuration parameter, which is used to point the KubernetesClusterService to the same namespace as our application. This tells the application to attempt to acquire the lease objects in that namespace.
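The NAMESPACE environment variable reaches the namespace property injected into ClusterLockProducer through the environment variable lookup rule in MicroProfile Config, which Quarkus follows: try the exact property name, then the name with every character that is not alphanumeric or an underscore replaced by an underscore, then that form in upper case. The sketch below reproduces only that lookup order; EnvNameCandidates is a hypothetical helper, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper that lists the environment variable names tried for a property,
// following the MicroProfile Config lookup order.
class EnvNameCandidates {

    static List<String> forProperty(String property) {
        List<String> names = new ArrayList<>();
        // 1. Exact match.
        names.add(property);
        // 2. Replace each character that is neither alphanumeric nor '_' with '_'.
        String underscored = property.replaceAll("[^A-Za-z0-9_]", "_");
        names.add(underscored);
        // 3. Same as step 2, converted to upper case.
        names.add(underscored.toUpperCase(Locale.ROOT));
        return names;
    }

    public static void main(String[] args) {
        System.out.println(forProperty("namespace"));
        System.out.println(forProperty("quarkus.log.level"));
    }
}
```

The same rule explains why the QUARKUS_LOG_LEVEL variable in the manifest sets the quarkus.log.level property.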

Before we deploy, we need to set up the service account and give it permissions to read and write to the lease objects:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: camel-leader-election
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: camel-leader-election
rules:
  - apiGroups:
      - ""
      - "coordination.k8s.io"
    resources:
      - configmaps
      - secrets
      - pods
      - leases
    verbs:
      - create
      - delete
      - deletecollection
      - get
      - list
      - patch
      - update
      - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: camel-leader-election
subjects:
  - kind: ServiceAccount
    name: camel-leader-election
roleRef:
  kind: Role
  name: camel-leader-election
  apiGroup: rbac.authorization.k8s.io

Leadership elections in OpenShift

Once we deploy the application into OpenShift, the application will use the KubernetesClusterService implementation of the CamelClusterService to perform the leadership elections. To do this, the service periodically queries the lease and attempts to take it over if the current holder has not renewed it within the designated lease duration. The timing configuration for this kind of leader election is more detailed, which should be expected: we are no longer simply checking a file lock, but working in more of a heartbeat monitoring pattern. Let's take a look at the running pods, the leases, and the associated YAML file.

% oc get pods
NAME                                                   READY   STATUS    RESTARTS   AGE
quarkus-camel-master-demo-deployment-894569d67-fcjhc   1/1     Running   0          4d1h
quarkus-camel-master-demo-deployment-894569d67-lt5jv   1/1     Running   0          4d1h

% oc get leases
NAME                      HOLDER                                                 AGE
leaders-bean-logger-ns    quarkus-camel-master-demo-deployment-894569d67-lt5jv   15d
leaders-timer-logger-ns   quarkus-camel-master-demo-deployment-894569d67-fcjhc   18d

% oc get lease leaders-timer-logger-ns -o yaml
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  creationTimestamp: "2021-08-08T15:07:29Z"
  labels:
    provider: camel
  managedFields:
  - apiVersion: coordination.k8s.io/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:spec:
        f:renewTime: {}
    manager: okhttp
    operation: Update
    time: "2021-08-26T15:52:43Z"
  name: leaders-timer-logger-ns
  namespace: quarkus-camel-master-demo
  resourceVersion: "26043451"
  selfLink: /apis/coordination.k8s.io/v1/namespaces/quarkus-camel-master-demo/leases/leaders-timer-logger-ns
  uid: 4a79df33-d9d7-4ae1-a16b-d964254f46c6
spec:
  acquireTime: "2021-08-22T14:23:28.586420Z"
  holderIdentity: quarkus-camel-master-demo-deployment-894569d67-fcjhc
  leaseDurationSeconds: 15
  leaseTransitions: 20
  renewTime: "2021-08-26T15:52:43.467366Z"

The holderIdentity shows that the pod named quarkus-camel-master-demo-deployment-894569d67-fcjhc is currently the leader and is running the timer-logger process. If we look at the logs (Figure 3), we will see that this is the case. We turned on the debugging for the leadership detection so we can actually see the interactions with the leases being updated.

Figure 3: The OpenShift logging view of the pod with the leader election.
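The lease fields shown earlier (holderIdentity, renewTime, and leaseDurationSeconds) are enough to reason about the heartbeat pattern. The sketch below is a simplified model of that reasoning, assuming a candidate may take over only when the holder has not renewed within the lease duration. The real KubernetesClusterService has more timing settings than this, and the LeaseSketch class is hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified, hypothetical model of a lease check; not the KubernetesClusterService logic.
class LeaseSketch {

    // True when the holder has not renewed the lease within its duration.
    static boolean isExpired(Instant renewTime, int leaseDurationSeconds, Instant now) {
        return now.isAfter(renewTime.plus(Duration.ofSeconds(leaseDurationSeconds)));
    }

    // A candidate may write itself as holder if the lease is free, already its own, or expired.
    static boolean mayAcquire(String holderIdentity, String candidate,
                              Instant renewTime, int leaseDurationSeconds, Instant now) {
        if (holderIdentity == null || holderIdentity.isEmpty()) {
            return true;
        }
        return holderIdentity.equals(candidate) || isExpired(renewTime, leaseDurationSeconds, now);
    }

    public static void main(String[] args) {
        // Values taken from the leaders-timer-logger-ns lease above.
        Instant renewTime = Instant.parse("2021-08-26T15:52:43.467366Z");
        String holder = "quarkus-camel-master-demo-deployment-894569d67-fcjhc";
        String standby = "quarkus-camel-master-demo-deployment-894569d67-lt5jv";

        // Ten seconds after the last renewal the standby must keep waiting.
        System.out.println(mayAcquire(holder, standby, renewTime, 15, renewTime.plusSeconds(10)));
        // Sixteen seconds after the last renewal the lease is stale and can be taken over.
        System.out.println(mayAcquire(holder, standby, renewTime, 15, renewTime.plusSeconds(16)));
    }
}
```

As long as the leader keeps updating renewTime, the standby pod never sees an expired lease, which matches the repeated renewTime updates recorded in the lease's managedFields.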

One of the more interesting pieces to note: This pod is the leader only for timer-logger-ns. Remember, we have a second master namespace, bean-logger-ns. Let’s take a look at that lease:

% oc get lease leaders-bean-logger-ns -o yaml 
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  creationTimestamp: "2021-08-10T15:47:54Z"
  labels:
    provider: camel
  managedFields:
  - apiVersion: coordination.k8s.io/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:spec:
        f:renewTime: {}
    manager: okhttp
    operation: Update
    time: "2021-08-26T16:01:12Z"
  name: leaders-bean-logger-ns
  namespace: quarkus-camel-master-demo
  resourceVersion: "26053368"
  selfLink: /apis/coordination.k8s.io/v1/namespaces/quarkus-camel-master-demo/leases/leaders-bean-logger-ns
  uid: faae3687-ea77-45f5-9bad-910c11a21c2b
spec:
  acquireTime: "2021-08-22T14:23:28.424619Z"
  holderIdentity: quarkus-camel-master-demo-deployment-894569d67-lt5jv
  leaseDurationSeconds: 15
  leaseTransitions: 4
  renewTime: "2021-08-26T16:01:12.382697Z"

For the bean-logging process, the leader is actually the other pod: quarkus-camel-master-demo-deployment-894569d67-lt5jv. The leadership election for each master namespace is completely independent. This behavior might or might not be suitable for the application's needs, so keep that in mind if you are building multiple dependent processes that need to always run in a single pod.
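If dependent processes do need to run in the same pod, one possible approach is to hang them all off a single election instead of giving each its own master namespace. The sketch below shows the idea in plain Java: one leadership callback starts and stops a whole group of services together. The LeaderGroup class and its Managed interface are hypothetical and are not part of Camel or of the demo code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: ties several services to a single leadership signal.
class LeaderGroup {

    // Minimal lifecycle contract for a service that must run only on the leader.
    interface Managed {
        void start();
        void stop();
    }

    private final List<Managed> services = new ArrayList<>();
    private boolean running;

    synchronized void add(Managed service) {
        services.add(service);
    }

    // Call this from one leadership listener; repeated events with the same value are ignored.
    synchronized void onLeadershipChange(boolean weAreLeader) {
        if (weAreLeader && !running) {
            for (Managed service : services) {
                service.start();
            }
            running = true;
        } else if (!weAreLeader && running) {
            // Stop in reverse order so that dependents shut down before what they depend on.
            for (int i = services.size() - 1; i >= 0; i--) {
                services.get(i).stop();
            }
            running = false;
        }
    }

    synchronized boolean isRunning() {
        return running;
    }
}
```

In the listener shown earlier, the start and stop calls on BeanLoggerService would be replaced by a single call to onLeadershipChange with the weAreLeader flag, so every service in the group follows the same lease.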

Conclusion

To learn more about how Red Hat can support your integration needs, including support for Apache Camel, check out the Red Hat Fuse website.

The demo code for this article is available on GitHub.

Follow me on Twitter @stephennimmo.
