Eclipse Vert.x logo Eclipse Vert.x

This post is the fifth post of my Introduction to Eclipse Vert.x series. In the last post, we saw how Vert.x can interact with a database. To tame the asynchronous nature of Vert.x, we used Future objects. In this post, we are going to see another way to manage asynchronous code: reactive programming. We will see how Vert.x combined with Reactive eXtensions gives you superpowers.

Let’s start by refreshing our memory with the previous posts:

  • The first post described how to build a Vert.x application with Apache Maven and execute unit tests.
  • The second post described how this application became configurable.
  • The third post introduced vertx-web, and a collection management application was developed. This application exposes a REST API used by an HTML/JavaScript front end.
  • In the fourth post, we replaced the in-memory back end with a database and introduced Future to orchestrate our asynchronous operations.

In this post, we are not going to add a new feature. Instead, we'll explore another programming paradigm: reactive programming.

The code of this post is available on the GitHub repo, in the post-5 directory.

Thinking Reactively

Forget everything you know about code and look around. Modeling this world with code is challenging. As developers, we tend to use counter-intuitive approaches. Since the 1980s, object-oriented computing has been seen as the silver bullet. Every entity from our world is represented by an object containing fields and exposing methods. Most of the time, interacting with these objects is done using a blocking and synchronous protocol. You invoke a method and wait for a response. But...the world in which we are living is asynchronous. The interactions are done using events, messages, and stimuli. To overcome the limitations of object orientation, many patterns and paradigms emerged. Recently, functional programming is making a comeback, not to replace object orientation, but to complement it. Reactive programming is a functional event-driven programming approach that is used in combination with the regular object-oriented paradigm.

A few years ago, Microsoft created a reactive programming framework for .NET called Reactive eXtensions (also called ReactiveX or RX). RX is an API for asynchronous programming with observable streams. This API has been ported to several languages such as JavaScript, Python, C++, and Java.

Let's observe our world for a moment. Observe entities in motion: traffic jams, weather, conversations, and financial markets. Things are moving and evolving concurrently. Multiple things happen at the same time, sometimes independently, sometimes in an orchestrated manner. Each object is creating a stream of events. For instance, your mouse cursor position is moving. The sequence of positions is a stream. The number of people in the room may be stable, but someone can come in or go out, generating a new value. So we have another stream of values. There is a fundamental mantra behind reactive programming: events are data and data are events.

What's important to understand about RX and asynchronous programming is the asynchronous nature of streams. You observe a stream, and you are notified when an item is emitted by the stream. You don't know when that will happen, but you are observing. This observation is done using a subscribe operation.

RxJava is a straightforward implementation of RX for the Java programming language. It is an extremely popular library for reactive programming in Java, with applications in networked data processing and graphical user interfaces with JavaFX and Android. RxJava is the lingua franca for reactive libraries in Java, and it provides the following five types to describe publishers:

  Number of items in the stream RxJava 2 types RX signature Callback signature Future signature
Notification, data flow 0..n Observable, Flowable Observable stream()
Flowable stream()
ReadStream method() N/A
Asynchronous operation producing a result 1 Single Single get() void get(Handler<AsyncResult> handler) Future get()
Asynchronous operation producing no or one result 0..1 Maybe Maybe findById(String id) void get(String id, Handler<AsyncResult> handler) Future get(String id)
Asynchronous operation producing no result 0 Completable Completable flush() void flush(Handler<AsyncResult> handler) Future flush()

The difference between Observable and Flowable is that Flowable handles back-pressure (implementing the reactive streams protocol) while Observable does not. Flowable is better suited for large streams of data coming from a backpressure-supporting source (for example, a TCP connection) while Observable is better suited for handling “hot” observables for which backpressure cannot be applied (for example, GUI events).

This post is not an introduction to reactive programming or RX. If you need an introduction-level class about reactive programming and RX, check out this tutorial.

In the previous post, we used Future to compose asynchronous operations. In this post, we are going to use streams and RxJava. How? Thanks to Vert.x and RxJava 2 APIs. Indeed, Vert.x provides a set of RX-ified APIs. However, don't forget that:

  • You can use RxJava without Vert.x
  • You can use Vert.x without RxJava

Combining them gives you superpowers because it extends the asynchronous execution model from Vert.x with the power of RxJava streams and operators.

Enough Talking; Show Me Some Code

It always starts with a Maven dependency. In your pom.xml file add this:

io.vertx
   vertx-rx-java2
   ${vertx.version}

 

Then, open the io.vertx.intro.first.MyFirstVerticle class and replace the import statements with this:

import io.reactivex.Completable;
import io.reactivex.Single;
import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.sql.SQLOptions;
import io.vertx.reactivex.CompletableHelper;
import io.vertx.reactivex.config.ConfigRetriever;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.http.HttpServerResponse;
import io.vertx.reactivex.ext.jdbc.JDBCClient;
import io.vertx.reactivex.ext.sql.SQLConnection;
import io.vertx.reactivex.ext.web.Router;
import io.vertx.reactivex.ext.web.RoutingContext;
import io.vertx.reactivex.ext.web.handler.BodyHandler;
import io.vertx.reactivex.ext.web.handler.StaticHandler;

import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;

Notice the io.vertx.reactivex packages. It's where the Vert.x RX API is implemented. So, instead of extending io.vertx.core.AbstractVerticle, we are now extending io.vertx.reactivex.core.AbstractVerticle. The injected vertx instance proposes new methods starting with the rx prefix such as rxDeployVerticle, or rxClose. Methods prefixed with rx are returning RxJava 2 types such as Single or Completable.

From Returning Future to Returning Single and Completable

To benefit from the RX API and be able to use RX operators, we need to use the RX types. For example, previously we had this:

private Future createHttpServer(JsonObject config, 
  Router router) {
  Future future = Future.future();
  vertx
    .createHttpServer()
    .requestHandler(router::accept)
    .listen(
      config.getInteger("HTTP_PORT", 8080),
      res -> future.handle(res.mapEmpty())
    );
  return future;
}

Future is mapped to Completable in RX, that is, a stream just indicating its completion. So with RX, this code becomes the following:

private Completable createHttpServer(JsonObject config,
  Router router) {
  return vertx
    .createHttpServer()
    .requestHandler(router::accept)
    .rxListen(config.getInteger("HTTP_PORT", 8080))
    .toCompletable();
}

Do you spot the difference? We use the rxListen method returning a Single. Because we don't need the server, we transform it into a Completable using the toCompletable method. The rxListen is available because we used the rx-ified vertx instance.

Let's now rewrite the connect method. connect was returning a Future. This is translated into a Single:

private Single connect() {
  return jdbc.rxGetConnection()
    .map(c -> c.setOptions(
       new SQLOptions().setAutoGeneratedKeys(true)));
}

The jdbc client is also providing an rx API. rxGetConnection returns a Single. To enable the key generation, we use the map method. map takes the result from the observed Single and transforms it using a mapper function. Here we just adapt the options.

Following the same principles, the insert method is rewritten as follows:

private Single

insert(SQLConnection connection, Article article, boolean closeConnection) { String sql = "INSERT INTO Articles (title, url) VALUES (?, ?)"; return connection .rxUpdateWithParams(sql, new JsonArray().add(article.getTitle()).add(article.getUrl())) .map(res -> new Article(res.getKeys().getLong(0), article.getTitle(), article.getUrl())) .doFinally(() -> { if (closeConnection) { connection.close(); } }); }

Here, we execute the INSERT statement using rxUpdateWithParams. The result is transformed into an Article. Notice the doFinally. This method is called when the operation has completed or failed. In both cases, if requested, we close the connection.

The same approach is applied to the query method that uses the rxQuery method:

private Single

query(SQLConnection connection) { return connection.rxQuery("SELECT * FROM articles") .map(rs -> rs.getRows().stream() .map(Article::new) .collect(Collectors.toList()) ) .doFinally(connection::close); }

queryOne needs to throw an error if the searched article is not present:

private Single

queryOne(SQLConnection connection, String id) { String sql = "SELECT * FROM articles WHERE id = ?"; return connection.rxQueryWithParams(sql, new JsonArray().add(Integer.valueOf(id)) ) .doFinally(connection::close) .map(rs -> { List rows = rs.getRows(); if (rows.size() == 0) { throw new NoSuchElementException( "No article with id " + id); } else { JsonObject row = rows.get(0); return new Article(row); } }); }

The exception thrown by the mapper function is propagated to the stream. So the observer can react to it and recover.

Transforming Types

We have already seen the toCompletable method above discarding the result from a Single and just informing the subscriber of the successful completion or failure of the operation. In the update and delete methods, we need to do almost the same thing. We execute SQL statements and if we realize that no rows have been changed by these statements we report an error. To implement this, we are using flatMapCompletable. This method is part of the flatMap family, a very powerful RX operator. This method takes as parameter a function. This function is called for each item emitted by the observed stream. If the stream is a Single, it will be called either zero (error case) or one (operation succeeded with a result) times. Unlike the map operator, flatMap function returns a stream. For example, in our context, the flatMapCompletable function is called with an UpdateResult and returns a Completable:

private Completable update(SQLConnection connection, String id,
  Article article) {
  String sql = "UPDATE articles SET title = ?,
    url = ? WHERE id = ?";
  JsonArray params = new JsonArray().add(article.getTitle())
    .add(article.getUrl())
    .add(Integer.valueOf(id));
  return connection.rxUpdateWithParams(sql, params)
    .flatMapCompletable(ur ->
      ur.getUpdated() == 0 ?
        Completable
            .error(new NoSuchElementException(
                "No article with id " + id))
        : Completable.complete()
    )
    .doFinally(connection::close);
}

private Completable delete(SQLConnection connection, String id) {
  String sql = "DELETE FROM Articles WHERE id = ?";
  JsonArray params = new JsonArray().add(Integer.valueOf(id));
  return connection.rxUpdateWithParams(sql, params)
    .doFinally(connection::close)
    .flatMapCompletable(ur ->
        ur.getUpdated() == 0 ?
          Completable
              .error(new NoSuchElementException(
                  "No article with id " + id))
          : Completable.complete()
    );
}

In both cases, we check the number of updated rows, and, if 0, produce a failing Completable. So the subscriber receives either a success (Completable.complete) or the error (Completable.error). Notice that this code can also use the previous approach: using the map operator, throwing an exception, and discarding the result using toCompletable.

Obviously, we can also transform a Completable to a Single:

private Single createTableIfNeeded(
  SQLConnection connection) {
    return vertx.fileSystem().rxReadFile("tables.sql")
        .map(Buffer::toString)
        .flatMapCompletable(connection::rxExecute)
        .toSingleDefault(connection);
}

rxExecute returns a Completable. But here we need to forward the SQLConnection. Fortunately, the toSingleDefault operator transforms the Completable to a Single emitting the given value.

Composing Asynchronous Actions

So far, we are using rx methods and adapting the result. But how can we deal with sequential composition? Execute a first operation and then execute a second one with the result of the first operation? This can be done using the flatMap operator. As stated above, flatMap is a very powerful operator. It receives a function as parameter, and unlike the map operator, this function returns a stream (so Single, Maybe, Completable...). This function is called for each item from the observed streams, and the returned streams are flattened so the items are serialized into a single stream. Because streams are async constructs, calling flatMap creates a sequential composition. Let's see the createSomeDataIfNone method. The initial implementation is the following:

private Future createSomeDataIfNone(
  SQLConnection connection) {
  Future future = Future.future();
  connection.query("SELECT * FROM Articles", select -> {
    if (select.failed()) {
      future.fail(select.cause());
    } else {
      if (select.result().getResults().isEmpty()) {
        Article article1 = new Article("Fallacies of distributed computing",            "https://en.wikipedia.org/wiki/Fallacies_of_distributed_computing");
        Article article2 = new Article("Reactive Manifesto",
            "https://www.reactivemanifesto.org/");
        Future

insertion1 = insert(connection, article1, false); Future

insertion2 = insert(connection, article2, false); CompositeFuture.all(insertion1, insertion2) .setHandler(r -> future.handle(r.map(connection))); } else { future.complete(connection); } } }); return future; }

In this method, we execute a query and depending on the result we insert articles. The RX implementation is the following:

private Single createSomeDataIfNone(
  SQLConnection c) {
  return c.rxQuery("SELECT * FROM Articles")
    .flatMap(rs -> {
      if (rs.getResults().isEmpty()) {
        Article article1 = new Article("Fallacies of distributed computing",
            "https://en.wikipedia.org/wiki/Fallacies_of_distributed_computing");
        Article article2 = new Article("Reactive Manifesto",
            "https://www.reactivemanifesto.org/");
        return Single.zip(
            insert(connection, article1, false),
            insert(connection, article2, false),
            (a1, a2) -> c
        );
      } else {
          return Single.just(c);
        }
    });
}

First, we execute the query. Then, when we have the result, the function passed to the flatMap method is called, implementing the sequential composition. You may wonder about the error case. We don't need to handle it, because the error is propagated to the stream and the final observer receives it. The function is not called when an error happens.

Asynchronous operations can happen concurrently. But sometimes you need to be aware of when they have completed. This is called parallel composition. The zip operator lets you do this. In the createSomeDataIfNone, we are inserting two articles. This operation is done using insert (returning a Single). The zip operator observes the two given occurrences of Single and calls the method passed as last parameter when both have completed. In this case, we just forward the SQLConnection.

Compositing Everything to Get Ready

We have rewritten most of our functions, but we need to adapt the start method. Remember the start sequence we need to achieve:

// Start sequence:
// 1 - Retrieve the configuration
//  |- 2 - Create the JDBC client
//  |- 3 - Connect to the database (retrieve a connection)
//    |- 4 - Create table if needed
//      |- 5 - Add some data if needed
//        |- 6 - Close connection when done
//    |- 7 - Start HTTP server
//  |- 9 - we are done!

This composition can be implemented using the flatMap operator:

retriever.rxGetConfig()
  .doOnSuccess(config ->
    jdbc = JDBCClient.createShared(vertx, config, 
      "My-Reading-List"))
  .flatMap(config ->
    connect()
      .flatMap(connection ->
          this.createTableIfNeeded(connection)
              .flatMap(this::createSomeDataIfNone)
              .doAfterTerminate(connection::close)
      )
      .map(x -> config)
  )
  .flatMapCompletable(c -> createHttpServer(c, router))
  .subscribe(CompletableHelper.toObserver(fut));

The doOnSuccess is an action operator that receives the item from the observed stream, and lets you implement a side-effect. Here we assign the jdbc field.

Then we just use the flatMap operator to orchestrate our different actions. Look at the doAfterTerminate. This operator lets us close the connection when the full stream is consumed, which is very useful for cleanup.

There is an important part in this code. So far, we returned RX types, but never called subscribe. If you don't subscribe, nothing will happen: streams are lazy. So never forget to subscribe. The subscription materializes the pipeline and triggers the emissions. In our code, it triggers the start sequence. The parameter passed to the subscribe method is just reporting failure and success to the Future object passed to the start method. Basically it maps a Future to a Subscriber.

Implementing the HTTP Actions

We are almost done. We just need to update our HTTP actions, the method called on HTTP requests. To simplify the code, let's modify the ActionHelper class. This class provides methods returning Handler<AsyncResult>. But this type is not great with RX APIs where we need subscribers. Let's replace these methods with methods returning more-adequate types:

private static  BiConsumer writeJsonResponse(
  RoutingContext context, int status) {
  return (res, err) -> {
    if (err != null) {
      if (err instanceof NoSuchElementException) {
        context.response().setStatusCode(404)
          .end(err.getMessage());
      } else {
        context.fail(err);
      }
    } else {
      context.response().setStatusCode(status)
        .putHeader("content-type", 
            "application/json; charset=utf-8")
        .end(Json.encodePrettily(res));
    }
  };
}

static  BiConsumer; ok(RoutingContext rc) {
  return writeJsonResponse(rc, 200);
}

static  BiConsumer created(RoutingContext rc) {
  return writeJsonResponse(rc, 201);
}

static Action noContent(RoutingContext rc) {
  return () -> rc.response().setStatusCode(204).end();
}

static Consumer onError(RoutingContext rc) {
  return err -> {
      if (err instanceof NoSuchElementException) {
          rc.response().setStatusCode(404)
           .end(err.getMessage());
      } else {
          rc.fail(err);
      }
  };
}

Now we are ready to implement our HTTP action method. Back to the MyFirstVerticle class: replace the action methods with this:

private void getAll(RoutingContext rc) {
  connect()
      .flatMap(this::query)
      .subscribe(ok(rc));
}

private void addOne(RoutingContext rc) {
  Article article = rc.getBodyAsJson()
    .mapTo(Article.class);
  connect()
      .flatMap(c -> insert(c, article, true))
      .subscribe(created(rc));
}


private void deleteOne(RoutingContext rc) {
  String id = rc.pathParam("id");
  connect()
      .flatMapCompletable(c -> delete(c, id))
      .subscribe(noContent(rc), onError(rc));
}


private void getOne(RoutingContext rc) {
  String id = rc.pathParam("id");
  connect()
      .flatMap(connection -> queryOne(connection, id))
      .subscribe(ok(rc));
}

private void updateOne(RoutingContext rc) {
  String id = rc.request().getParam("id");
  Article article = rc.getBodyAsJson()
    .mapTo(Article.class);
  connect()
      .flatMapCompletable(c -> update(c, id, article))
      .subscribe(noContent(rc), onError(rc));
}

As you can see, these methods are implemented using the operators we saw before. They contain subscribe calls that write the HTTP response. It's as simple as that...

Conclusion

We are done! In this post, we have adapted our code to use reactive programming and RxJava 2. The combination of Vert.x and RxJava brings your reactiveness to another level. You can compose and process asynchronous operations and streams very easily.

Now, don't forget that nothing is free. RX can be hard to understand. It may look weird. And depending on your background, you may prefer Future and callbacks. Vert.x offers you choices, and you are free to choose the model you prefer.

If you want to go further, here are some resources:

The next post in this series will cover the deployment of our application on Kubernetes and OpenShift.

Stay tuned, and happy coding!

Last updated: January 12, 2024