
A reactive and performant Spray + Akka solution to "Playing with concurrency and performance in Java and Node.js"

In my previous post I examined a fictitious trading engine and compared a Java based blocking solution to a Node.js based non-blocking solution. At the end of the post I wrote that:

I suspect that following the recent success of Node.js, more and more asynchronous Java libraries will start to appear.

Well, such libraries already exist, for example: Akka, Spray, and this Mysql async driver.

I set myself the challenge of creating a non-blocking Java-based solution using exactly those libraries, so that I could compare its performance to that of the Node.js solution created for the last article. The first thing you might have noticed is that these are all Scala-based libraries, but I wrote this solution in Java even though it is a little less syntactically elegant. In the last article I introduced a solution based upon Akka whereby the trading engine was wrapped in an actor. Here, I have dropped Tomcat as the HTTP server and replaced it with Spray, which neatly integrates the HTTP server straight into Akka. In theory this should make no difference to performance, because Spray is NIO just as Tomcat 8 is, out of the box.

What attracted me to this solution was that, overall, the number of threads is greatly reduced, as Spray, Akka and the async Mysql library all use the same execution context. Running on my Windows development machine, Tomcat has over 30 threads compared to just a few over 10 for the solution built here, or compared to WebSphere or JBoss where there are hundreds of threads. The execution context is basically a pool of threads which runs the tasks that are given to it. Since all the libraries used in the solution presented here are non-blocking, the number of threads can be kept low and close to the theoretical optimum, so that as little context switching as possible takes place, making the process run efficiently.

The code written for this article is on GitHub. The first part of the program is the main method which starts up Spray and Akka:


Line 1 creates an actor system which is public so that I can access it from elsewhere, since it is used to access the single execution context which I want to use throughout the program. (In code where maintainability is an issue I would write something so that this object could be injected into the relevant parts of the program.) Line 5 then uses the system to instantiate an actor which is used to handle all HTTP requests for purchase and sales orders. Lines 7-11 just set up configuration data for the server. Lines 12 and 13 are where we then take the configuration and our actor and tell Akka IO to use them and the HTTP module to send all HTTP requests as messages to our actor from line 5. Lines 15-17 are where I effectively set up a timer task which fires every 5 seconds to output some statistics. The important part here is to notice that I am not using Java's Timer to schedule the task, since that would just add more unnecessary threads to my process. Instead I use the same execution context as Akka, so as few threads as possible are created.
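The idea of scheduling the statistics task on an existing pool rather than on a dedicated Timer thread can be illustrated with the JDK alone. The following is only a minimal sketch and not the code from the GitHub repository, so the line numbers mentioned above do not apply to it. SharedContext and StatsReporter are hypothetical names, and a ScheduledExecutorService stands in for Akka's scheduler and dispatcher.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the single execution context shared by the whole program. */
class SharedContext {
    // Daemon threads, so this sketch never keeps the JVM alive on its own.
    static final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(2, r -> {
        Thread t = new Thread(r, "shared-context");
        t.setDaemon(true);
        return t;
    });
}

/** Prints statistics periodically on the shared pool instead of on a java.util.Timer thread. */
class StatsReporter {
    final AtomicLong totalSales = new AtomicLong();
    private final CountDownLatch firstReport = new CountDownLatch(1);

    /** Schedules the report at a fixed rate; no new thread is created for it. */
    void start(long periodMillis) {
        SharedContext.POOL.scheduleAtFixedRate(() -> {
            System.out.println("total sales so far: " + totalSales.get());
            firstReport.countDown();
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns true if at least one report was printed within the timeout. */
    boolean awaitFirstReport(long timeoutMillis) {
        try {
            return firstReport.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Calling `new StatsReporter().start(5000)` would mimic the 5 second interval described above.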

Next is the actor for handling the HTTP requests:


Line 3 shows an example of how integrating Scala in a Java program can be ugly, but also how you can sometimes hide away those ugly parts by adding your own abstractions. The HTTP actor which responds to HTTP requests has 3 jobs. The first job, on line 6, is to create a router, which I shall describe below, and to which it can delegate work. The second job, on lines 24-25, is to handle all new connections, telling Spray that this actor will also handle the actual requests and not only the connections. The third job, shown on lines 9-18, is to take an HTTP request and delegate (route) some work to another actor in the system.

This actor knows the HTTP model but the HTTP abstraction doesn't leak into the next layer of the system. Instead, the actor passes domain objects (or value objects or case classes or similar) on to the actors which encapsulate the trading engines. The construction of such domain objects can be seen on lines 15 and 16, using data extracted from the HTTP request, e.g. on line 13, or out of, say, a JSON object in the request body. Spray contains useful directives which can help you extract the data from the request, and abstract a little away from HTTP, if that is what you want. Which domain object to construct depends on the REST-like interface which I have built and is handled on lines 9, 12 and 19. Had I used Scala, I could have written more elegant code using pattern matching on the HttpRequest object. The domain object is passed on to the trading engine by getting the router from line 6 to route the domain object to a suitable actor, on line 17. Last, but not least, line 18 is where the sales order request is acknowledged in an HTTP response which passes a JSON object back to the consumer, together with the unique ID assigned to the order, so that its status can be queried later (it gets persisted into the sales objects).
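To make the boundary between the HTTP layer and the domain layer concrete, here is a minimal sketch in plain Java. It does not use Spray and is not the code from the repository. The `/sell` path, the `productId` and `quantity` parameters, and the SalesOrder and OrderRequestMapper classes are all assumptions made purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical domain object; its fields are assumptions, not the article's model. */
final class SalesOrder {
    final String id;
    final String productId;
    final int quantity;

    SalesOrder(String id, String productId, int quantity) {
        this.id = id;
        this.productId = productId;
        this.quantity = quantity;
    }
}

class OrderRequestMapper {
    /** Parses "a=1&b=2" into a map; assumes no URL-encoded characters. */
    static Map<String, String> parseQuery(String query) {
        Map<String, String> params = new HashMap<>();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    /** Builds the domain object for an assumed /sell?productId=..&quantity=.. request. */
    static SalesOrder toSalesOrder(String path, String query) {
        if (!"/sell".equals(path)) {
            throw new IllegalArgumentException("unsupported path: " + path);
        }
        Map<String, String> params = parseQuery(query);
        return new SalesOrder(UUID.randomUUID().toString(),
                params.get("productId"),
                Integer.parseInt(params.get("quantity")));
    }

    /** JSON acknowledgement carrying the order ID back to the caller. */
    static String acknowledge(SalesOrder order) {
        return "{\"id\":\"" + order.id + "\"}";
    }
}
```

Only SalesOrder would travel on to the trading engine; nothing HTTP-specific leaves the mapper.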

The next snippet shows how we partition the market and create a number of actors to handle requests in parallel.


This code is similar to what we did in the last article. In order to scale out and use more than one core concurrently, the market is partitioned by product ID and each trading engine runs concurrently for a different market partition. In the solution presented here, an EngineActor is created per partition and wrapped in a Routee on line 10. A map of actors keyed by product ID is also filled on line 14. The router is built using the routees and the map on line 19 and it is this which the HttpActor uses in the previous snippet, when delegating work. Note also line 17, which starts the trading engine contained in the EngineActor, so that it is up and running, ready to trade purchase and sales orders, when they are passed to these actors.

The EngineActor class isn't shown here explicitly since it is almost identical to the actors used in the last article, and it simply encapsulates a trading engine which handles all products from a particular market partition. Line 19 above uses a RoutingLogic to build the router, which is shown below:


The select(...) method on line 10 is called by the router when it receives an object which it must route to the correct actor. Using the map created in the previous listing, and the product ID obtained from the request, it is easy to find the actor which contains the trading engine responsible for the relevant market partition. By returning the routee which wraps that actor, Akka will pass the order object on to the correct EngineActor, which then puts the data into the model when that message is handled at a time when the trading engine is between trading cycles and the actor next checks its inbox.
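The partitioning and the lookup performed by the routing logic can be sketched without Akka as follows. This is an illustration under assumptions, not the repository code: EnginePartition is a hypothetical stand-in for an EngineActor wrapped in a routee, and assigning products to partitions round-robin is an assumed scheme.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an EngineActor; it only records which partition it serves. */
final class EnginePartition {
    final int index;

    EnginePartition(int index) {
        this.index = index;
    }
}

class PartitionRouter {
    private final Map<String, EnginePartition> engineByProductId = new HashMap<>();

    /** Assigns product IDs round-robin to numPartitions engines (an assumed scheme). */
    PartitionRouter(String[] productIds, int numPartitions) {
        EnginePartition[] engines = new EnginePartition[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            engines[i] = new EnginePartition(i);
        }
        for (int i = 0; i < productIds.length; i++) {
            engineByProductId.put(productIds[i], engines[i % numPartitions]);
        }
    }

    /** Plays the role of select(...): finds the engine responsible for the order's product. */
    EnginePartition select(String productId) {
        EnginePartition engine = engineByProductId.get(productId);
        if (engine == null) {
            throw new IllegalArgumentException("unknown product: " + productId);
        }
        return engine;
    }
}
```

Every order for a given product always reaches the same engine, which is what allows each engine to own its part of the market without locking.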

OK, so that is the front end dealt with. The second major change that was required to the solution from the previous article was the design of the method which persists sales after trading takes place. In the Java-based solution I was synchronously iterating over each sale, sending an insert statement to the database, and only processing the next sale once the database had replied. With the solution presented here, I chose to process the sales in parallel by firing off an insert request to the database and immediately moving to the next sale and doing the same. The responses were handled asynchronously within the execution context using a callback which I provided. I wrote the program to wait for the last insert to be acknowledged before trading continued with newly created purchase and sales orders which had arrived since the last trading session had started. This is shown in the following listing:


The persistSales(...) method is called by the trading engine after each trading cycle, and is passed a list of sales made during that trading cycle, and a callback function to be called once all the persistence is complete. If nothing was sold, then line 38 calls the callback immediately. Otherwise, a counter is created on line 5 which is initialised with the number of sales to be persisted. Each sale is persisted asynchronously on lines 7-15. Note how a Future is returned on line 15 and how we use another callback on lines 16-35 to handle completion of the future - there is no blocking done here, waiting for the future to complete! The above mentioned counter is decremented on line 25, once the sale is persisted, and once all sales are persisted, the callback passed into the persistSales(...) method is called. Note that the class JFunction1 used on line 16 is a shim allowing easier integration of Scala - the code is on GitHub at the link given above. Lines 21 and 22 show that I had a little problem with the async Mysql library that I used. It is still a beta, and doesn't seem to have a way to get hold of the generated (autoincrement) primary key of the sale. Note also line 35, where I pass in the execution context which Akka is using, so that the Future which handles completion of the insert statement is processed on one of the existing threads, rather than some new thread - again, keeping the total number of threads as low as possible.
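The counter-and-callback pattern described above can be sketched with the JDK's CompletableFuture in place of the Scala Future and the JFunction1 shim. This is not the listing from the repository, so the line numbers in the text do not apply to it. The `insert` function is a hypothetical stand-in for the asynchronous database call, and counting failed inserts as finished is an assumption made here so that trading can always resume.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class ParallelPersister {
    /**
     * Fires one asynchronous insert per sale without waiting for the previous one,
     * and calls onComplete exactly once, after the last insert has finished.
     */
    static <S> void persistSales(List<S> sales,
                                 Function<S, CompletableFuture<Void>> insert,
                                 Executor executionContext,
                                 Runnable onComplete) {
        if (sales.isEmpty()) {
            onComplete.run(); // nothing was sold, so continue immediately
            return;
        }
        // Number of inserts which have not yet been acknowledged.
        AtomicInteger outstanding = new AtomicInteger(sales.size());
        for (S sale : sales) {
            // The completion handler runs on the supplied executor; nothing blocks here.
            insert.apply(sale).whenCompleteAsync((ignored, error) -> {
                if (error != null) {
                    System.err.println("insert failed: " + error);
                }
                // Assumption: failures count as finished too.
                if (outstanding.decrementAndGet() == 0) {
                    onComplete.run();
                }
            }, executionContext);
        }
    }
}
```

Passing the shared pool as `executionContext` keeps the completion handlers on existing threads, which matches the intent of handing Akka's execution context to the Future.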

This listing also shows an interesting problem, namely that the thread which calls the database to insert the data is not necessarily the same thread which might need to close the connection [1]. In normal Java EE and Spring there is often use of thread local storage (also see here). If you called through to a bean from the function handling the completion of the future, resources which are injected into it may not work, because the container cannot work out what the context is. Scala solves this problem using implicit parameters, which are passed into methods under the hood.

The listing above uses the PersistenceComplete callback, which is shown below on lines 14-16. It also uses a connection pool which is created using the following code. Yet again, the execution context which Akka uses is passed over to the async Mysql library, on line 10 below. Line 10 below also shows a non-default pool configuration where I allow a maximum queue size of up to a thousand. During load testing I was getting a lot of errors indicating that the pool was saturated, and increasing this value solved the problem.
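Why a larger maximum queue size makes saturation errors disappear can be pictured with an analogy from the JDK. The sketch below uses a ThreadPoolExecutor with a bounded queue; it is not the async Mysql library's pool and makes no claim about how that pool is implemented. QueueSaturationDemo and its parameters are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueSaturationDemo {
    /**
     * Submits `requests` tasks which all block until released, against a pool with
     * `workers` threads and a waiting queue of `maxQueueSize`. Returns the number rejected.
     */
    static int countRejected(int workers, int maxQueueSize, int requests) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                workers, workers, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(maxQueueSize));
        int rejected = 0;
        for (int i = 0; i < requests; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // simulates a slow database round trip
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // all workers busy and the queue is full: saturated
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return rejected;
    }
}
```

With 2 workers and 50 simultaneous requests, a queue of 10 turns away every request beyond the first 12, whereas a queue of 1000 accepts them all and simply makes them wait.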


The callback passed into persistSales(...) is shown in the next listing. The following code is hardly different from the original shown in the last article, except that it is now asynchronous in style. It is called once all sales are persisted and only then does the callback send a message (via its event listener) to the actor, on line 14 below. That message will normally be at the back of the inbox after a load of new purchase and sales orders. Each of those messages will be processed, leading to the trading engine model being updated with the new orders, before trading is recommenced.


The final code listing is the modification to the Node.js solution which was made so that it too would persist sales in parallel, rather than one after the other, as was the case in the last article.


Line 5 fetches a connection from the pool and the same connection is reused for all sales, "in parallel", and only released, i.e. returned to the pool, once the last sale is persisted, on line 19.

So, yet again, it's time to compare the solutions via some load tests. This time I chose to see what maximum rate of sales I could achieve with each of the following three solutions:

  • Case 1 - The solution presented here, namely Spray + Akka + the async Mysql driver,
  • Case 2 - The modified Node.js solution using persistence in parallel,
  • Case 3 - The original Tomcat non-blocking connector, but with synchronous persistence.

The cases were run using the hardware from the last article, with the trading engines running on the fast hardware and the database on the slow hardware, because that was the best setup to show how blocking I/O causes performance problems. For each case, there were three variables which I could adjust while tuning. These were:

  • Number of trading engines (either as actors or as child processes),
  • Time waited by client between calls to the server,
  • Number of concurrent clients.

The last two basically tuned the number of requests per second, since the connections were not kept open awaiting the trading results (see previous article). The results were as follows, with the best performance shown in bold.

Case 1 - Spray + Akka + async Mysql driver

| # trading engines | client wait time between calls | concurrent clients | sales per minute | approx. CPU on trading hardware |
|---|---|---|---|---|
| 8 | 100ms | 60 | 42,810 | 25-35% |
| 8 | 80ms | 70 | 62,392 | 25-35% |
| 8 | 60ms | 80 | 75,600 | 30-40% |
| 8 | 40ms | 90 | 59,217 | 30-50% |
| 10 | 60ms | 80 | too many DB connection problems | |
| 5 | 60ms | 60 | 67,398 | 25-35% |
| 6 | 60ms | 80 | **79,536** | 25-35% |

Case 2 - Node.js with persistence in parallel

| # trading engines | client wait time between calls | concurrent clients | sales per minute | approx. CPU on trading hardware |
|---|---|---|---|---|
| 8 | 200ms | 30 | 6,684 | 40-50% |
| 8 | 100ms | 60 | started to lag behind | |
| 8 | 100ms | 40 | 17,058 | 25-35% |
| 8 | 100ms | 50 | started to lag behind | |
| 12 | 100ms | 50 | 20,808 | 45-60% |
| 16 | 100ms | 60 | 24,960 | 45-65% |
| 20 | 100ms | 80 | 32,718 | 45-70% |
| 25 | 60ms | 80 | **51,234** | 75-85% |
| 30 | 50ms | 80 | 22,026 | 75-85% |
| 25 | 10ms | 70 | 17,604 | 75-90% |

Case 3 - Tomcat 8 NIO, with synchronous blocking persistence

| # trading engines | client wait time between calls | concurrent clients | sales per minute | approx. CPU on trading hardware |
|---|---|---|---|---|
| 4 | 200ms | 30 | 9,586 | 5% |
| 4 | 150ms | 30 | **10,221** | 5% |
| 8 | 200ms | 30 | 9,510 | 5% |


The results show that bolting a NIO connector onto Tomcat and thinking that you are non-blocking and performant is dangerous, as that solution underperformed by a factor of nearly 8 compared to the Akka solution. The results also show that by using non-blocking libraries and writing a non-blocking solution in Java, it is possible to create a very performant solution in comparison to Node.js. Not only was the Java solution capable of some 50% more throughput, it used less than half the CPU doing so.
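The two headline ratios follow directly from the best sales-per-minute figure in each table (79,536 for Akka, 51,234 for Node.js and 10,221 for Tomcat). A small sketch of the arithmetic, with ThroughputComparison being a name chosen only for this example:

```java
import java.util.Locale;

class ThroughputComparison {
    // Best sales-per-minute figures taken from the three result tables.
    static final int AKKA = 79_536;
    static final int NODE = 51_234;
    static final int TOMCAT = 10_221;

    /** How many times larger the first throughput is than the second. */
    static double ratio(int numerator, int denominator) {
        return (double) numerator / denominator;
    }

    /** One-line summary of both comparisons, rounded to two decimal places. */
    static String summary() {
        return String.format(Locale.ROOT,
                "Akka vs Tomcat: %.2f, Akka vs Node.js: %.2f",
                ratio(AKKA, TOMCAT), ratio(AKKA, NODE));
    }
}
```

The first ratio comes out at roughly 7.8 (the "factor of nearly 8") and the second at roughly 1.55 (the "some 50% more throughput").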

Very important: please note that this is a result particular to the algorithms used here and my architecture, design and implementation. It is also dependent on using "non-standard" Java libraries, and indeed, the Mysql library I used was missing functionality, for example reading generated primary keys out of the result of an insert. Please do your own experiments for your use cases before drawing conclusions on relative performance of Java vs. Scala vs. Node.js!

A noteworthy point when comparing the variation of the number of trading engines: in Node.js it directly controlled the number of child processes, analogous to the number of threads; in the Akka solution it had no effect whatsoever on the number of threads in the system - that number stayed constant! In Akka solutions, varying the number of actors has an effect on the number of messages in their inboxes.
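That the number of actors and the number of threads are independent can be demonstrated with a toy actor built on a fixed thread pool. The sketch below is deliberately tiny and is not how Akka is implemented. MiniActor and ActorThreadCount are hypothetical names, and messages are plain Runnables for brevity.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** A toy actor: an inbox which is drained by whichever pool thread is free. */
class MiniActor {
    private final Queue<Runnable> inbox = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean running = new AtomicBoolean();
    private final ExecutorService pool;

    MiniActor(ExecutorService pool) {
        this.pool = pool;
    }

    /** Appends a message to the back of the inbox and makes sure it gets processed. */
    void tell(Runnable message) {
        inbox.add(message);
        schedule();
    }

    private void schedule() {
        // At most one thread drains a given inbox at any time.
        if (!inbox.isEmpty() && running.compareAndSet(false, true)) {
            pool.execute(this::drain);
        }
    }

    private void drain() {
        try {
            Runnable message;
            while ((message = inbox.poll()) != null) {
                message.run();
            }
        } finally {
            running.set(false);
            schedule(); // pick up messages which arrived while finishing
        }
    }
}

class ActorThreadCount {
    /** Sends one message to each of `actors` actors; returns the number of distinct threads used. */
    static int threadsUsed(int actors, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(actors);
        for (int i = 0; i < actors; i++) {
            new MiniActor(pool).tell(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return names.size();
    }
}
```

Whether 8 or 1000 actors are created, `threadsUsed(..., 4)` never exceeds 4; extra actors only mean more inboxes waiting for a turn on the same threads.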

Further information pertaining to the use of Akka and Spray can be found in this good video. Please also take the time to quickly read up on the reactive manifesto. The Akka solution presented here is reactive because it is responsive (highest throughput of all three cases), resilient (Akka provides easy ways to deal with failure, although none were necessary here), elastic (it automatically scales out because Akka manages the thread pool size in the execution context and it scales up because Akka provides transparent location of actors), and it is message driven (due to using the actor model).

[1] The Mysql library used here doesn't require that the connection be closed and returned to the pool, as e.g. Apache database pool does. Doing so in fact causes problems! Leaving it open causes no problems, as proven by the load tests which I ran.

Copyright © 2014, Ant Kutschera
