The effects of programming with Java 8 Streams on algorithm performance

Multi-paradigm programming with Java has been possible for many years, with its support for a mix of service-oriented, object-oriented and aspect-oriented programming. Java 8, with its lambdas and the java.util.stream.Stream interface, is good news because it lets us add the functional programming paradigm into the mix. Indeed, there has been a lot of hype around lambdas. But is changing our habits and the way we write our code a wise thing, without first getting to know the dangers that might lurk?

Java 8’s Stream API is neat because it lets you take a collection of data and chain multiple functional calls on that data together, making for tidy code. Map/reduce algorithms are a good example: you take a collection of data, select or transform elements out of a complex domain into something simpler (the “map” part), and then aggregate the results into a single useful value (the “reduce” part).
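As a minimal sketch of the map/reduce idea, independent of the weather example that follows, the snippet below maps words to their lengths and reduces the lengths to a total. The class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

class MapReduceSketch {

    /** Maps each word to its length, then reduces the lengths to their sum. */
    static int totalLength(List<String> words) {
        return words.stream()
                .map(String::length)       // "map": simplify each element to a number
                .reduce(0, Integer::sum);  // "reduce": fold the numbers into one value
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Zurich", "Bern", "Basel");
        System.out.println(totalLength(words)); // prints 15 (6 + 4 + 5)
    }
}
```

Passing an identity value (0) to reduce means the result is a plain Integer rather than an Optional, which is convenient when an empty input should simply give zero.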

Take for example the following data classes (written in Groovy so that I get code generation of constructors, accessors, hash/equals and toString methods for free!):

//Groovy
import groovy.transform.Immutable

@Immutable
class City {
    String name
    List<Temperature> temperatures
}
@Immutable
class Temperature {
    Date date
    BigDecimal reading
}

I can use those classes to construct some random weather data in a list of City objects, e.g.:

private static final long ONE_DAY_MS = 1000*60*60*24;
private static final Random RANDOM = new Random();

public static List<City> prepareData(
                      int numCities, int numTemps) {
    List<City> cities = new ArrayList<>();
    IntStream.range(0, numCities).forEach( i ->
        cities.add(
            new City(
                generateName(), 
                generateTemperatures(numTemps)
            )
        )
    );
    return cities;
}

private static List<Temperature> generateTemperatures(
                                         int numTemps) {
    List<Temperature> temps = new ArrayList<>();
    for(int i = 0; i < numTemps; i++){
        long when = System.currentTimeMillis();
        when += ONE_DAY_MS*RANDOM.nextInt(365);
        Date d = new Date(when);
        Temperature t = new Temperature(
                             d, 
                             new BigDecimal(
                                RANDOM.nextDouble()
                             )
                         );
        temps.add(t);
    }
    return temps;
}

private static String generateName() {
    char[] chars = new char[RANDOM.nextInt(5)+5];
    for(int i = 0; i < chars.length; i++){
        chars[i] = (char)(RANDOM.nextInt(26) + 65);
    }
    return new String(chars);
}

Line 7 uses the IntStream class, also from Java 8, to construct a range over which lines 8-13 iterate, adding new cities to the list constructed on line 6. Lines 22-30 generate random temperatures on random days.

If I wanted to then calculate the average temperature recorded in August, across all cities, I could write the following functional algorithm:

Instant start = Instant.now();
Double averageTemperature = cities.stream().flatMap(c ->
    c.getTemperatures().stream()
).filter(t -> {
    LocalDate ld = LocalDateTime.ofEpochSecond(
                       t.getDate().getTime() / 1000, // getTime() is in ms
                       0, 
                       ZoneOffset.UTC
                    ).toLocalDate();
    return ld.getMonth() == Month.AUGUST;
}).map(t ->
    t.getReading()
).collect(
    Collectors.averagingDouble(
        TestFilterMapReducePerformance::toDouble
    )
);

Instant end = Instant.now();
System.out.println(
    "functional calculated in " + 
    Duration.between(start, end) + 
    ": " + averageTemperature);

Line 1 is used to start the clock. The code then creates a stream from the list of cities, on line 2. I then flatten the data by creating a single long list of all temperatures using the flatMap method (also line 2), passing it a lambda on line 3 which returns each list of temperatures as a stream which the flatMap method can append together. Once that is done, I use the filter method on line 4 to throw away any data that is not from August. I then call the map method on line 11 to convert each Temperature object into a BigDecimal and with the resulting stream I use the collect method on line 13 together with a collector which calculates the average. Line 15 needs a helper function to convert instances of BigDecimal into doubles, since line 14 works with doubles rather than BigDecimals:

/** method to convert to double */
public static Double toDouble(BigDecimal a) {
    return a.doubleValue();
}

The number crunching part of the listing above can alternatively be written in an imperative style, as follows:

BigDecimal total = BigDecimal.ZERO;
int count = 0;
for(City c : cities){
    for(Temperature t : c.getTemperatures()){
        LocalDate ld = LocalDateTime.ofEpochSecond(
                          t.getDate().getTime() / 1000, // getTime() is in ms
                          0, 
                          ZoneOffset.UTC).toLocalDate();
        if(ld.getMonth() == Month.AUGUST){
            total = total.add(t.getReading());
            count++;
        }
    }
}
double averageTemperature = total.doubleValue() / count;
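Both styles need to turn a java.util.Date into a LocalDate before they can check the month. Date.getTime() returns milliseconds since the epoch, so one way to write that step is to go through Instant.ofEpochMilli. The following is a sketch of that conversion as a standalone helper; the class and method names are hypothetical and not part of the measured code.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.Month;
import java.time.ZoneOffset;
import java.util.Date;

class AugustFilterSketch {

    /** Returns true if the given date falls in August when interpreted in UTC. */
    static boolean isInAugust(Date date) {
        LocalDate ld = Instant.ofEpochMilli(date.getTime()) // milliseconds, not seconds
                .atZone(ZoneOffset.UTC)
                .toLocalDate();
        return ld.getMonth() == Month.AUGUST;
    }

    public static void main(String[] args) {
        Date midAugust = Date.from(
            LocalDate.of(2014, Month.AUGUST, 15).atStartOfDay(ZoneOffset.UTC).toInstant());
        Date midJanuary = Date.from(
            LocalDate.of(2014, Month.JANUARY, 15).atStartOfDay(ZoneOffset.UTC).toInstant());
        System.out.println(isInAugust(midAugust));  // prints true
        System.out.println(isInAugust(midJanuary)); // prints false
    }
}
```

A helper like this could be used as a method reference in a filter call or inside the if statement of the loop.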

In the imperative version of the algorithm I do the mapping, filtering and reducing in a different order, but the result is the same. Which style, functional or imperative, do you think is faster, and by how much?

In order to make a more accurate reading of the performance data, I need to run the algorithms many times so that the HotSpot compiler has time to warm up. Running the algorithms multiple times in pseudo-random order, I measured that the code written in the functional style took an average of around 0.93 seconds (using a thousand cities, each with a thousand temperatures; calculated on a laptop with a 64-bit Intel i5 2.40GHz processor with 4 cores). The code written in the imperative style took 0.70 seconds, which is about 25% quicker.
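A measurement loop of this kind might be structured roughly as follows. This is only a sketch under my own assumptions, not the harness used for the numbers in this post: the class name, the method name and the fixed shuffle seed are all invented for illustration, and a dedicated tool such as JMH would give more trustworthy figures.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

class TimingSketch {

    /**
     * Runs every candidate once per round, in shuffled order, and returns the
     * average time per candidate in milliseconds. Warm-up rounds are executed
     * but not counted, so the JIT compiler has a chance to optimise first.
     */
    static Map<String, Double> averageMillis(Map<String, Runnable> candidates,
                                             int warmupRounds, int measuredRounds) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (String name : candidates.keySet()) {
            totals.put(name, 0L);
        }
        List<String> order = new ArrayList<>(candidates.keySet());
        Random random = new Random(42); // assumption: fixed seed for a repeatable order
        for (int round = 0; round < warmupRounds + measuredRounds; round++) {
            Collections.shuffle(order, random);
            for (String name : order) {
                long start = System.nanoTime();
                candidates.get(name).run();
                long elapsed = System.nanoTime() - start;
                if (round >= warmupRounds) {
                    totals.merge(name, elapsed, Long::sum);
                }
            }
        }
        Map<String, Double> averages = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : totals.entrySet()) {
            averages.put(e.getKey(), e.getValue() / 1_000_000.0 / measuredRounds);
        }
        return averages;
    }

    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) {
            numbers.add(i);
        }
        long[] sink = new long[1]; // results are accumulated so the work is not dead code
        Map<String, Runnable> candidates = new LinkedHashMap<>();
        candidates.put("functional",
            () -> sink[0] += numbers.stream().mapToLong(n -> n).sum());
        candidates.put("imperative", () -> {
            long sum = 0;
            for (int n : numbers) {
                sum += n;
            }
            sink[0] += sum;
        });
        averageMillis(candidates, 20, 50)
            .forEach((name, ms) -> System.out.println(name + ": " + ms + " ms"));
    }
}
```

Shuffling the order in each round is meant to stop one candidate from always benefiting from caches or JIT state left behind by the other.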

So I asked myself if imperative code is always quicker than functional code. Let’s try simply counting the number of temperatures recorded in August. Functional code could look like this:

long count = cities.stream().flatMap(c ->
    c.getTemperatures().stream()
).filter(t -> {
    LocalDate ld = LocalDateTime.ofEpochSecond(
                       t.getDate().getTime() / 1000, // getTime() is in ms
                       0, 
                       ZoneOffset.UTC).toLocalDate();
    return ld.getMonth() == Month.AUGUST;
}).count();

The functional code involves filtering and then calling the count method. Alternatively, the equivalent imperative code could look like this:

long count = 0;
for(City c : cities){
    for(Temperature t : c.getTemperatures()){
        LocalDate ld = LocalDateTime.ofEpochSecond(
                       t.getDate().getTime() / 1000, // getTime() is in ms
                       0, 
                       ZoneOffset.UTC).toLocalDate();
        if(ld.getMonth() == Month.AUGUST){
            count++;
        }
    }
}

In this example, running with a different data set than the one used to calculate average August temperatures, the imperative code averaged 1.80 seconds while the functional code averaged just a little less. So we can’t deduce that functional code is quicker or slower than imperative code. It really depends on the use case.

What is interesting is that we can make the calculations run in parallel by using the parallelStream() method instead of the stream() method. In the case of calculating the average temperature, using a parallel stream means that the average is calculated in 0.46 seconds rather than 0.93 seconds. Counting the temperatures in parallel took 0.90 seconds rather than 1.80 seconds serially. Try writing imperative code which splits up the data, spreads calculations across cores and assembles the results into a single average temperature: it would take a lot of work! Precisely this is one of the main reasons for wanting to add functional programming to Java 8.

How does it work? Spliterators and CountedCompleter tasks are used to distribute the work in the common ForkJoinPool, which by default is sized according to the number of available cores (its default parallelism is one less than the number of cores, because the thread that invokes the stream operation also takes part in the work). Theory dictates that using only as many threads as there are cores means that no time is wasted with context switches, but it depends on whether the work being done contains any blocking I/O. That’s something I discuss in my book on Scala.
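To see how the common pool is sized on a particular machine, and to check that a serial and a parallel stream agree on the result, something like the following sketch can be used. The class and method names are hypothetical; the calls on Runtime and ForkJoinPool are standard Java 8 API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

class ParallelAverageSketch {

    /** Averages the numbers with either a serial or a parallel stream. */
    static double average(List<Integer> numbers, boolean parallel) {
        return (parallel ? numbers.parallelStream() : numbers.stream())
                .mapToInt(Integer::intValue)
                .average()
                .orElse(Double.NaN); // an empty list has no average
    }

    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 1; i <= 1_000_000; i++) {
            numbers.add(i);
        }
        System.out.println("available cores: "
            + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: "
            + ForkJoinPool.commonPool().getParallelism());
        System.out.println("serial:   " + average(numbers, false));
        System.out.println("parallel: " + average(numbers, true));
    }
}
```

The only difference between the two code paths is the choice of stream() or parallelStream(); splitting the list and combining the partial results is left entirely to the library.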

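Spawning threads is an interesting topic when working with Java EE application servers, as strictly speaking you are not allowed to spawn threads. Creating a parallel stream doesn’t require your code to spawn any threads itself, since the work runs on the JVM’s common ForkJoinPool, so on that count there is no need to worry 🙂 Bear in mind, though, that the pool’s worker threads are not managed by the container, so container-provided context such as transactions or security information may not be available inside the lambdas.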

You can use a map/reduce algorithm to calculate the total number of temperatures across all cities too:

int count = cities.stream().map(c ->
    c.getTemperatures().size()
).reduce(
    Integer::sum
).get();

Line 1 creates the stream from the list, and maps (converts) the cities into the number of temperatures for the city using the lambda on line 2. Line 3 reduces the stream of “number of temperatures” into a single value by using the sum method of the Integer class on line 4. Since streams might contain no elements, the reduce method returns an Optional, and we call the get method to get the total count. We can do that safely because we know that the cities contain data. Should you be working with data which might be empty, you could call the orElse(T) method which lets you specify a default value to use if no result is available.
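For data that might be empty, the orElse variant could look like the sketch below. It uses plain lists of strings instead of the City class so that it is self-contained; the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ReduceOrElseSketch {

    /** Sums the sizes of the given lists; returns 0 when there are no lists at all. */
    static int totalSize(List<List<String>> lists) {
        return lists.stream()
                .map(l -> l.size())
                .reduce(Integer::sum) // Optional<Integer>, empty if the stream was empty
                .orElse(0);           // default value instead of get() throwing
    }

    public static void main(String[] args) {
        List<List<String>> some = Arrays.asList(
            Arrays.asList("a", "b"),
            Arrays.asList("c"));
        List<List<String>> none = Collections.emptyList();
        System.out.println(totalSize(some)); // prints 3
        System.out.println(totalSize(none)); // prints 0
    }
}
```

Calling get() on the empty case would throw a NoSuchElementException, which is why orElse is the safer choice when the input isn't known to contain data.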

In terms of writing functional code, there is another way to write this algorithm:

long count = cities.stream().map(c ->
    c.getTemperatures().stream().count()
).reduce(
    Long::sum
).get();

Using the above method, the lambda on line 2 counts the size of the list of temperatures by converting it into a stream and calling the count method. In terms of performance, this is a bad way to get the size of a list. With a thousand cities and a thousand temperatures each, the total count was calculated in 160ms using the first algorithm. The second algorithm increases that time to 280ms! The reason is that an ArrayList knows its size since it tracks it as elements are added or removed. A Java 8 stream on the other hand calculates the size by first mapping each element to the value 1L and then reducing the stream of 1Ls using the Long::sum method. On long lists of data that is a sizeable overhead when compared to simply looking up the size from an attribute in the list.
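To make the difference concrete, the sketch below spells out what counting through a stream amounts to in Java 8, next to the direct size lookup. All three methods return the same number; only the amount of work differs. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

class CountVersusSizeSketch {

    /** Counts by asking the stream, which visits the elements in Java 8. */
    static long countViaStream(List<?> list) {
        return list.stream().count();
    }

    /** Roughly what the Java 8 count() does internally: map to 1L, then sum. */
    static long countSpelledOut(List<?> list) {
        return list.stream().mapToLong(e -> 1L).sum();
    }

    /** Reads the size the list already tracks; no traversal needed. */
    static long countViaSize(List<?> list) {
        return list.size();
    }

    public static void main(String[] args) {
        List<String> readings = Arrays.asList("a", "b", "c");
        System.out.println(countViaStream(readings));  // prints 3
        System.out.println(countSpelledOut(readings)); // prints 3
        System.out.println(countViaSize(readings));    // prints 3
    }
}
```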

Comparing the time required by the functional code to the time required by the following imperative code shows that the functional code is twice as slow – the imperative code calculates the total number of temperatures in an average of just 80ms.

long count = 0;
for(City c : cities){
    count += c.getTemperatures().size();
}

Using a parallel stream instead of a sequential stream, again by simply calling the parallelStream() method instead of the stream() method in the first counting listing (the one that maps each city to c.getTemperatures().size() and reduces with Integer::sum), results in the algorithm requiring an average of 90ms, i.e. slightly more than the imperative code.

A third way to count temperatures is to use Collectors. Here, I used a million cities, each with just two temperatures. The algorithm is:

int count = cities.stream().collect(
    Collectors.summingInt(c -> 
        c.getTemperatures().size()
    )
);

The equivalent imperative code is:

long count = 0;
for(City c : cities){
    count += c.getTemperatures().size();
}

On average, the functional listing took 100ms, which was the same time taken by the imperative listing. Using a parallel stream on the other hand reduced the calculation time by half, to just 50ms.
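The parallel variant only differs in how the stream is obtained. A self-contained sketch, using nested lists of integers in place of cities and temperatures (the names are invented for illustration), might look like this:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class SummingIntSketch {

    /** Sums the sizes of the inner lists with a collector, serially or in parallel. */
    static int totalSize(List<List<Integer>> lists, boolean parallel) {
        return (parallel ? lists.parallelStream() : lists.stream())
                .collect(Collectors.summingInt(l -> l.size()));
    }

    public static void main(String[] args) {
        List<List<Integer>> lists = Arrays.asList(
            Arrays.asList(1, 2),
            Arrays.asList(3, 4),
            Arrays.asList(5, 6));
        System.out.println(totalSize(lists, false)); // prints 6
        System.out.println(totalSize(lists, true));  // prints 6
    }
}
```

Because summingInt keeps no shared mutable state outside the collector, the same collector can be used unchanged with a parallel stream.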

The next question I asked myself was whether it is possible to determine how much data needs to be processed before using a parallel stream becomes worthwhile. Splitting data up, submitting it to an ExecutorService like the ForkJoinPool and collecting the results together after the calculation isn’t free: it costs in terms of performance. It certainly is possible to work out when it pays off to process data in parallel, and the answer is, typically, that it depends on the use case.

In this experiment I calculate the average of a list of numbers. I repeat the work over and over (NUM_RUNS times) simply to get measurable values, since calculating the average of three numbers is too quick to measure reliably. I vary the size of the list from 3 numbers to three million, to determine how big the list needs to get before it pays off using a parallel stream to calculate the average.

The algorithm used was:

double avg = -1.0;
for(int i = 0; i < NUM_RUNS; i++){
    avg = numbers.stream().collect(
        Collectors.averagingInt(n->n)
    );
}

Just for fun, here is another way to do the calculation:

double avg = -1.0;
for(int i = 0; i < NUM_RUNS; i++){
    avg = numbers.stream().
            mapToInt(n->n).
            average().
            getAsDouble();
}

The results were as follows. With just three numbers in the list I ran the calculation 100,000 times. Running the test many times over showed that, on average, the serial calculation took 20ms compared to 370ms for the parallel calculation. So with a small sample of data, in this case, it isn’t worth using a parallel stream.

On the other hand, with three million numbers in the list the serial calculation took 1.58 seconds compared to only 0.93 seconds for the parallel calculation. So with a large sample of data, in this case, it is worth using a parallel stream. Note that the number of runs was reduced as the data set size was increased, so that I didn’t have to wait as long for the results (I don’t drink coffee!).

# numbers in list   Avg. time SERIAL   Avg. time PARALLEL   NUM_RUNS
3                   0.02s              0.37s                100,000
30                  0.02s              0.46s                100,000
300                 0.07s              0.53s                100,000
3,000               1.98s              2.76s                100,000
30,000              0.67s              1.90s                10,000
300,000             1.71s              1.98s                1,000
3,000,000           1.58s              0.93s                100

Does that mean that parallel streams are only useful for large data sets? No! It entirely depends on the intensity of the calculation at hand. The following futile algorithm simply heats the CPU, but demonstrates a complex calculation.

private void doIntensiveWork() {
    double a = Math.PI;
    for(int i = 0; i < 100; i++){
        for(int j = 0; j < 1000; j++){
            for(int k = 0; k < 100; k++){
                a = Math.sqrt(a+1);
                a *= a;
            }
        }
    }
    System.out.println(a);
}

We can generate a list of two runnables which do this intensive work using the following listing:

private List<Runnable> generateRunnables() {
    Runnable r = () -> {
        doIntensiveWork();
    };
    return Arrays.asList(r, r);
}

Finally, we can measure the time it takes to run the two runnables, for example in parallel (see the call to the parallelStream() method on line 3):

List<Runnable> runnables = generateRunnables();
Instant start = Instant.now();
runnables.parallelStream().forEach(r -> r.run());
Instant end = Instant.now();
System.out.println(
    "functional parallel calculated in " + 
    Duration.between(start, end));

Using a parallel stream it took an average of 260ms to do the intensive work twice. Using a serial stream, it took an average of 460ms, i.e. nearly double the time.
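The serial measurement only differs in calling stream() instead of parallelStream(). The following sketch puts both variants behind one hypothetical helper. The busyWork method is a stand-in of my own for doIntensiveWork, not the original, and its result is collected in a DoubleAdder so that the work cannot be optimised away.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.DoubleAdder;

class RunnablesTimingSketch {

    /** Runs every task once, serially or in parallel, and returns the elapsed milliseconds. */
    static long runAll(List<Runnable> tasks, boolean parallel) {
        long start = System.nanoTime();
        if (parallel) {
            tasks.parallelStream().forEach(Runnable::run);
        } else {
            tasks.stream().forEach(Runnable::run);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    /** CPU-bound stand-in for the intensive work; returns its result to the caller. */
    static double busyWork() {
        double a = Math.PI;
        for (int i = 0; i < 10_000_000; i++) {
            a = Math.sqrt(a + 1);
            a *= a;
        }
        return a;
    }

    public static void main(String[] args) {
        DoubleAdder sink = new DoubleAdder(); // thread-safe accumulator for the results
        Runnable r = () -> sink.add(busyWork());
        List<Runnable> tasks = Arrays.asList(r, r);
        System.out.println("serial:   " + runAll(tasks, false) + " ms");
        System.out.println("parallel: " + runAll(tasks, true) + " ms");
        System.out.println("checksum: " + sink.sum());
    }
}
```

As with the other measurements, a single run like this is only indicative; repeating it after a warm-up gives more stable numbers.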

What can we conclude from all these experiments? Well, it isn’t possible to say conclusively that functional code is slower than imperative code, and it isn’t possible either to say that using parallel streams is always faster than using serial streams. What we can conclude is that programmers need to experiment with different solutions and measure the effects of the coding style on performance when they write performance-critical code. But let’s be honest, that isn’t anything new! For me, what you should take away from this post is that there are always many ways to write algorithms, and choosing the right way is important. Knowing which way is right comes partly from experience but, more importantly, from playing around with the code and trying different solutions. Finally though, as always, don’t optimise prematurely 🙂

Copyright ©2014, Ant Kutschera