Sunday, June 26, 2016

Java 8: CompletableFuture vs Parallel Stream

This post shows how Java 8's CompletableFuture compares with parallel streams when peforming asynchronous computations.

We will use the following class to model a long-running task:

class MyTask {
  private final int duration;
  public MyTask(int duration) {
    this.duration = duration;
  }
  public int calculate() {
    System.out.println(Thread.currentThread().getName());
    try {
      Thread.sleep(duration * 1000);
    } catch (final InterruptedException e) {
      throw new RuntimeException(e);
    }
    return duration;
  }
}

Let's create ten tasks, each with a duration of 1 second:

List<MyTask> tasks = IntStream.range(0, 10)
                                    .mapToObj(i -> new MyTask(1))
                                    .collect(toList());

How can we calculate the list of tasks efficiently?

Approach 1: Sequentially

Your first thought might be to calculate the tasks sequentially, as follows:

public static void runSequentially(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<Integer> result = tasks.stream()
                              .map(MyTask::calculate)
                              .collect(toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

As you might expect, this takes 10 seconds to run, because each task is run one after the other on the main thread.

Approach 2: Using a parallel stream

A quick improvement is to convert your code to use a parallel stream, as shown below:

public static void useParallelStream(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<Integer> result = tasks.parallelStream()
                              .map(MyTask::calculate)
                              .collect(toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

The output is

main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main
Processed 10 tasks in 3043 millis

This time it took 3 seconds because 4 tasks were run in parallel (using three threads from the ForkJoinPool, plus the main thread).

Approach 3: Using CompletableFutures

Let's see if CompletableFutures perform any better:

public static void useCompletableFuture(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<CompletableFuture<Integer>> futures =
      tasks.stream()
           .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
           .collect(Collectors.toList());

  List<Integer> result =
      futures.stream()
             .map(CompletableFuture::join)
             .collect(Collectors.toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

In the code above, we first obtain a list of CompletableFutures and then invoke the join method on each future to wait for them to complete one by one. Note that join is the same as get, with the only difference being that the former doesn't throw any checked exception, so it's more convenient in a lambda expression.

Also, you must use two separate stream pipelines, as opposed to putting the two map operations after each other, because intermediate stream operations are lazy and you would have ended up processing your tasks sequentially! That's why you first need to collect your CompletableFutures in a list to allow them to start before waiting for their completion.

The output is

ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 4010 millis

It took 4 seconds to process 10 tasks. You will notice that only 3 ForkJoinPool threads were used and that, unlike the parallel stream, the main thread was not used.

Approach 4: Using CompletableFutures with a custom Executor

One of the advantages of CompletableFutures over parallel streams is that they allow you to specify a different Executor to submit their tasks to. This means that you can choose a more suitable number of threads based on your application. Since my example is not very CPU-intensive, I can choose to increase the number of threads to be greater than Runtime.getRuntime().getAvailableProcessors(), as shown below:

public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
  long start = System.nanoTime();
  ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
  List<CompletableFuture<Integer>> futures =
      tasks.stream()
           .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
           .collect(Collectors.toList());

  List<Integer> result =
      futures.stream()
             .map(CompletableFuture::join)
             .collect(Collectors.toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
  executor.shutdown();
}

The output is

pool-1-thread-2
pool-1-thread-4
pool-1-thread-3
pool-1-thread-1
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1009 millis

After this improvement, it now takes only 1 second to process 10 tasks.

As you can see, CompletableFutures provide more control over the size of the thread pool and should be used if your tasks involve I/O. However, if you're doing CPU-intensive operations, there's no point in having more threads than processors, so go for a parallel stream, as it is easier to use.

Sunday, June 19, 2016

Java 8: Default Method Resolution Rules

With the introduction of default methods in Java 8, it is now possible for a class to inherit the same method from multiple places (such as another class or interface). The following rules can be used to determine which method is selected in such cases:

  1. A class or superclass method declaration always takes priority over a default method
  2. Otherwise, the method with the most specific default-providing interface is used
  3. Finally, if the methods are equally specific, there will be a compiler error and you will be forced to explicitly override the method and specify which one your class should call

Let's look at a few examples and apply these rules.

Example 1:

What does the following code print?

public interface A {
  default void name() {
    System.out.println("A");
  }
}

public interface B {
  default void name() {
    System.out.println("B");
  }
}

public class C implements A {
  @Override
  public void name() {
    System.out.println("C");
  }
}

public class D extends C implements A, B {
  public static void main(final String... args) {
    new D().name();
  }
}

Answer: C

This is because, as stated in Rule 1, the method declaration of name() from the superclass C takes priority over the default methods declarations in A and B.

Example 2:

What does the following code print?

public interface A {
  default void name() {
    System.out.println("A");
  }
}

public interface B extends A {
  @Override
  default void name() {
    System.out.println("B");
  }
}

public class C implements A {}

public class D extends C implements A, B {
  public static void main(final String... args) {
    new D().name();
  }
}

Answer: B

Unlike the previous example, C does not override name(), but since it implements A, it has a default method from A. According to Rule 2, if there are no methods in the class or superclass, the most specific default-providing interface is selected. Since B extends A, it is more specific and, as a result, "B" is printed.

Example 3:

What does the following code print?

public interface A {
  default void name() {
    System.out.println("A");
  }
}

public interface B {
  default void name() {
    System.out.println("B");
  }
}

public class D implements A, B {
  public static void main(final String... args) {
    new D().name();
  }
}

Answer: Compiler error! Duplicate default methods named name with the parameters () and () are inherited from the types B and A

In this example, there's no more-specific default-providing interface to select, so the compiler throws an error. To resolve the error, you need to explicitly override the method in D and specify which method declaration you want D to use. For example, if you want to use B's:

class D implements A, B {
  @Override
  public void name() {
    B.super.name();
  }
}

Example 4:

What does the following code print?

public interface A {
  default void name() {
    System.out.println("A");
  }
}

public interface B extends A {}

public interface C extends A {}

public class D implements B, C {
  public static void main(final String... args) {
    new D().name();
  }
}

Answer: A

The sub-interfaces B and C haven't overridden the method, so there is actually only the method from A to choose from. As a side note, if either B or C (but not both) had overridden the method, then Rule 2 would have applied. By the way, this is the diamond problem.

Saturday, June 18, 2016

Java 8: Debugging Stream Pipelines

I've found that stream pipelines can be difficult to debug because stack traces involving lambda expressions are quite cryptic. Consider the following contrived example:

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class Test {
  public static void main(final String[] args) {
    final List<String> list = Arrays.asList("foo", null, "bar");
    list.stream()
        .map(Function.identity())
        .filter(x -> true)
        .map(String::length)
        .forEach(System.out::println);
  }
}

You may have already guessed that the code above will throw a NullPointerException when String.length is called on the null element in the list. I've added extra map and filter operations, which do nothing, just to make the example a bit more interesting. In the real world, you will probably have a number of different operations in your stream pipeline.

Running the code, produces the following stack trace:

Exception in thread "main" java.lang.NullPointerException
  at Test$$Lambda$3/455659002.apply(Unknown Source)
  at java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
  at java.util.stream.ReferencePipeline$2$1.accept(Unknown Source)
  at java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
  at java.util.Spliterators$ArraySpliterator.forEachRemaining(Unknown Source)
  at java.util.stream.AbstractPipeline.copyInto(Unknown Source)
  at java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
  at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source)
  at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source)
  at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
  at java.util.stream.ReferencePipeline.forEach(Unknown Source)
  at Test.main(Test.java:12)

The stack trace shows that a NullPointerException occurred but it doesn't tell you which operation in the pipeline failed. What does Test$$Lambda$3/455659002.apply(Unknown Source) mean and why is there no line number?! Since lambda expressions don't have a name, the compiler makes one up (similar to anonymous classes). In this case, it is Test$$Lambda$3 but that doesn't help us track the bug in our code.

So, what can we do? Let's go old-school and add some logging to our code! We can use peek to print out each element before it is consumed by the next operation in the pipeline.

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class Test {
  public static void main(final String[] args) {
    final List<String> list = Arrays.asList("foo", null, "bar");
    list.stream()
        .peek(x -> System.out.println("Running identity on: " + x))
        .map(Function.identity())
        .peek(x -> System.out.println("Running filter on: " + x))
        .filter(x -> true)
        .peek(x -> System.out.println("Running string length on: " + x))
        .map(String::length)
        .peek(x -> System.out.println("Running print on: " + x))
        .forEach(System.out::println);
  }
}

Running it produces the following output:

Running identity map on: foo
Running filter on: foo
Running string length on: foo
Running print on: 3
3
Running identity map on: null
Running filter on: null
Running string length on: null
Exception in thread "main" java.lang.NullPointerException
  at Test$$Lambda$6/295530567.apply(Unknown Source)
  at java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
  at java.util.stream.ReferencePipeline$11$1.accept(Unknown Source)
  at java.util.stream.ReferencePipeline$2$1.accept(Unknown Source)
  at java.util.stream.ReferencePipeline$11$1.accept(Unknown Source)
  at java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
  at java.util.stream.ReferencePipeline$11$1.accept(Unknown Source)
  at java.util.Spliterators$ArraySpliterator.forEachRemaining(Unknown Source)
  at java.util.stream.AbstractPipeline.copyInto(Unknown Source)
  at java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
  at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source)
  at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source)
  at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
  at java.util.stream.ReferencePipeline.forEach(Unknown Source)
  at Test.main(Test.java:16)

Great! Now we know that the NullPointerException was thrown by the string length lambda!

In general, I think stack traces involving lambdas could be improved in future versions of Java.

Sunday, June 12, 2016

Java 8: Converting Anonymous Classes to Lambda Expressions

Refactoring anonymous classes (that implement a single method) to lambda expressions, makes your code more succint and readable. For example, here's an anonymous class for a Runnable and its lambda equivalent:

// using an anonymous class
Runnable r = new Runnable() {
  @Override
  public void run() {
    System.out.println("Hello");
  }
};

// using a lambda expression
Runnable r2 = () -> System.out.println("Hello");

However, it's not always that simple!

Here are a couple of gotchas:

1. Different scoping rules

There are different scoping rules between anonymous classes and lambda expressions. For example, in lambda expressions, this and super are lexically scoped, meaning they are relative to the enclosing class, but in an anonymous class, they are relative to the anonymous class itself. Similarly, local variables declared in lambda expressions will conflict with variables declared in the enclosing class, but in anonymous classes, they are allowed to shadow variables in the enclosing class. Here is an example:

int foo = 1;
Runnable r = new Runnable() {
  @Override
  public void run() {
    // this is ok!
    int foo = 2;
  }
};

Runnable r2 = () -> {
  // compile error: Lambda expression's local variable foo cannot
  // redeclare another local variable defined in an enclosing scope.
  int foo = 2;
};

2. Overloaded methods

If you have an overloaded method, using lambda expressions can result in an ambiguous method call and will require explicit casting. Here is an example:

// Functional interface
interface Task {
  public void execute();
}

// Overloaded methods
public static void go(final Runnable r) {
  r.run();
}
public static void go(final Task t) {
  t.execute();
}

// Calling the overloaded method:

// When using an anonymous class, there is no ambiguity because
// the type of the class is explicit at instantiation
go(new Task() {
  @Override
  public void execute() {
     System.out.println("Hello");
  }
});

// When using a lambda expression, there is a compile error!
// The method go(Runnable) is ambiguous
go(() -> {
  System.out.println("Hello");
});

// This ambiguity can be solved with an explicit cast
go((Task)() -> {
  System.out.println("Hello");
});

Saturday, May 07, 2016

kdb+/q - Running a function multiple times and concatenating results

Let's say that you have a q function that you want to run for multiple inputs (for example, a list of dates) and you want to concatenate the outputs of each function call into a single table. This is demonstrated below:

// A function that returns some data for a single date.
getData:{[dt]
    t:select from data where date=dt;
    // do some more stuff
    t}

// list of dates
dates:2016.01.01+til 10

// call the function for each date and join results
result:(uj/) getData each dates;

// or, if you want to do some processing after each function call:
result:(uj/) {[dt;param]
    t:getData[dt];
    // do some stuff with param
    t}[;`foo] each dates;