
Rule Engine for Node.js

Based on my original blog posting, this posting introduces a ported Javascript version of my rule engine, for Node.js.

I have a requirement to use a rule engine. I want something lightweight and fairly simple, yet powerful. While there are products out there which are super good, I don't want something with a big learning overhead. And, I fancied writing my own!

Here are my few basic requirements:

  • Use some kind of expression language to write the rules,
  • It should be possible to store rules in a database,
  • Rules need a priority, so that only the best can be fired,
  • It should also be possible to fire all matching rules,
  • Rules should evaluate against one input, which can be an object like a tree, containing all the information which rules need to evaluate against,
  • Predefined actions which are programmed in the system should be executed when certain rules fire.

So to help clarify those requirements, imagine the following examples:

1) In some forum system, the administrator needs to be able to configure when emails are sent.

Here, I would write some rules like "when the config flag called sendUserEmail is set to true, send an email to the user" and "when the config flag called sendAdministratorEmail is true and the user has posted fewer than 5 postings, send an email to the administrator".

2) A tarif system needs to be configurable, so that the best tarif can be offered to customers.

For that, I could write rules like these: "when the person is younger than 26 the Youth tarif is applicable", "when the person is older than 59, the Senior tarif is applicable", and "when the person is neither a youth, nor a senior, then they should be given the Default tarif, unless they have had an account for more than 24 months, in which case they should be offered the Loyalty tarif".

3) A train ticket can be considered to be a product. Depending upon the travel request, different products are suitable.

A rule here, could be something like: "if the travel distance is more than 100km and first class is desired, then product A is to be sold."

Finally, a more complex example, involving some iteration of the input, rather than just property evaluation:

4) Timetable software needs to determine when students can leave school.

A rule for that might read: "If a class contains any student under age 10, the entire class gets to leave early. Otherwise, they leave at the normal time."

So, with those requirements in mind, as well as wanting to use Node.js and Javascript (rather than the Java which is what the original engine runs on), I set to work porting the original code. It works like this:

1) An engine is configured with some rules.

2) A rule has these attributes:
    - namespace: an engine may contain many rules, but only some may be relevant to a particular call and this namespace can be used for filtering
    - name: a unique name within a namespace
    - expression: a Javascript expression for the rule
    - outcome: a string which the engine might use if this rule's expression evaluates to true
    - priority: an integer. The bigger the value, the higher the priority.
    - description: a useful description to aid the management of rules.

3) The engine is given an input object and evaluates all rules (optionally within a namespace) and either:
    a) returns all rules which evaluate to true,
    b) returns the outcome (string) from the rule with the highest priority, out of all rules evaluating to true,
    c) execute an action (defined within the application) which is associated with the outcome of the rule with the highest priority, out of all rules evaluating to true.

4) "Action"s are objects which the application programmer can supply. An action must have a property called 'name'. When the engine is asked to execute an action based on the rules, the name of the action matching the "winning" rule's outcome property is executed.

5) A rule can be built up of "sub-rules". A subrule is only ever used as a building block on which to base more complex rules. When evaluating rules, the engine will never select a subrule to be the best (highest priority) "winning" rule, i.e. one evaluating to true. Subrules make it easier to build complex rules, as I shall show shortly.

So, time for some code!


First, install the library using npm install maxant-rules, passing it the optional --save argument to save the dependency to the package.json file.

In order to use the library, you need to "require" the library, for example by adding the following lines to your script.

var rules = require('maxant-rules');
var Rule = rules.Rule;
var Engine = rules.Engine;
var NoActionFoundException = rules.NoActionFoundException;
var DuplicateNameException = rules.DuplicateNameException;
var SubRule = rules.SubRule;
var ParseException = rules.ParseException;
var NoMatchingRuleFoundException = rules.NoMatchingRuleFoundException;


First, let's look at the code for the tarif system:

 

var r1 = new Rule("YouthTarif", "input.person.age < 26", "YT2011", 3, "ch.maxant.someapp.tarifs");
var r2 = new Rule("SeniorTarif", "input.person.age > 59", "ST2011", 3, "ch.maxant.someapp.tarifs");
var r3 = new Rule("DefaultTarif", "!#YouthTarif && !#SeniorTarif", "DT2011", 3, "ch.maxant.someapp.tarifs");
var r4 = new Rule("LoyaltyTarif", "#DefaultTarif && input.account.ageInMonths > 24", "LT2011", 4, "ch.maxant.someapp.tarifs");
var rules = [r1, r2, r3, r4];

var engine = new Engine(rules);

var request = {person: {name: "p", age: 24}, account: {ageInMonths: 5}};
var tarif = engine.getBestOutcome(request);


So, in the above code, I have added 4 rules to the engine. Then, I created a tarif request object, which is the input object. That object is passed into the engine, when I ask the engine to give me the best outcome. In this case, the best outcome is the string "YT2011", the name of the most suitable tarif for the customer I added to the tarif request.

How does it all work? When the engine is given the rules, it does some validation on them. Notice how the first two rules refer to an object called "input"? That is the object passed into the "getBestOutcome" method on the engine. The engine creates a local reference to the input object named input and then calls eval for each rule's expression. Any time an expression evaluates to "true", the rule is put to the side as a candidate to be the winner. At the end, the candidates are sorted in order of priority, and the outcome property of the rule with the highest priority is returned by the engine.

Notice how the third and fourth rules contain the '#' character. That is not standard Javascript. The engine examines all rules when they are passed to it, and it replaces any token starting with a hash symbol, with the expression found in the rule named the same as the token. It wraps the expression in brackets. The logger outputs the full rule after reference rules have been resolved and replaced, just in case you want to check the rule.

In the above business case, we were only interested in the best tarif for the customer. Equally, we might have been interested in a list of possible tarifs, so that we could offer the customer a choice. In that case, we could have called the "getMatchingRules" method on the engine, which would have returned all rules, sorted by priority. The tarif names are (in this case) the "outcome" field of the rules.

In the above example, I wanted to receive any of the four outcomes, from the four rules. Sometimes however, you might want to build complex rules based on building blocks, but you might never want those building blocks to be a winning outcome. The train trip example from above can be used to show what I mean here:

var rule1 = new SubRule("longdistance", "input.distance > 100", "ch.maxant.produkte");
var rule2 = new SubRule("firstclass", "input.travelClass == 1", "ch.maxant.produkte");
var rule3 = new Rule("productA", "#longdistance && #firstclass", "productA", 3, "ch.maxant.produkte");
var rules = [rule1, rule2, rule3];

var e = new Engine(rules);

var request = {travelClass: 1, distance: 150};
var rs = e.getMatchingRules(request); 


In the above code, I build rule3 from two subrules. But I never want the outcomes of those building blocks to be output from the engine. So I create them as SubRules. SubRules don't have an outcome property or priority. They are simply used to build up more complex rules. After the engine has used the sub-rules to replace all tokens beginning in a hash during initialisation, it discards the SubRules - they are not evaluated.

The travel request above contains the distance and travel class.

Next, consider the business case of wanting to configure a forum system. The code below introduces actions. Actions are created by the application programmer and supplied to the engine. The engine takes the outcomes (as described in the first example), and searches for actions with the same names as those outcomes, and calls the "execute" method on those actions. This functionality is useful when a system must be capable of predefined things, but the choice of what to do needs to be highly configurable and independent of deployment.

var r1 = new Rule("SendEmailToUser", "input.config.sendUserEmail == true", "SendEmailToUser", 1, "ch.maxant.someapp.config");
var r2 = new Rule("SendEmailToModerator", "input.config.sendAdministratorEmail == true && input.user.numberOfPostings < 5", "SendEmailToModerator", 2, "ch.maxant.someapp.config");
var rules = [r1, r2];
		
var log = [];
log.add = function(i){this[this.length] = i;};
		
var action1 = {name: "SendEmailToUser", execute: function(i) {
    log.add("Sending email to user!");
}};
var action2 = {name: "SendEmailToModerator", execute: function(i) {
    log.add("Sending email to moderator!");
}};

var engine = new Engine(rules);

var forumSetup = 
  {config: 
    {sendUserEmail: true, 
     sendAdministratorEmail: true}, 
   user: {numberOfPostings: 2}};
			
engine.executeAllActions(forumSetup, [action1, action2]);


In the code above, the actions are passed to the engine when we call the "executeAllActions" method. In this case, both actions are executed, because the forumSetup object causes both rules to evaluate to true. Note that the actions are executed in the order of highest priority rule first. Each action is only ever executed once - its name is noted after execution and it will not be executed again, until the engine's "execute*Action*" method is called again. Also, if you only want the action associated with the best outcome to be executed, call the "executeBestAction" method instead of "executeAllActions".

Finally, let's consider the classroom example.

var expression = 
    "var result = _.find(input.students, function(student){" +
    "    return student.age < 10;" +
    "});" +
    "result != undefined";

var r1 = new Rule("containsStudentUnder10", expression , "leaveEarly", 1, "ch.maxant.rules", "If a class contains a student under 10 years of age, then the class may go home early");
		
var r2 = new Rule("default", "true" , "leaveOnTime", 0, "ch.maxant.rules", "this is the default");
		
var classroom = {students: [{age: 12}, {age: 10}, {age: 8}]};

var e = new Engine([r1, r2]);
		
var outcome = e.getBestOutcome(classroom);


The outcome above is "leaveEarly", because the classroom contains one student whose age is less than 10. See how I have written an expression which uses the 'find' method from the underscorejs library. The engine simply requires a rule to return true, if the rule is to be considered a candidate for firing.

There are more examples in the tests contained in the source code, which can be found here: https://github.com/maxant/rules/tree/master/rules-js.

So, the requirements are fulfilled, except for "It should be possible to store rules in a database". While this library doesn't support reading and writing rules to / from a database, rules are String based. So it wouldn't be hard to create some database code which reads rules out of a database, populates Rule objects and passes them to the Engine. I haven't added this to the library, because normally these things, as well as the management of rules, are quite project specific. And because my library will never be as cool or popular as something like Drools, I'm not sure it would be worth my while to add such functionality.


© 2014, Ant Kutschera


Simple rule engine updated

I have taken the time to upgrade my simple Java rule engine so that it supports Java 8 lambdas and streams and it is now published in Maven Central. The code is now also available on GitHub.

First off, the Maven dependencies. The following dependency is for the basic rule engine which is compatible with Java 6.

<dependency>
  <groupId>ch.maxant</groupId>
  <artifactId>rules</artifactId>
  <version>2.1.0</version>
</dependency>

If you want to use lambdas from Java 8, then you need to also add a dependency as follows:

<dependency>
  <groupId>ch.maxant</groupId>
  <artifactId>rules-java8</artifactId>
  <version>2.1.0</version>
</dependency>

This allows you to write code like that found on lines 15 and 20.

Rule rule1 = new Rule("R1", 
            "input.p1.name == \"ant\" && input.p2.name == \"clare\"", 
            "outcome1", 
            0, 
            "ch.maxant.produits", 
            "Règle spéciale pour famille Kutschera");
Rule rule2 = new Rule("R2", "true", "outcome2", 1, 
                      "ch.maxant.produits", "Régle par défault");
List<Rule> rules = Arrays.asList(rule1, rule2);

//to use a lambda, construct a SamAction and pass it a lambda.
IAction<MyInput, BigDecimal> action1 = 
        new SamAction<MyInput, BigDecimal>(
            "outcome1", 
            i -> new BigDecimal("100.0")
        );
IAction<MyInput, BigDecimal> action2 = 
        new SamAction<MyInput, BigDecimal>(
            "outcome2", 
            i -> new BigDecimal("101.0")
        );

List<IAction<MyInput, BigDecimal>> actions = 
                                 Arrays.asList(action1, action2);

Engine e = new Engine(rules, true);

MyInput input = new MyInput();
Person p1 = new Person("ant");
Person p2 = new Person("clare");
input.setP1(p1);
input.setP2(p2);

BigDecimal price = e.executeBestAction(input, actions);
assertEquals(new BigDecimal("101.0"), price);

If you want to pass a Stream to the Engine rather than a Collection, then use the sub-class found in the second library, for example:

Stream<Rule> streamOfRules = getStreamOfRules();

//to pass in a stream, we need to use a different Engine
Java8Engine e = new Java8Engine(streamOfRules, true);

//use this engine as you would the normal Engine

See the tests on GitHub for more details. In theory, you can pass in a parallel stream - I haven't tried it but would be interested to hear about your successes / failures.

Finally, if you are using Scala, there is still support for that too, in the ScalaEngine which can be found in the following dependency. See the tests on GitHub for more details.

<dependency>
  <groupId>ch.maxant</groupId>
  <artifactId>rules-scala</artifactId>
  <version>2.1.0</version>
</dependency>


Copyright ©2014, Ant Kutschera


The effects of programming with Java 8 Streams on algorithm performance

Multi-paradigm programming with Java has been possible for many years, with its support for a mix of service oriented, object oriented and aspect oriented programming. Java 8 with its lambdas and java.util.stream.Stream class, is good news because it lets us add the functional programming paradigm into the mix. Indeed there has been a lot of hype around lambdas. But is changing our habits and the way we write our code a wise thing, without first getting to know the dangers that might lurk?

Java 8's Stream class is neat because it lets you take a collection of data and chain multiple functional calls on that data together, making for tidy code. Map/reduce algorithms are a good example, where you take a collection of data and aggregate it by first selecting or modifying data out of a complex domain and simplifying it (the "map" part), and then reducing it to a single useful value.

Take for example the following data classes (written in Groovy so that I get code generation of constructors, accessors, hash/equals and toString methods for free!):
//Groovy
@Immutable
class City {
    String name
    List<Temperature> temperatures
}
@Immutable
class Temperature {
    Date date
    BigDecimal reading
}
I can use those classes to construct some random weather data in a list of City objects, e.g.:
private static final long ONE_DAY_MS = 1000*60*60*24;
private static final Random RANDOM = new Random();

public static List<City> prepareData(
                      int numCities, int numTemps) {
    List<City> cities = new ArrayList<>();
    IntStream.range(0, numCities).forEach( i ->
        cities.add(
            new City(
                generateName(), 
                generateTemperatures(numTemps)
            )
        )
    );
    return cities;
}

private static List<Temperature> generateTemperatures(
                                         int numTemps) {
    List<Temperature> temps = new ArrayList<>();
    for(int i = 0; i < numTemps; i++){
        long when = System.currentTimeMillis();
        when += ONE_DAY_MS*RANDOM.nextInt(365);
        Date d = new Date(when);
        Temperature t = new Temperature(
                             d, 
                             new BigDecimal(
                                RANDOM.nextDouble()
                             )
                         );
        temps.add(t);
    }
    return temps;
}

private static String generateName() {
    char[] chars = new char[RANDOM.nextInt(5)+5];
    for(int i = 0; i < chars.length; i++){
        chars[i] = (char)(RANDOM.nextInt(26) + 65);
    }
    return new String(chars);
}
Line 7 uses the IntStream class, also from Java 8, to construct a range over which lines 8-13 iterate, adding new cities to the list constructed on line 6. Lines 22-30 generate random temperatures on random days.

If I wanted to then calculate the average temperature recorded in August, across all cities, I could write the following functional algorithm:
Instant start = Instant.now();
Double averageTemperature = cities.stream().flatMap(c ->
    c.getTemperatures().stream()
).filter(t -> {
    LocalDate ld = LocalDateTime.ofEpochSecond(
                       t.getDate().getTime() / 1000, // epoch millis -> seconds
                       0, 
                       ZoneOffset.UTC
                    ).toLocalDate();
    return ld.getMonth() == Month.AUGUST;
}).map(t ->
    t.getReading()
).collect(
    Collectors.averagingDouble(
        TestFilterMapReducePerformance::toDouble
    )
);

Instant end = Instant.now();
System.out.println(
    "functional calculated in " + 
    Duration.between(start, end) + 
    ": " + averageTemperature);
Line 1 is used to start the clock. The code then creates a stream from the list of cities, on line 2. I then flatten the data by creating a single long list of all temperatures using the flatMap method (also line 2), passing it a lambda on line 3 which returns each list of temperatures as a stream which the flatMap method can append together. Once that is done, I use the filter method on line 4 to throw away any data that is not from August. I then call the map method on line 11 to convert each Temperature object into a BigDecimal and with the resulting stream I use the collect method on line 13 together with a collector which calculates the average. Line 15 needs a helper function to convert instances of BigDecimal into doubles, since line 14 works with doubles rather than BigDecimals:
/** method to convert to double */
public static Double toDouble(BigDecimal a) {
    return a.doubleValue();
}
The number crunching part of the listing above can alternatively be written in an imperative style, as follows:
BigDecimal total = BigDecimal.ZERO;
int count = 0;
for(City c : cities){
    for(Temperature t : c.getTemperatures()){
        LocalDate ld = LocalDateTime.ofEpochSecond(
                          t.getDate().getTime() / 1000, // epoch millis -> seconds
                          0, 
                          ZoneOffset.UTC).toLocalDate();
        if(ld.getMonth() == Month.AUGUST){
            total = total.add(t.getReading());
            count++;
        }
    }
}
double averageTemperature = total.doubleValue() / count;
In the imperative version of the algorithm I do the mapping, filtering and reducing in a different order, but the result is the same. Which style, functional or imperative, do you think is faster, and by how much?

In order to make a more accurate reading of the performance data, I need to run the algorithms many times so that the hotspot compiler has time to warm up. Running the algorithms multiple times in pseudo-random order, I was able to measure that the code written in the functional style took an average of around 0.93 seconds (using a thousand cities, each with a thousand temperatures; calculated on a laptop with an Intel i5 2.40GHz 64 bit processor with 4 cores). The code written in the imperative style took 0.70 seconds, which is 25% quicker.
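The measurement approach described above (repeat each algorithm, ignore the first rounds while the JIT warms up, and vary the order) could be sketched roughly as follows. This is not the harness that produced the numbers in this post; the class name WarmupTimer, the method averageMillis and its parameters are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

class WarmupTimer {

    /**
     * Runs every task (warmupRuns + measuredRuns) times, shuffling the
     * order on each round, and returns the average wall-clock time in
     * milliseconds per task (same index as in the input list).
     * Only the rounds after the warm-up are counted.
     */
    static double[] averageMillis(List<Runnable> tasks,
                                  int warmupRuns, int measuredRuns) {
        long[] totalNanos = new long[tasks.size()];
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < tasks.size(); i++) {
            order.add(i);
        }
        Random random = new Random();
        for (int run = 0; run < warmupRuns + measuredRuns; run++) {
            Collections.shuffle(order, random); // pseudo-random order
            for (int index : order) {
                long start = System.nanoTime();
                tasks.get(index).run();
                long elapsed = System.nanoTime() - start;
                if (run >= warmupRuns) { // ignore the warm-up rounds
                    totalNanos[index] += elapsed;
                }
            }
        }
        double[] averages = new double[tasks.size()];
        for (int i = 0; i < averages.length; i++) {
            averages[i] = totalNanos[i] / 1_000_000.0 / measuredRuns;
        }
        return averages;
    }
}
```

Each Runnable would wrap one of the algorithms being compared, for example the functional and the imperative average calculation, so that both are measured under the same conditions.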

So I asked myself if imperative code is always quicker than functional code. Let's try simply counting the number of temperatures recorded in August. Functional code could look like this:
long count = cities.stream().flatMap(c ->
    c.getTemperatures().stream()
).filter(t -> {
    LocalDate ld = LocalDateTime.ofEpochSecond(
                       t.getDate().getTime() / 1000, // epoch millis -> seconds
                       0, 
                       ZoneOffset.UTC).toLocalDate();
    return ld.getMonth() == Month.AUGUST;
}).count();
The functional code involves filtering and then calling the count method. Alternatively, the equivalent imperative code could look like this:
long count = 0;
for(City c : cities){
    for(Temperature t : c.getTemperatures()){
        LocalDate ld = LocalDateTime.ofEpochSecond(
                       t.getDate().getTime() / 1000, // epoch millis -> seconds
                       0, 
                       ZoneOffset.UTC).toLocalDate();
        if(ld.getMonth() == Month.AUGUST){
            count++;
        }
    }
}
In this example, running with a different data set than the one used to calculate average August temperatures, the imperative code averaged 1.80 seconds while the functional code averaged just a little less. So we can't deduce that functional code is quicker or slower than imperative code. It really depends on the use case.

What is interesting is that we can make the calculations run in parallel by using the parallelStream() method instead of the stream() method. In the case of calculating the average temperature, using a parallel stream means that the average is calculated in 0.46 seconds rather than 0.93 seconds. Counting the temperatures in parallel took 0.90 seconds rather than 1.80 seconds serially. Try writing imperative code which splits up the data, spreads calculations across cores and assembles the results into a single average temperature - it would take a lot of work! Precisely this is one of the main reasons for wanting to add functional programming to Java 8.

How does it work? Spliterators and CountedCompleters are used to distribute the work in the common ForkJoinPool, whose default parallelism is based on the number of available cores (one less than the core count, because the calling thread also takes part in the work). Theory dictates that using only as many threads as there are cores means that no time is wasted with context switches, but it depends on whether the work being done contains any blocking I/O - that's something I discuss in my book on Scala.
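As a rough illustration of swapping stream() for parallelStream(), the following sketch computes the same average both ways and prints the parallelism of the common pool. The class name ParallelAverage and the sample data are assumptions for the sake of a self-contained example; this is not the benchmark code behind the timings quoted above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

class ParallelAverage {

    /** Averages the readings using a sequential stream. */
    static double serialAverage(List<Integer> readings) {
        return readings.stream()
                       .collect(Collectors.averagingInt(n -> n));
    }

    /** Same calculation, but the work may be split across the common pool. */
    static double parallelAverage(List<Integer> readings) {
        return readings.parallelStream()
                       .collect(Collectors.averagingInt(n -> n));
    }

    /** Builds the list 0, 1, ..., size - 1 as sample data. */
    static List<Integer> sampleData(int size) {
        List<Integer> data = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            data.add(i);
        }
        return data;
    }

    public static void main(String[] args) {
        List<Integer> data = sampleData(1_000_000);
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        System.out.println("serial:   " + serialAverage(data));
        System.out.println("parallel: " + parallelAverage(data));
    }
}
```

Both methods return the same value; only the way the work is scheduled differs, which is why the change in the calling code is limited to a single method name.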

Spawning threads is an interesting topic when working with Java EE application servers, as strictly speaking you are not allowed to spawn threads. But since creating a parallel stream doesn't spawn any threads, there is no need to worry about it! Using parallel streams is entirely legal in a Java EE environment :-)

You can use a map/reduce algorithm to calculate the total number of temperatures too:
int count = cities.stream().map(c ->
    c.getTemperatures().size()
).reduce(
    Integer::sum
).get();
Line 1 creates the stream from the list, and maps (converts) the cities into the number of temperatures for the city using the lambda on line 2. Line 3 reduces the stream of "number of temperatures" into a single value by using the sum method of the Integer class on line 4. Since streams might contain no elements, the reduce method returns an Optional, and we call the get method to get the total count. We can do that safely because we know that the cities contain data. Should you be working with data which might be empty, you could call the orElse(T) method which lets you specify a default value to use if no result is available.
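To make the difference between get and orElse concrete, here is a minimal sketch. It uses plain lists of strings rather than the City and Temperature classes so that it stays self-contained; the class name SafeCount and the method totalSize are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class SafeCount {

    /**
     * Sums the sizes of the given lists. reduce(Integer::sum) yields an
     * empty Optional when there are no lists at all, so orElse(0)
     * supplies a default where get() would throw NoSuchElementException.
     */
    static int totalSize(List<List<String>> lists) {
        return lists.stream()
                    .map(List::size)
                    .reduce(Integer::sum)
                    .orElse(0);
    }

    public static void main(String[] args) {
        List<List<String>> some = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"));
        System.out.println(totalSize(some));                    // prints 3
        System.out.println(totalSize(Collections.emptyList())); // prints 0
    }
}
```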

In terms of writing functional code, there is another way to write this algorithm:
long count = cities.stream().map(c ->
    c.getTemperatures().stream().count()
).reduce(
    Long::sum
).get();
Using the above method, the lambda on line 2 counts the size of the list of temperatures by converting it into a stream and calling the count method. In terms of performance, this is a bad way to get the size of a list. With a thousand cities and a thousand temperatures each, the total count was calculated in 160ms using the first algorithm. The second algorithm increases that time to 280ms! The reason is that an ArrayList knows its size since it tracks it as elements are added or removed. A stream on the other hand calculates the size by first mapping each element to the value 1L and then reducing the stream of 1Ls using the Long::sum method. On long lists of data that is a sizeable overhead when compared to simply looking up the size from an attribute in the list.
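The three ways of arriving at the same number can be put side by side. This is only a sketch to show that they are equivalent in result; the class name CountVersusSize and its method names are made up for the example, and no timings are implied.

```java
import java.util.ArrayList;
import java.util.List;

class CountVersusSize {

    /** Builds a list containing n integers, purely as sample data. */
    static List<Integer> sample(int n) {
        List<Integer> list = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            list.add(i);
        }
        return list;
    }

    /** An ArrayList tracks its size, so this is a simple lookup. */
    static long viaSize(List<?> list) {
        return list.size();
    }

    /** The work described above: map every element to 1L, then sum. */
    static long viaMapAndSum(List<?> list) {
        return list.stream().mapToLong(e -> 1L).sum();
    }

    /** The stream's own count() method, for comparison. */
    static long viaCount(List<?> list) {
        return list.stream().count();
    }

    public static void main(String[] args) {
        List<Integer> data = sample(1_000);
        System.out.println(viaSize(data));      // prints 1000
        System.out.println(viaMapAndSum(data)); // prints 1000
        System.out.println(viaCount(data));     // prints 1000
    }
}
```

Note that later JDK versions allow count() to read the size directly from a sized source, so the overhead discussed here applies to Java 8 as used in this post.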

Comparing the time required by the functional code to the time required by the following imperative code shows that the functional code is twice as slow - the imperative code calculates the total number of temperatures in an average of just 80ms.
long count = 0;
for(City c : cities){
    count += c.getTemperatures().size();
}
Using a parallel stream instead of a sequential stream, again by simply calling the parallelStream() method instead of the stream() method on line 1 three listings above, results in the algorithm requiring an average of 90ms, i.e. slightly more than the imperative code.

A third way to count temperatures is to use Collectors. Here, I used a million cities, each with just two temperatures. The algorithm is:
int count = cities.stream().collect(
    Collectors.summingInt(c -> 
        c.getTemperatures().size()
    )
);
The equivalent imperative code is:
long count = 0;
for(City c : cities){
    count += c.getTemperatures().size();
}
On average, the functional listing took 100ms, which was the same time taken by the imperative listing. Using a parallel stream on the other hand reduced the calculation time by half, to just 50ms.

The next question I asked myself was whether it is possible to determine how much data needs to be processed before using a parallel stream becomes worthwhile. Splitting data up, submitting it to an ExecutorService like the ForkJoinPool and collecting the results together after the calculation isn't free - it costs in terms of performance. It certainly is possible to work out when it pays off to process data in parallel, and the answer is, typically, that it depends on the use case.

In this experiment I calculate the average of a list of numbers. I repeat the work over and over (NUM_RUNS times) simply to get measurable values, since calculating the average of three numbers is too quick to measure reliably. I vary the size of the list from 3 numbers to three million, to determine how big the list needs to get before it pays off using a parallel stream to calculate the average.

The algorithm used was:
double avg = -1.0;
for(int i = 0; i < NUM_RUNS; i++){
    avg = numbers.stream().collect(
        Collectors.averagingInt(n->n)
    );
}
Just for fun, here is another way to do the calculation:
double avg = -1.0;
for(int i = 0; i < NUM_RUNS; i++){
    avg = numbers.stream().
            mapToInt(n->n).
            average().
            getAsDouble();
}
The results were as follows. With just three numbers in the list I ran the calculation 100,000 times. Running the test many times over, showed that on average, the serial calculation took 20ms compared to the parallel calculation that took 370ms. So with a small sample of data, in this case, it isn't worth using a parallel stream.

On the other hand, with three million numbers in the list the serial calculation took 1.58 seconds compared to only 0.93 seconds for the parallel calculation. So with a large sample of data, in this case, it is worth using a parallel stream. Note that the number of runs was reduced as the data set size was increased, so that I didn't have to wait as long for the results (I don't drink coffee!).

# numbers in list   Avg. time SERIAL   Avg. time PARALLEL   NUM_RUNS
3                   0.02s              0.37s                100,000
30                  0.02s              0.46s                100,000
300                 0.07s              0.53s                100,000
3,000               1.98s              2.76s                100,000
30,000              0.67s              1.90s                10,000
300,000             1.71s              1.98s                1,000
3,000,000           1.58s              0.93s                100

Does that mean that parallel streams are only useful for large data sets? No! It entirely depends on the intensity of the calculation at hand. The following futile algorithm simply heats the CPU, but demonstrates a complex calculation.
private void doIntensiveWork() {
    double a = Math.PI;
    for(int i = 0; i < 100; i++){
        for(int j = 0; j < 1000; j++){
            for(int k = 0; k < 100; k++){
                a = Math.sqrt(a+1);
                a *= a;
            }
        }
    }
    System.out.println(a);
}
We can generate a list of two runnables which do this intensive work using the following listing:
private List<Runnable> generateRunnables() {
    Runnable r = () -> {
        doIntensiveWork();
    };
    return Arrays.asList(r, r);
}
Finally, we can measure the time it takes to run the two runnables, for example in parallel (see the call to the parallelStream() method on line 3):
List<Runnable> runnables = generateRunnables();
Instant start = Instant.now();
runnables.parallelStream().forEach(r -> r.run());
Instant end = Instant.now();
System.out.println(
    "functional parallel calculated in " + 
    Duration.between(start, end));
Using a parallel stream it took an average of 260ms to do the intensive work twice. Using a serial stream, it took an average of 460ms, i.e. nearly double the time.

What can we conclude from all these experiments? Well, it isn't possible to conclusively say that functional code is slower than imperative code, and it isn't possible either to say that using parallel streams is faster than using serial streams. What we can conclude is that programmers need to experiment with different solutions and measure the effects of the coding style on performance, when they write performance critical code. But let's be honest, that isn't anything new! For me, what you should be taking away with you after reading this post is that there are always many ways to write algorithms and choosing the right way is important. Knowing which way is right comes partly from experience, but more importantly from playing around with the code and trying different solutions. Finally though, as always, don't optimise prematurely :-)

Copyright ©2014, Ant Kutschera

Non-Blocking Asynchronous Java 8 and Scala's Try/Success/Failure

Inspired by a recent newsletter from Heinz Kabutz as well as Scala's Futures which I investigated in my recent book, I set about using Java 8 to write an example of how to submit work to an execution service and respond to its results asynchronously, using callbacks so that there is no need to block any threads waiting for the results from the execution service.

Theory says that calling blocking methods like get on a java.util.concurrent.Future is bad, because the system will need more than the optimum number of threads if it is to continuously do work, and that results in wasting time with context switches.

In the Scala world, frameworks like Akka use programming models in which the framework itself never blocks. The only time a thread blocks is when a user programs something which blocks, and they are discouraged from doing that. By never blocking, the framework can get away with using about one thread per core, which is far fewer than, say, a standard JBoss Java EE application server, which has as many as 400 threads just after startup. Due largely to the work of the Akka framework, Scala 2.10 added Futures and Promises, but these don't (yet?) exist in Java.

The following code shows the goal I had in mind. It has three parts to it. Firstly, new tasks are added to the execution service using the static future method found in the class ch.maxant.async.Future. It returns a Future, but not one from the java.util.concurrent package, rather a subclass thereof from the ch.maxant.async package. Secondly, that Future has a method called map, following the functional style from Scala or the new Java 8 Stream class. The map method lets you register a callback, or more precisely, lets you map (convert) the value that the first future contains into a new value. The mapping is carried out at some later time, after the first Future has completed, and so it results in a new Future. Thirdly, we use another method in the Future class to register a callback to be run once all the futures we create are complete. At no time are any blocking methods of the Future API used!
final Random random = new Random();
int numTasks = 10;
List<Future<Integer>> futures = new ArrayList<>();

for(int i = 0; i < numTasks; i++){
    final int j = i;
    log("adding future " + i);
 
    // PART 1
    //start some work async / in the future
    Future<String> f = future(new Task<String>( () -> {
        sleep(random.nextInt(1000));
        if(j < 5){
            log("working success");
            return "20";
        }else{
            log("working failure");
            throw new Exception();
        }
    }));
 
    // PART 2
    //register a callback, to be called when the work is done
    log("adding mapping callback to future");
    final Future<Integer> f2 = f.map( (Try<String> stringNumber) -> {
        return stringNumber.map( (String s) -> {
            log("mapping '" + s + "' to int");
            return Integer.parseInt(s);
        }).recover( (Exception e) -> {
            log("recovering");
            return -10;
        }).get(); //won't throw an exception, because we provided a recovery!
    });
    
    futures.add(f2);
}

// PART 3
log("registering callback for final result");
Future.registerCallback(futures, (List<Try<Integer>> results) -> {
        
    Integer finalResult = results.stream().map( (Try<Integer> t) -> {
        log("mapping " + t);
        try {
            return t.get();
        } catch (Exception e) {
            return 0;
        }
    }).reduce(0, (Integer i1, Integer i2) -> {
        log("reducing " + i1 + " and " + i2);
        return i1 + i2;
    });
    
    log("final result is " + finalResult);
    Future.shutdown();
    if(finalResult != 50){
        throw new RuntimeException("FAILED");
    }else{
        log("SUCESS");
    }
});
 
System.out.println("Completed submitting all tasks on thread " + Thread.currentThread().getId());
 
//this main thread will now die, but the Future executor is still up and running.  the callback will shut it down and with it, the jvm.
Line 11 calls the future method to register a new Task, which is constructed using a Work instance, constructed here using a Java 8 lambda. The work sleeps for a little time and then either returns the number 20, as a string, or throws an exception, just to demonstrate how errors are handled.

Using the Future that line 11 gets back from the execution service, line 25 maps its value from a string into an integer, resulting in a Future<Integer> rather than a Future<String>. That result is added to a list of Futures on line 35, which part 3 uses on line 40. The registerCallback method will ensure that the given callback is called after the last future is completed.

The mapping on lines 25-33 is done using a lambda which is passed a Try object. A Try is a little like a Java 8 Optional and is an abstraction (supertype) of the Success and Failure classes, which I implemented based on my knowledge of their Scala counterparts. It allows programmers to handle failure more easily than having to explicitly check for errors. My version of the Try interface is as follows:
public interface Try<T> {
 
    /** returns the value, or throws an exception if it's a failure. */
    T get() throws Exception;
 
    /** converts the value using the given function, resulting in a new Try */
    <S> Try<S> map(Function1<T, S> func);
 
    /** can be used to handle recovery by converting the exception into a {@link Try} */
    Try<T> recover(Recovery<T> r);
 
}
What happens is that the implementations of Success and Failure handle errors gracefully. For example, if the Future on line 11 of the first listing is completed with an exception, then the lambda on line 25 of the first listing is passed a Failure object, and calling the map method on a Failure does absolutely nothing. No exception is raised, nothing. To compensate, you can call the recover method, for example on line 29 of the first listing, which allows you to handle the exception and return a value with which your program can continue, for example a default value.

The Success class, on the other hand, implements the map and recover methods of the Try interface differently, such that calling map leads to the given function being called, but calling recover does absolutely nothing. Instead of explicitly coding a try/catch block, the map and recover methods allow for a nicer syntax, one which is more easily validated when reading or reviewing code (which happens to code more often than writing it).

Since the map and recover methods wrap the results of the functions in Trys, you can chain the calls together, such as on lines 26, 29 and 32. The Try API from Scala has many more methods than the three that I have implemented here. Note that I chose not to use a java.util.function.Function in my Try API, because its apply method doesn't declare that it throws Exception, which meant that the code shown in the first listing wasn't as nice as it now is. Instead I wrote the Function1 interface.
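To make the behaviour of Success and Failure concrete, here is a minimal sketch of how the two classes could look. It is not the code from the ch.maxant.async package: the names Function1 and Recovery come from the listing above, but their method signatures, the class TrySketch and the helper parseOrDefault are assumptions made purely for illustration.

```java
public class TrySketch {

    // Like java.util.function.Function, but apply may throw a checked exception.
    // (Name from the post; this signature is an assumption.)
    public interface Function1<T, S> { S apply(T t) throws Exception; }

    // Converts an exception into a replacement value. (Name from the post; signature assumed.)
    public interface Recovery<T> { T recover(Exception e) throws Exception; }

    public interface Try<T> {
        T get() throws Exception;
        <S> Try<S> map(Function1<T, S> func);
        Try<T> recover(Recovery<T> r);
    }

    public static final class Success<T> implements Try<T> {
        private final T value;
        public Success(T value) { this.value = value; }
        public T get() { return value; }
        // Applies the function; an exception thrown by it turns the result into a Failure.
        public <S> Try<S> map(Function1<T, S> func) {
            try {
                return new Success<>(func.apply(value));
            } catch (Exception e) {
                return new Failure<>(e);
            }
        }
        // Nothing to recover from, so the Success is returned unchanged.
        public Try<T> recover(Recovery<T> r) { return this; }
        @Override public String toString() { return "Success(" + value + ")"; }
    }

    public static final class Failure<T> implements Try<T> {
        private final Exception exception;
        public Failure(Exception exception) { this.exception = exception; }
        public T get() throws Exception { throw exception; }
        // The function is never called; the original exception is simply carried along.
        public <S> Try<S> map(Function1<T, S> func) { return new Failure<>(exception); }
        // Gives the caller the chance to replace the exception with a value.
        public Try<T> recover(Recovery<T> r) {
            try {
                return new Success<>(r.recover(exception));
            } catch (Exception e) {
                return new Failure<>(e);
            }
        }
        @Override public String toString() { return "Failure(" + exception + ")"; }
    }

    // Same chain as in part 2 of the first listing: parse the string, fall back to -10 on failure.
    public static int parseOrDefault(Try<String> input) {
        Try<Integer> result = input.map(Integer::parseInt).recover(e -> -10);
        try {
            return result.get();
        } catch (Exception e) {
            // Not expected here: recover has already replaced any failure with a value.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault(new Success<>("20")));               // 20
        System.out.println(parseOrDefault(new Failure<>(new Exception("x")))); // -10
    }
}
```

In this sketch an exception thrown inside the mapping function is also captured as a Failure, which is how Scala's Try behaves; whether the original implementation does the same is not shown in the post.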

Part 3 of the puzzle is how to get the program to do something useful after all the Futures are complete, without nasty blocking calls like those to the Future#get() method. The solution is to register a callback as shown on line 40. That callback is, like all the others shown here, submitted to the execution service. That means we have no guarantee which thread will run it, and that has a side effect, namely that thread local storage (TLS) no longer works - some frameworks like (older versions of?) Hibernate relied on TLS, and they just won't work here. Scala has a nice way of solving that problem using the implicit keyword, which Java doesn't have (yet...?), so some other mechanism needs to be used. I'm mentioning it, just so that you are aware of it.
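The thread local storage problem is easy to demonstrate in isolation. The following sketch uses only the JDK; the class name ThreadLocalDemo and the CURRENT_USER variable are hypothetical and simply stand in for whatever per-thread state a framework might keep.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalDemo {

    // Hypothetical piece of per-thread state, e.g. the current user or session.
    public static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Sets the value on the calling thread, then reads it from a pool thread.
    public static String readOnPoolThread() {
        CURRENT_USER.set("alice");
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> read = CURRENT_USER::get;
            // The blocking get() is used only to keep this demo short.
            return pool.submit(read).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool thread sees:    " + readOnPoolThread());  // null
        System.out.println("calling thread sees: " + CURRENT_USER.get());  // alice
    }
}
```

The value set on the submitting thread is simply not there on the pool thread, so anything a callback needs has to be passed to it explicitly, for example by capturing it in the lambda.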

So, when the last future completes, lines 40-60 are called, and passed a List of Trys containing Integers, rather than Futures. The registerCallback method converts the futures into appropriate Successes or Failures. But how can we convert those into something useful? With a simple map/reduce of course, and luckily, Java 8 now supports that with the Stream class, which is instantiated from the collection of Trys on line 42 by calling the stream() method. First I map (convert) the Trys into their values, and then I reduce the stream to a single value on line 49. Instead of passing my own implementation of a lambda that sums values, I could have used Integer::sum, for example someStream.reduce(0, Integer::sum).
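For comparison, the released Java 8 class library contains java.util.concurrent.CompletableFuture, which supports a similar callback style. The sketch below rebuilds the three parts of the first listing with it. This is a swapped-in technique and not the ch.maxant.async API; the class name AllOfSketch and the method sumWhenAllDone are made up for illustration, and the random sleep has been left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AllOfSketch {

    // Submits numTasks tasks and returns a future holding the sum of their mapped results.
    public static CompletableFuture<Integer> sumWhenAllDone(ExecutorService pool, int numTasks) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            final int j = i;
            CompletableFuture<Integer> f = CompletableFuture
                // part 1: start some work asynchronously; the first five tasks succeed
                .supplyAsync(() -> {
                    if (j < 5) {
                        return "20";
                    }
                    throw new IllegalStateException("task " + j + " failed");
                }, pool)
                // part 2: map the string to an int, or recover with a default value
                .thenApply(Integer::parseInt)
                .exceptionally(e -> -10);
            futures.add(f);
        }
        // part 3: a callback that runs once every future has completed
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            // join() does not wait here, because allOf has already completed
            .thenApply(v -> futures.stream().map(CompletableFuture::join).reduce(0, Integer::sum));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        sumWhenAllDone(pool, 10).thenAccept(total -> {
            System.out.println("final result is " + total); // 5 * 20 + 5 * -10 = 50
            pool.shutdown();
        });
    }
}
```

As in the original program, the main thread only submits work and registers callbacks; the final callback shuts the pool down.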

The last time I ran the program, it outputted the following:
Thread-1 says: adding future 0
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 1
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 2
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 3
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 4
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 5
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 6
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 7
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 8
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 9
Thread-1 says: adding mapping callback to future
Thread-1 says: registering callback for final result
Thread-10 says: working success
Completed submitting all tasks on thread 1
Thread-14 says: working success
Thread-10 says: working failure
Thread-14 says: working failure
Thread-12 says: working success
Thread-10 says: working failure
Thread-10 says: mapping '20' to int
Thread-10 says: mapping '20' to int
Thread-10 says: recovering
Thread-10 says: recovering
Thread-10 says: mapping '20' to int
Thread-10 says: recovering
Thread-11 says: working success
Thread-11 says: mapping '20' to int
Thread-13 says: working success
Thread-10 says: mapping '20' to int
Thread-12 says: working failure
Thread-12 says: recovering
Thread-14 says: working failure
Thread-14 says: recovering
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: final result is 50
Thread-14 says: SUCESS
As you can see, the main thread adds all the tasks and registers all the mapping functions (lines 1-20). It then registers the callback (line 21 of the output, which corresponds to line 39 of the listing), and finally outputs the text from line 63 in the listing, after which it dies, because it has nothing else to do. Line 22 and lines 24-42 of the output then show the various threads in the pool (which contained 5 threads) processing the work as well as mapping from String to Integer, or recovering from an exception. This is the code in parts 1 and 2 of the first listing. You can see that it is entirely asynchronous, with some mappings / recoveries occurring before all the initial work is complete (compare lines 38 and 40, which are a mapping and a recovery respectively, to line 41 of the output, which occurs afterwards and is the last of the initial work). Lines 43-52 are the output of the map/reduce which is part 3 of the main listing. Note that no reduce is logged, because the code I ran, and which is on GitHub, uses the Integer::sum shortcut mentioned above, rather than lines 50-51 of the first listing shown above.

While all of this is possible using Java 6 (or even 5?), for example by getting the tasks which are submitted to the pool to submit the callback themselves once they are finished, the amount of code needed to do that is larger and the code itself would be uglier than that shown here. Java 8 lambdas, Futures which can be mapped using callbacks and the Try API with its neat error handling all make the solution shown here arguably more maintainable.
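To give an idea of what the pre-Java 8 approach involves, here is a rough sketch in which each task counts down a shared counter when it finishes, and the last task to finish submits the callback to the pool. The class name Java6Callback and the method runAll are hypothetical, and the actual work is left as a placeholder comment.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class Java6Callback {

    // Submits numTasks tasks; whichever task finishes last submits the callback.
    public static void runAll(final ExecutorService pool, final int numTasks, final Runnable callback) {
        final AtomicInteger remaining = new AtomicInteger(numTasks);
        for (int i = 0; i < numTasks; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        // the real work would go here
                    } finally {
                        // only the last task to finish sees the counter reach zero
                        if (remaining.decrementAndGet() == 0) {
                            pool.execute(callback);
                        }
                    }
                }
            });
        }
    }

    public static void main(String[] args) {
        final ExecutorService pool = Executors.newFixedThreadPool(5);
        runAll(pool, 10, new Runnable() {
            public void run() {
                System.out.println("all tasks complete");
                pool.shutdown();
            }
        });
    }
}
```

Even this stripped-down version, which neither collects results nor handles failures, needs two anonymous inner classes and a shared counter.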

The code shown above, as well as the code for the classes in the ch.maxant.async package, is available under the Apache License Version 2.0, and can be downloaded from my GitHub account.

Copyright ©2013, Ant Kutschera

Play 2.0 framework and XA transactions

XA transactions are useful, but out of the box Play 2.0 does not support them today.  Here I show how to add that support:

First off, some examples of when XA is useful:

- JPA uses two physical connections if you use entities from two different persistence.xml files - those two connections might need to be committed in one transaction, so XA is your only option
- Committing a change in a database and at the same time committing a message to JMS.  E.g. you want to guarantee that an email is sent after you successfully commit an order to the database, asynchronously.  There are other ways, but JMS provides a transactional way to do this with little overhead in having to think about failure.
- Writing to a physically different database because of any of several political reasons (legacy system, different department responsible for different database server / different budgets).
- See http://docs.codehaus.org/display/BTM/FAQ#FAQ-WhywouldIneedatransactionmanager

So the way I see it, XA is something Play needs to "support".

Adding support is very easy.  I have created a Play plugin which is based on Bitronix.  Resources are configured in the Bitronix JNDI tree (why on earth does Play use a config file rather than JNDI?! anyway...)  You start the transaction using "withXaTransaction", like this:

    def someControllerMethod = Action {
                    withXaTransaction { ctx =>
                        TicketRepository.addValidation(user.get, bookingRef, ctx)
                        ValidationRepository.addValidation(bookingRef, user.get, ctx)
                    }
                    val tickets = TicketRepository.getByEventUid(eventUid)
                    Ok(views.html.ticketsInEvent(eventUid, getTickets(eventUid), user, eventValidationForm))
    }

The ctx object is an XAContext (my own class) which lets you look up resources like a datasource, or set rollback in case of a failure.  So the validation repo does this, using ScalaQuery (I used "withSession" rather than "withTransaction"!):

    def addValidation(bookingRef: String, validator: User, ctx: XAContext) = {
        val ds = ctx.lookupDS("jdbc/maxant/scalabook_admin")
        Database.forDataSource(ds) withSession { implicit db: Session =>
            Validations.insert(Validation(bookingRef, validator.email, new java.sql.Timestamp(now)))
        }
    }   

And the ticket repo does the following with JMS:

    def addValidation(user: User, bookingRef: String, ctx: XAContext) = {

        val xml =
            <ticketValidation>
                <bookingReference>{bookingRef}</bookingReference>
                <validatorId>{user.email}</validatorId>
            </ticketValidation>

        val qcf = ctx.lookupCF("jms/maxant/scalabook/ticketvalidations")
        val qc = qcf.createConnection("ticketValidation","password")
        val qs = qc.createSession(false, Session.AUTO_ACKNOWLEDGE)
        val q = qs.createQueue("ticketValidationQueue") //val q = ctx.lookup(QUEUE).asInstanceOf[Queue]
        val sender = qs.createProducer(q)
        val m = qs.createTextMessage(xml.toString)
        sender.send(m)
        sender.close
        qs.close
        qc.close
    }   

I've tested it by writing to MySQL and sending a JMS message to JBoss (HornetQ) and it seems to work well (except getting HornetQ to play with Bitronix was a bitch - see here: https://community.jboss.org/thread/206180?tstart=0).

The Scala code for the XA support is:

package ch.maxant.scalabook.play20.plugins.xasupport

import play.api.mvc.RequestHeader
import play.api.mvc.Results
import play.api.mvc.Request
import play.api.mvc.AnyContent
import play.api.mvc.Result
import play.api.mvc.Action
import play.api.mvc.Security
import play.api._
import play.api.mvc._
import play.api.data._
import play.api.data.Forms._
import ch.maxant.scalabook.persistence.UserRepository
import bitronix.tm.TransactionManagerServices
import java.util.Hashtable
import javax.naming.Context._
import javax.naming.InitialContext
import javax.sql.DataSource
import bitronix.tm.BitronixTransaction
import java.io.File
import org.scalaquery.session.Database
import org.scalaquery.SQueryException
import scala.collection.mutable.ListBuffer
import java.sql.Connection
import java.sql.SQLException
import org.scalaquery.session.Session
import bitronix.tm.BitronixTransactionManager
import javax.jms.ConnectionFactory

class XAContext {

    private val env = new Hashtable[String, String]()
    env.put(INITIAL_CONTEXT_FACTORY, "bitronix.tm.jndi.BitronixInitialContextFactory")
    private val namingCtx = new InitialContext(env);

    var rollbackOnly = false
   
    def lookup(name: String) = {
        namingCtx.lookup(name)
    }
    def lookupDS(name: String) = {
        lookup(name).asInstanceOf[DataSource]
    }
    def lookupCF(name: String) = {
        lookup(name).asInstanceOf[ConnectionFactory]
    }
}

trait XASupport { self: Controller =>

    private lazy val tm = play.api.Play.current.plugin[XASupportPlugin] match {
      case Some(plugin) => plugin.tm
      case None => throw new Exception("There is no XASupport plugin registered. Make sure it is enabled. See play documentation. (Hint: add it to play.plugins)")
    }

    /**
     * Use this flow control to make resources used inside `f` commit with the XA protocol.
     * Conditions: get resources like drivers or connection factories out of the context passed to f.
     * Connections are opened and closed as normal, for example by the withSession flow control offered
     * by ScalaQuery / SLICK.
     */
    def withXaTransaction[T](f: XAContext => T): T = {
        tm.begin

        //get a ref to the transaction, in case when we want to commit we are no longer on the same thread and TLS has lost the TX.
        //we have no idea what happens inside f!  they might spawn new threads or send work to akka asyncly
        val t = tm.getCurrentTransaction
        Logger("XASupport").info("Started XA transaction " + t.getGtrid())
        val ctx = new XAContext()
        var completed = false
        try{
            val result = f(ctx)
            completed = true
            if(!ctx.rollbackOnly){
                Logger("XASupport").info("committing " + t.getGtrid() + "...")
                t.commit
                Logger("XASupport").info("committed " + t.getGtrid())
            }
            result
        }finally{
            if(!completed || ctx.rollbackOnly){
                //in case of exception, or in case of set rollbackOnly = true
                Logger("XASupport").warn("rolling back (completed=" + completed + "/ctx.rollbackOnly=" + ctx.rollbackOnly)
                t.rollback
            }
        }
    }
}

class XASupportPlugin(app: play.Application) extends Plugin {

    protected[plugins] var tm: BitronixTransactionManager = null
   
    override def onStart {
        //TODO how about getting config out of jar!
        val file = new File(".", "app/bitronix-default-config.properties").getAbsolutePath
        Logger("XASupport").info("Using Bitronix config at " + file)
        val prop = System.getProperty("bitronix.tm.configuration", file) //default
        System.setProperty("bitronix.tm.configuration", prop) //override with default, if not set

        //start the TM
        tm = TransactionManagerServices.getTransactionManager
       
        Logger("XASupport").info("Started TM with resource config " + TransactionManagerServices.getConfiguration.getResourceConfigurationFilename)
    }

    override def onStop {
        //on graceful shutdown, we want to shutdown the TM too
        Logger("XASupport").info("Shutting down TM")
        tm.shutdown
        Logger("XASupport").info("TM shut down")
    }

}


Use the code as you like, I'm giving it away for free :-)  Just don't complain if it don't work ;-)

It would be nice to see this plugin extended and turned into something a little more production ready.  Even nicer would be for Play to support a transaction manager natively, including fetching resources out of JNDI.

Have fun!

(copyright Ant Kutschera 2012)
