Create a simpe RESTful service with vert.x 2.0, RxJava and mongoDB

18 minute read

A new article after an hiatus of almost half a year. In this article we’ll have quick look at how you can get started with vert.x and more interestingly how you can use RxJava to make programming asynchronous systems a lot easier. We’ll cover the following subjects:

  • Create an empty vert.x project using maven
  • Import in IntelliJ and create a simple HTTP server
  • Load data from mongoDB using the vert.x mongoDB persistor module
  • Expose the zips through a REST interface
  • Replace the callbacks with RxJava observers

The first thing to do is very simple, we just use a standard Maven archetype to create a vert.x project. (note the complete final example can be downloaded from github: https://github.com/josdirksen/smartjava/tree/master/vertx-demo-1)

Create an empty vert.x project using maven

Go to the directory where you want to create your vert.x project, type the following and press enter:


jos@Joss-MacBook-Pro.local:~/Dev/playground$ mvn archetype:generate -Dfilter=io.vertx:

This shows all the available io.vertx archetypes (in this case only 1)


[INFO] Scanning for projects...
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] >>> maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom >>>
[INFO] 
[INFO] <<< maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom <<<
[INFO] 
[INFO] --- maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Interactive mode
[INFO] No archetype defined. Using maven-archetype-quickstart (org.apache.maven.archetypes:maven-archetype-quickstart:1.0)
Choose archetype:
1: remote -> io.vertx:vertx-maven-archetype (-)
Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): : 

Since there is only one, just enter ‘1’ and press enter. Next it will show you the versions you can choose. For this example I’ve selected the 2.0.1-final version.


Choose io.vertx:vertx-maven-archetype version: 
1: 1.0.0-beta1
2: 1.0.0-beta2
3: 1.0.0-beta3
4: 1.0.0-CR1
5: 1.0.0-CR2
6: 2.0.0-final
7: 2.0.1-final
Choose a number: 7:

Enter ‘7’ and press enter. The next steps allow you to define the name and version of your project:


Define value for property 'groupId': : org.smartjava
Define value for property 'artifactId': : vertx-demo-1
Define value for property 'version':  1.0-SNAPSHOT: : 
Define value for property 'package':  org.smartjava: : 
Confirm properties configuration:
groupId: org.smartjava
artifactId: vertx-demo-1
version: 1.0-SNAPSHOT
package: org.smartjava
 Y: : Y

Enter the values you can see above (or use your own ones) and finally enter ‘Y’ to confirm your choices. Now a project will be created:


[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: vertx-maven-archetype:2.0.1-final
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.smartjava
[INFO] Parameter: artifactId, Value: vertx-demo-1
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: org.smartjava
[INFO] Parameter: packageInPathFormat, Value: org/smartjava
[INFO] Parameter: package, Value: org.smartjava
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: groupId, Value: org.smartjava
[INFO] Parameter: artifactId, Value: vertx-demo-1
[INFO] project created from Archetype in dir: /Users/jos/Dev/playground/vertx-demo-1
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5:37.710s
[INFO] Finished at: Sun Nov 24 14:55:12 CET 2013
[INFO] Final Memory: 9M/24M
[INFO] ------------------------------------------------------------------------

To test whether everything went correct, just go to the directory that was just created, and run ‘mvn install’. this will download all the required libraries, run some tests and install your project to your local maven repo. Now that we’ve got a maven project, we can load it up into our favourite IDE. In my case I use IntelliJ, but Eclipse works in pretty much the same manner.

Import in IntelliJ and create a simple HTTP server

Fire up IntelliJ and select ‘File->Import Project’, navigate to the directory that was created by maven and import the project.

Select File or Directory to Import.png

Just hit ‘next’ on all the questions and you’ve got yourself a project inside IntelliJ. If you create a project based on this archetype you automatically get a number of verticles you can experiment with. A couple of those are defined in groovy. IntelliJ automatically tries to compile those, but since it can’t find a groovy compiler the compile/make process fails. In this example we’ll first focus on the Java part of vert.x so just remove the .groovy files from the ‘src/main/resources’ and the ‘test/resources/integration_tests/groovy’ directory.

We could now run vert.x with the provided handlers directly through maven by installing the module using maven and then calling the ‘vertx:runModIDEA’ goal. Note that you do need to first call ‘mvn:compile’ to see your changes. If you don’t want to use maven to run your project from the IDE, you can also use a different approach, where you use the ‘org.vertx.java.platform.impl.cli.Starter’ class to launch vert.x directly from the IDE. In IntelliJ you create the following launch configuration for this:

Run_Debug Configurations-1.png

If you run this you’ll still see an error. Something like this:


Exception in thread "main" java.lang.ClassNotFoundException: org.vertx.java.platform.impl.cli.Starter
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:190)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:113)

The reason you see this, is because in the pom.xml created by the vert.x archetype, the vert.x libraries are specified as ‘provided’. As a quick fix, open the pom.xml and change the scope of the three io.vertx dependencies from ‘provided’ to ‘compile’. Now, when you run this launcher from IntelliJ, vert.x will launch correctly.


/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/bin/java -Didea.launcher.port=7543 "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA 12.app/bin" -Dfile.encoding=UTF-8 -classpath "..." com.intellij.rt.execution.application.AppMain org.vertx.java.platform.impl.cli.Starter runmod org.smartjava~vertx-demo-1~1.0-SNAPSHOT
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Nov 24, 2013 3:43:26 PM org.vertx.java.core.logging.impl.JULLogDelegate info
INFO: Module org.smartjava~vertx-demo-1~1.0-SNAPSHOT successfully installed
Nov 24, 2013 3:43:26 PM org.vertx.java.core.logging.impl.JULLogDelegate info
INFO: PingVerticle started
Nov 24, 2013 3:43:26 PM org.vertx.java.core.logging.impl.JULLogDelegate info
INFO: Succeeded in deploying module

Now that we’ve got a project setup in IntelliJ and can easy run it directly from the IDE (and restart it with ctrl-F5), lets start creating a simple HTTP server so that we can see some output from the browser to make testing easier (note that there are much better ways of testing vert.x and verticles than I’m showing here, but that’s something for a different article). Open up the PingVerticle.java file and replace the start method with the following code:

package org.smartjava;

import org.vertx.java.core.Handler;
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.platform.Verticle;

public class PingVerticle extends Verticle {

  public void start() {

    vertx.createHttpServer().requestHandler(new Handler<HttpServerRequest>() {
        @Override
        public void handle(HttpServerRequest httpServerRequest) {
            httpServerRequest.response().end("Hello smartjava");
        }
    }).listen(8888);


    container.logger().info("Webserver started, listening on port: 8888");

  }
}

Run this, and open a browser to localhost:8888, and you’ll see the following.

localhost_8888.png

That’s a webserver you’ve created in vert.x and ran directly from your IDE. Piece of cake so far. Now lets get some data to play around with.

Load data from mongoDB using the vert.x mongoDB persistor module

I won’t dive into how you install mongoDB, there are articles enough on the internet explaining that. If you’ve run on a Mac and have macports installed you can simply use the following command line to install mongoDB:


 sudo port install mongodb

In the rest of this article I assume you’ve got mongoDB installed and it’s command line utils are available from the console. The first thing we need to do is get some data to play around with. For this example we’ll use the list of zip codes you can download from the mongoDB site: http://media.mongodb.org/zips.json. Download this file, open a console and run the following to first start mongoDB and then import this list of zips into mongoDB.


jos@Joss-MacBook-Pro.local:~/Dev/playground/vertx-demo-1$ mkdir data
jos@Joss-MacBook-Pro.local:~/Dev/playground/vertx-demo-1$ mongod --dbpath ./data/
Sun Nov 24 16:23:51.765 [initandlisten] MongoDB starting : pid=77755 port=27017 dbpath=./data/ 64-bit host=Joss-MacBook-Pro.local
Sun Nov 24 16:23:51.765 [initandlisten] db version v2.4.5
Sun Nov 24 16:23:51.765 [initandlisten] git version: nogitversion
Sun Nov 24 16:23:51.765 [initandlisten] build info: Darwin Joss-MacBook-Pro.local 12.4.0 Darwin Kernel Version 12.4.0: Wed May  1 17:57:12 PDT 2013; root:xnu-2050.24.15~1/RELEASE_X86_64 x86_64 BOOST_LIB_VERSION=1_54
Sun Nov 24 16:23:51.765 [initandlisten] allocator: tcmalloc
Sun Nov 24 16:23:51.765 [initandlisten] options: { dbpath: "./data/" }
Sun Nov 24 16:23:51.766 [initandlisten] journal dir=./data/journal
Sun Nov 24 16:23:51.766 [initandlisten] recover : no journal files present, no recovery needed
Sun Nov 24 16:23:51.779 [FileAllocator] allocating new datafile ./data/local.ns, filling with zeroes...
Sun Nov 24 16:23:51.779 [FileAllocator] creating directory ./data/_tmp
Sun Nov 24 16:23:51.812 [FileAllocator] done allocating datafile ./data/local.ns, size: 16MB,  took 0.031 secs
Sun Nov 24 16:23:51.853 [FileAllocator] allocating new datafile ./data/local.0, filling with zeroes...
Sun Nov 24 16:23:52.254 [FileAllocator] done allocating datafile ./data/local.0, size: 64MB,  took 0.4 secs
Sun Nov 24 16:23:52.260 [initandlisten] command local.$cmd command: { create: "startup_log", size: 10485760, capped: true } ntoreturn:1 keyUpdates:0  reslen:37 480ms
Sun Nov 24 16:23:52.260 [initandlisten] waiting for connections on port 27017
Sun Nov 24 16:23:52.260 [websvr] admin web console waiting for connections on port 28017

Now we can use mongoImport to import the downloaded zip codes:


jos@Joss-MacBook-Pro.local:~/Dev/playground/vertx-demo-1$ wget http://media.mongodb.org/zips.json
--2013-11-24 16:25:45--  http://media.mongodb.org/zips.json
Resolving media.mongodb.org... 54.230.131.14, 54.230.131.51, 54.230.128.129, ...
Connecting to media.mongodb.org|54.230.131.14|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2871006 (2.7M) [application/json]
Saving to: `zips.json'

100%[======================================>] 2,871,006   2.20M/s   in 1.2s    

2013-11-24 16:25:47 (2.20 MB/s) - `zips.json' saved [2871006/2871006]

jos@Joss-MacBook-Pro.local:~/Dev/playground/vertx-demo-1$ mongoimport --db vertx --collection zips --file ./zips.json 
connected to: 127.0.0.1
Sun Nov 24 16:26:28.337 check 9 29470
Sun Nov 24 16:26:28.458 imported 29470 objects
jos@Joss-MacBook-Pro.local:~/Dev/playground/vertx-demo-1$ 

If you’ve installed the mongoDB plugin in IntelliJ you can easily test whether it is working:

PingVerticle.java - [vertx-demo-1] - vertx-demo-1 - [~_Dev_playground_vertx-demo-1].png

At this point we only need to call the mongoDB instance from vert.x and load the data. For this we’ll use the mongodb persistor library. First we need to add this module to the maven build (note that this is mainly for when we want to debug, internally, vert.x resolves this module itself):

      <dependency>
          <groupId>io.vertx</groupId>
          <artifactId>mod-mongo-persistor</artifactId>
          <version>2.1.0-SNAPSHOT</version>
          <scope>compile</scope>
      </dependency>

Vert.x has a very nice and interesting module system (also something for a different article), to be able to use this mongo-persistor we first need to deploy it as a module. This is actually pretty easy to do:

// load the general config object, loaded by using -config on command line
JsonObject appConfig = container.config();

// deploy the mongo-persistor module, which we'll use for persistence
container.deployModule("io.vertx~mod-mongo-persistor~2.1.0-SNAPSHOT", appConfig.getObject("mongo-persistor"));

What we do here, is we load in the configuration for this module, and then call deployModule, with the name of the module, and the relevant part of the configuration. First off, lets look at the configuration we use for this:


{
    "mongo-persistor" : {
        "address": "mongodb-persistor",
        "host": "localhost",
        "port": 27017,
        "pool_size": 10,
        "db_name": "vertx"
    }
}

Nothing to difficult. We just point the mongo-persister unit to our mongoDB instance. The one question you might have is, how do we get this file inside vert.x. For that we just have to make a small change to our launcher and change the program arguments from:


runmod org.smartjava~vertx-demo-1~1.0-SNAPSHOT

to this:


runmod org.smartjava~vertx-demo-1~1.0-SNAPSHOT -conf src/main/resources/config.json

The config.json file, contains the configuration we just showed. So with this setup we’ve got the mongodb-persistor library listening on eventbus address “mongodb-persistor”. Now all that is left to do, is send messages to this endpoint in a format understood by this module. For this first step we’re just going to search for all the zip codes in the state “AL”. If you look through the documentation of https://github.com/vert-x/mod-mongo-persistor/ you can see that we have tell this module the ‘collection’ we want to search through and the type of ‘action’ we want to use. Depending on the action additional configuration is required. To search for all the zipcodes in the state of ‘AL’ we need to create the following json message:


{
    "action": "find",
    "collection": "zips",
    "matcher": {
        "state": "AL"
    }
} 

Lets change the request handler and look at the complete start method:

 public void start() {

        // load the general config object, loaded by using -config on command line
        JsonObject appConfig = container.config();

        // deploy the mongo-persistor module, which we'll use for persistence
        container.deployModule("io.vertx~mod-mongo-persistor~2.1.0-SNAPSHOT", appConfig.getObject("mongo-persistor"));

        // create and run the server
        vertx.createHttpServer().requestHandler(new Handler<HttpServerRequest>() {
            @Override
            public void handle(final HttpServerRequest httpServerRequest) {

                // we send the response from the mongo query back to the client.
                // first create the query
                JsonObject matcher = new JsonObject().putString("state", "AL");
                JsonObject json = new JsonObject().putString("collection", "zips")
                        .putString("action", "find")
                        .putObject("matcher", matcher);

                // send it over the bus
                vertx.eventBus().send("mongodb-persistor", json, new Handler<Message<JsonObject>>() {

                    @Override
                    public void handle(Message<JsonObject> message) {
                        // send the response back, encoded as string
                        httpServerRequest.response().end(message.body().encodePrettily());
                    }
                });
            }
        }).listen(8888);

        // output that the server is started
        container.logger().info("Webserver started, listening on port: 8888");
    }

Here you can see that we create the correct json message, send it over the bus, and wait with sending the response back until we get a response from mongoDB. We prettify this response and send it back to the client:

localhost_8888-1.png

Expose the zips through a REST interface

Now that we’ve got the basis backend components in place, lets look at what it takes to create a simple REST based frontend. We’ll skip the mediatype specific filtering (I’ll add that to a later article), for now we’ll just look at the HTTP verbs and the urls. For this part we want to support the following REST calls:


* GET /zips
 Show all the zipcode information that are stored in mongoDB
* GET /zips/:id
 Show the information belonging to the specified zip code
* GET /zips?state=:state&city=:city
 Simple search service, where you can search for zip codes per city or state

* POST /zips/:id
 Update existing zip code information

Very simple, but the main goal here is to show how it is done, not how you can create a full RESTful service. To handle these various URLs and verbs, vert.x provides a route matcher: (method bodies left out for clarity)

        RouteMatcher matcher = new RouteMatcher();

        // the matcher for the complete list and the search
        matcher.get("/zips", new Handler<HttpServerRequest>() {...}

        // the matcher for a specific id
        matcher.get("/zips/:id", new Handler<HttpServerRequest>() {...}

        // the matcher for the update
        matcher.post("/zips/:id", new Handler<HttpServerRequest>() {...}
   
        vertx.createHttpServer().requestHandler(matcher).listen(8888);

For those of you who’ve worked with libraries such as sinatra or scalatra, this’ll look familiar. We define the method we want to process (get and post in this case), the url we’re interested in and the handler that will be called when a request is received. As you can see in the last line, we pass in this handler to process the requests for the server we created.

Now lets have a quick look at the implementation of these handlers. This is where we create the mongo-persistor messages that communicate with mongoDB for us. I won’t go into too much detail of these methods since they are pretty much self explanatory:

        // the matcher for the complete list and the search
        matcher.get("/zips", new Handler<HttpServerRequest>() {
            public void handle(final HttpServerRequest req) {

                JsonObject json = new JsonObject();
                MultiMap params = req.params();

                if (params.size() > 0 && params.contains("state") || params.contains("city")) {
                    // create the matcher configuration
                    JsonObject matcher = new JsonObject();
                    if (params.contains("state")) matcher.putString("state", params.get("state"));
                    if (params.contains("city")) matcher.putString("city", params.get("city"));

                    // create the message for the mongo-persistor verticle
                    json = new JsonObject().putString("collection", "zips")
                            .putString("action", "find")
                            .putObject("matcher", matcher);

                } else {
                    // create the query
                    json = new JsonObject().putString("collection", "zips")
                            .putString("action", "find")
                            .putObject("matcher", new JsonObject());
                }

                JsonObject data = new JsonObject();
                data.putArray("results", new JsonArray());
                // and call the event we want to use
                vertx.eventBus().send("mongodb-persistor", json, new ReplyHandler(req, data));
            }
        });

In this method we retrieve all the zipcodes from mongoDB. since mongo-persistor doesn’t return everything we have to iterate over the response. We do this using the following ReplyHandler:

    private static class ReplyHandler implements Handler<Message<JsonObject>> {

        private final HttpServerRequest request;
        private JsonObject data;

        private ReplyHandler(final HttpServerRequest request, JsonObject data) {
            this.request = request;
            this.data = data;
        }

        @Override
        public void handle(Message<JsonObject> event) {
            // if the response contains more message, we need to get the rest
            if (event.body().getString("status").equals("more-exist")) {
                JsonArray results = event.body().getArray("results");

                for (Object el : results) {
                    data.getArray("results").add(el);
                }

                event.reply(new JsonObject(), new ReplyHandler(request, data));
            } else {

                JsonArray results = event.body().getArray("results");
                for (Object el : results) {
                    data.getArray("results").add(el);
                }

                request.response().putHeader("Content-Type", "application/json");
                request.response().end(data.encodePrettily());
            }
        }
    }

In this replyHandler we just walk through the results and keep asking for more content until we don’t see the status “more-exist” anymore. I’ll skip the handler where we just retrieve a single zip code, since it isn’t that interesting. The next handler processes the post function with which we update an existing element.

        matcher.post("/zips/:id", new Handler<HttpServerRequest>() {
            public void handle(final HttpServerRequest req) {

                // process the body
                req.bodyHandler(new Handler<Buffer>() {

                    @Override
                    public void handle(Buffer event) {
                        // normally we'd validate the input, for now just assume it is correct.
                        final String body = event.getString(0,event.length());

                        // create the query
                        JsonObject newObject = new JsonObject(body);
                        JsonObject matcher = new JsonObject().putString("_id", req.params().get("id"));
                        JsonObject json = new JsonObject().putString("collection", "zips")
                                .putString("action", "update")
                                .putObject("criteria", matcher)
                                .putBoolean("upsert", false)
                                .putBoolean("multi",false)
                                .putObject("objNew",newObject);

                        // and call the event we want to use
                        vertx.eventBus().send("mongodb-persistor", json, new Handler<Message<JsonObject>>() {
                            @Override
                            public void handle(Message<JsonObject> event) {
                                // we could handle the errors here, but for now
                                // assume everything went ok, and return the original
                                // and updated json
                                req.response().end(body);
                            }
                        });
                    }
                });
            }
        });

The code in itself isn’t that complex. We first use a handler to process the request, from this handler we create a new handler that is used to get the body of the request and finally a handler is created that updates the database and sends a response back. Even though it isn’t complex, it gets a bit cumbersome and hard to read when more and more handlers are involved. So in the last section of this article we’ll have a look at how you can replace the nested handlers using rxjava provided functionality.

Replace the callbacks with RxJava observers

For the rxjava code we’ll just add a couple of extra handlers that match a different url. So instead of /zips/90210 the url will be /rxzips/90210. To start first add the following dependency to your maven configuration:


      <dependency>
          <groupId>io.vertx</groupId>
          <artifactId>mod-rxjava</artifactId>
          <version>1.0.0-beta2-SNAPSHOT</version>
          <scope>compile</scope>
      </dependency>

Before diving into how rxjava and vert.x can be used together a quick quote (from the rxjava site) that explains why this is useful:

Java Futures are straightforward to use for a single level of asynchronous execution but they start to add non-trivial complexity when they’re nested. It is difficult to use Futures to optimally compose conditional asynchronous execution flows (or impossible, since latencies of each request vary at runtime). This can be done, of course, but it quickly becomes complicated (and thus error-prone) or it prematurely blocks on Future.get(), which eliminates the benefit of asynchronous execution. RxJava Observables on the other hand are intended for composing flows and sequences of asynchronous data.

With the RxJava Observables it is easy to compose flows and sequences of asynchronous data. If you look through the last couple of code samples from our example you could see why this would be useful. In the last ‘post’ example we had three nested callbacks, with observables it is much easier to compose this and let the code actually tell what is happening.

Now lets extend the post method to do the following:

  1. first get the body
  2. after we have the body, we update the element in the database
  3. next we get the latest version from the database, after the update has succeeded
  4. after we've got the latest version we return this in the response.

If we did this using callbacks we’d probably need four nested levels of callbacks. In rxjava we can do this in the following manner:

        matcher.post("/rxzips/:id", new Handler<HttpServerRequest>() {
            public void handle(final HttpServerRequest req) {
                // first access the buffer as an observable. We do this this way, since
                // we want to keep using the matchhandler and we can't do that with rxHttpServer
                Observable<Buffer> reqDataObservable = RxSupport.toObservable(req);

                // after we have the body, we update the element in the database
                Observable<RxMessage<JsonObject>> updateObservable = reqDataObservable.flatMap(new Func1<Buffer, Observable<RxMessage<JsonObject>>>() {
                    @Override
                    public Observable<RxMessage<JsonObject>> call(Buffer buffer) {
                        System.out.println("buffer = " + buffer);
                        // create the message
                        JsonObject newObject = new JsonObject(buffer.getString(0, buffer.length()));
                        JsonObject matcher = new JsonObject().putString("_id", req.params().get("id"));
                        JsonObject json = new JsonObject().putString("collection", "zips")
                                .putString("action", "update")
                                .putObject("criteria", matcher)
                                .putBoolean("upsert", false)
                                .putBoolean("multi", false)
                                .putObject("objNew", newObject);

                        // and return an observable
                        return rxEventBus.send("mongodb-persistor", json);
                    }
                });

                // use the previous input again, so we could see whether the update was successful.
                Observable<RxMessage<JsonObject>> getLatestObservable = updateObservable.flatMap(new Func1<RxMessage<JsonObject>, Observable<RxMessage<JsonObject>>>() {
                    @Override
                    public Observable<RxMessage<JsonObject>> call(RxMessage<JsonObject> jsonObjectRxMessage) {
                        System.out.println("jsonObjectRxMessage = " + jsonObjectRxMessage);
                        // next we get the latest version from the database, after the update has succeeded
                        // this isn't dependent on the previous one. It just has to wait till the previous
                        // one has updated the database, but we could check whether the previous one was successfully
                        JsonObject matcher = new JsonObject().putString("_id", req.params().get("id"));
                        JsonObject json2 = new JsonObject().putString("collection", "zips")
                                .putString("action", "find")
                                .putObject("matcher", matcher);
                        return rxEventBus.send("mongodb-persistor", json2);
                    }
                });

                // after we've got the latest version we return this in the response.
                getLatestObservable.subscribe(new Action1<RxMessage<JsonObject>>() {
                    @Override
                    public void call(RxMessage<JsonObject> jsonObjectRxMessage) {
                        req.response().end(jsonObjectRxMessage.body().encodePrettily());
                    }
                });
            }
        });

Still a big piece of code, but most are comments and caused by the fact that Java doesn’t support closures (yet). So what happens here?

  1. We first create an observer from the request ``` reqDataObservable = RxSupport.toObservable(req)``` . This means that we want to be informed when data is available in the buffer of our request.
  2. Since we want to do something with this data, we use the ``` reqDataObservable.flatMap``` function. This allows us to specify what happens when some data comes available on the previously created observable. So instead of nesting callbacks, we just specify the flow of data through the various asynchronous calls. When data is received we use that to update the database. Note that we use the ``` rxEventBus.send``` method. This doesn't make the call yet, but once again returns an observable.
  3. As the third step we use the output from the previous one to (possibly) determine whether the update was successful. We then get the latest version from the database. This is once again done using observables.
  4. As long as we don't have a subscriber nothing really happens. Since we're interested in the result from the final asynchronous action we use the ``` getLatestObservable.subscribe``` function and 'wait' (it is still non-blocking) for the result from the final database read. Once that is received we send the response back based on the received message.

In this example we’ve only showed a couple of very small parts of rxjava:

  • We used observables to make working and sequencing asynchronous actions easier.
  • We use the flatmap function to pass the result from one asynchronous call into the other, and so eliminating nested callbacks
  • We used the rxSupport and rxEventbus rxJava vert.x extensions to easily create rxJava observables
  • And we kicked off the complete sequence by subscribing to the last observable in the chain

In a future article we’ll dive a bit deeper into rxJava how you can also organize more complex asynchronous flows.

Updated: