Java 8 introduced a long-awaited feature for Java developers:
CompletableFuture. With it, a callback function can be registered to run when a task completes, and multiple futures can be chained together.
The blog post by Tomasz Nurkiewicz gives a good overview of its usage.
This short post illustrates an interesting observation I came across while playing with
CompletableFuture.
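As a quick illustration of the callback and chaining features mentioned above, here is a minimal sketch. The class name ChainingSketch and the method greet() are made up for this example and are not part of the original code.

```java
import java.util.concurrent.CompletableFuture;

public class ChainingSketch {

    // Hypothetical helper: transforms one result with a callback,
    // then combines it with the result of a second future.
    static String greet() {
        CompletableFuture<String> hello = CompletableFuture
                .supplyAsync(() -> "hello")        // task runs asynchronously
                .thenApply(String::toUpperCase);   // callback applied to its result

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> "world");

        // thenCombine waits for both futures and merges their results.
        return hello.thenCombine(world, (h, w) -> h + " " + w).join();
    }

    public static void main(String[] args) {
        System.out.println(greet()); // prints "HELLO world"
    }
}
```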
CompletableFuture provides several methods for wiring up callback functions, such as
whenComplete(),
whenCompleteAsync(),
thenApply() and
thenApplyAsync(). The *Async methods run the callback as a separate task on an executor (ForkJoinPool.commonPool() by default, or one you pass in), while the non-async methods normally run it on the thread that completes the stage. According to the API doc:
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
The following code sample illustrates this.
    import java.util.concurrent.*;

    public class FutureTest {

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            new FutureTest().test();
        }

        private void test() throws ExecutionException, InterruptedException {
            // Note: the pool is never shut down, so the JVM stays alive after the output is printed.
            ExecutorService executor = Executors.newFixedThreadPool(3);
            // Start the task on a pool thread.
            CompletableFuture<TestClass> f = CompletableFuture.supplyAsync(() -> executeTask("T1"), executor);
            System.out.println("Is task completed 1 [" + f.isDone() + "]");
            // Async callback: submitted to the executor as a separate task.
            f.thenApplyAsync(t -> {
                System.out.println("Inside then apply async [" + Thread.currentThread().getName() + "]");
                return "";
            }, executor);
            System.out.println("Is task completed 2 [" + f.isDone() + "]");
            // Non-async callbacks: no executor involved.
            f.whenComplete((t, e) -> {
                System.out.println("Inside when complete [" + Thread.currentThread().getName() + "]");
            });
            System.out.println("Is task completed 3 [" + f.isDone() + "]");
            f.thenApply(t -> {
                System.out.println("Inside then apply [" + Thread.currentThread().getName() + "]");
                return "";
            });
            System.out.println("Main thread completed");
        }

        private TestClass executeTask(String name) {
            System.out.println("Inside task [" + name + "] thread[" + Thread.currentThread().getName() + "]");
            try {
                // Keep the task busy so the callbacks are registered before it completes.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Task execution complete");
            return new TestClass(name);
        }

        // TestClass is not shown in the original listing; this minimal stand-in is an assumption.
        private static class TestClass {
            private final String name;

            TestClass(String name) {
                this.name = name;
            }
        }
    }
When executed, the result is as expected:
    Is task completed 1 [false]
    Inside task [T1] thread[pool-1-thread-1]
    Is task completed 2 [false]
    Is task completed 3 [false]
    Main thread completed
    Task execution complete
    Inside then apply [pool-1-thread-1]
    Inside when complete [pool-1-thread-1]
    Inside then apply async [pool-1-thread-2]
The task uses a thread from the executor service. The methods
thenApply() and
whenComplete() both run on the same thread as the task,
pool-1-thread-1, while
thenApplyAsync() uses another thread from the pool,
pool-1-thread-2.
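The same behaviour can be reproduced without relying on timing. The sketch below is only an illustration under stated assumptions: the class name CompletingThreadSketch and the thread name "completer-thread" are invented for this example. The callback is registered while the future is still incomplete, and the future is then completed by hand from a named thread, so the non-async callback runs on that thread.

```java
import java.util.concurrent.CompletableFuture;

public class CompletingThreadSketch {

    // Returns the name of the thread that ran the thenApply() callback.
    static String callbackThread() {
        CompletableFuture<String> source = new CompletableFuture<>();

        // Registered while the future is still incomplete.
        CompletableFuture<String> dependent =
                source.thenApply(v -> Thread.currentThread().getName());

        // Complete the future from a thread with a recognisable (hypothetical) name.
        new Thread(() -> source.complete("done"), "completer-thread").start();

        // join() blocks until the callback has produced its value.
        return dependent.join();
    }

    public static void main(String[] args) {
        System.out.println(callbackThread()); // prints "completer-thread"
    }
}
```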
The interesting question is what happens if the task has already completed. I tested this by simply removing
Thread.sleep(1000) from the task.
    package ds;

    import java.util.concurrent.*;

    public class FutureTest {

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            new FutureTest().test();
        }

        private void test() throws ExecutionException, InterruptedException {
            // Note: the pool is never shut down, so the JVM stays alive after the output is printed.
            ExecutorService executor = Executors.newFixedThreadPool(3);
            // The task now returns almost immediately.
            CompletableFuture<TestClass> f = CompletableFuture.supplyAsync(() -> executeTask("T1"), executor);
            System.out.println("Is task completed 1 [" + f.isDone() + "]");
            // Async callback: submitted to the executor as a separate task.
            f.thenApplyAsync(t -> {
                System.out.println("Inside then apply async [" + Thread.currentThread().getName() + "]");
                return "";
            }, executor);
            System.out.println("Is task completed 2 [" + f.isDone() + "]");
            // Non-async callbacks: no executor involved.
            f.whenComplete((t, e) -> {
                System.out.println("Inside when complete [" + Thread.currentThread().getName() + "]");
            });
            System.out.println("Is task completed 3 [" + f.isDone() + "]");
            f.thenApply(t -> {
                System.out.println("Inside then apply [" + Thread.currentThread().getName() + "]");
                return "";
            });
            System.out.println("Main thread completed");
        }

        private TestClass executeTask(String name) {
            // No sleep here, so the task finishes right away.
            System.out.println("Inside task [" + name + "] thread[" + Thread.currentThread().getName() + "]");
            return new TestClass(name);
        }

        // TestClass is not shown in the original listing; this minimal stand-in is an assumption.
        private static class TestClass {
            private final String name;

            TestClass(String name) {
                this.name = name;
            }
        }
    }
And the output is:
    Is task completed 1 [false]
    Inside task [T1] thread[pool-1-thread-1]
    Is task completed 2 [true]
    Inside then apply async [pool-1-thread-2]
    Inside when complete [main]
    Is task completed 3 [true]
    Inside then apply [main]
    Main thread completed
Even though we cannot guarantee when a thread will execute (or which thread runs first when two or more are ready), the results are as expected when we run this simple example.
The task has already completed by the time the second isDone() check runs (it prints true). Because of that,
whenComplete() and
thenApply() are both executed on the main thread, while thenApplyAsync() is executed on another thread from the thread pool.
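To see this effect in isolation, the following minimal sketch uses CompletableFuture.completedFuture() to get a future that is already complete, so the outcome does not depend on thread scheduling. The class name CallerThreadSketch and its helper methods are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.CompletableFuture;

public class CallerThreadSketch {

    // Returns the name of the thread that ran a non-async callback
    // registered on an already completed future.
    static String callbackThread() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("T1");
        return done.thenApply(v -> Thread.currentThread().getName()).join();
    }

    // True when the callback ran on the thread that registered it.
    static boolean ranOnCaller() {
        return callbackThread().equals(Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        System.out.println("Callback thread: " + callbackThread());
        System.out.println("Ran on caller: " + ranOnCaller());
    }
}
```

When run from main(), the callback thread reported is the main thread itself, which matches the output above.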
This is a small observation, but details like this help us understand what is going on in a heavily loaded live application. As I mentioned earlier, we cannot guarantee the order of thread execution, and unexpected execution orders can occur in an application with a large number of threads.
We need to keep this in mind especially when the work done in
thenApply() and
whenComplete() callbacks is heavy, because it can block the calling thread (here, the main thread) if the task has already completed, which can give unexpected results.
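One possible way to keep heavy callbacks off the calling thread is to register them with the *Async variant and an explicit executor. The sketch below is an illustration under stated assumptions rather than a recommendation from the API doc: the class name OffloadSketch, the pool and the thread name "callback-worker" are all invented for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadSketch {

    // Returns the name of the thread that ran the async callback,
    // even though the future was already complete when it was registered.
    static String callbackThread() {
        // Hypothetical dedicated pool with a recognisable thread name.
        ExecutorService callbackPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-worker"));
        try {
            CompletableFuture<String> done = CompletableFuture.completedFuture("T1");
            // thenApplyAsync submits the callback to callbackPool instead of
            // running it on the thread that calls thenApplyAsync.
            return done
                    .thenApplyAsync(v -> Thread.currentThread().getName(), callbackPool)
                    .join();
        } finally {
            callbackPool.shutdown(); // let the JVM exit once the work is done
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThread()); // prints "callback-worker"
    }
}
```

The trade-off is an extra hand-off to the pool for every callback, so this is mainly worth it when the callback does a noticeable amount of work.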