21 June 2014

Synchronization for Resource Initialization


There are certain situations in programming where we need to do something like this:


  1. Initialize a resource (normally once)
  2. Keep using the resource
  3. If a fault occurs while using the resource, reinitialize it
  4. Keep using it again


A few examples are a JMX proxy, an RMI connection, or a TCP connection to a legacy system.


import java.io.IOException;

public class Connection {

    public void connect() {
        // open the underlying connection (details omitted)
    }

    public void sendMessage(String message) throws IOException {
        // do send message and throw IOException in case of connection error
    }

}


import java.io.IOException;

public class Client {

    private Connection connection;

    public Client() {
        initConnection();
    }

    private void initConnection() {
        connection = new Connection();
        connection.connect();
    }

    public void process() {

        boolean success = false;
        while (!success) {
            try {
                connection.sendMessage("some message");
                success = true;
            } catch (IOException e) {
                initConnection();
            }
        }
    }
}

The Connection class encapsulates resource management. The Client.process() method uses the Connection class to send a message and, in case of an error, reconnects by calling Connection.connect().

One issue can arise here when Client.process() is accessed concurrently, which is not so rare in event-driven systems. When multiple threads call Client.process() at a time when the connection is unavailable, each thread will call connection.connect(). This is not desirable, since several threads then create and replace the shared connection at the same time, which can cause concurrency issues.

One solution is to synchronize access to the initConnection() method.


public class Client {

    private AtomicReference<Connection> connection = new AtomicReference<Connection>();
    private Semaphore mutex = new Semaphore(1);

...

    private void initConnection() throws InterruptedException {
        // Acquire outside try: a failed acquire must not reach release().
        mutex.acquire();
        try {
            Connection c = new Connection();
            c.connect();
            connection.set(c);
        } finally {
            mutex.release();
        }
    }

 ...
}

Accessing the connection object through an AtomicReference ensures thread safety when multiple threads modify it.

Now the concurrency issues have been dealt with. In practice, however, there is another problem when a considerable number of threads call initConnection().

Consider two threads, A and B. Thread A acquires the mutex and calls c.connect() while thread B waits at mutex.acquire(). After thread A has initialized the connection and stored it in the AtomicReference, thread B acquires the mutex and does the same thing again, causing an unnecessary reconnect. This becomes more noticeable as concurrent access increases.
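
The effect can be reproduced with a small sketch. The class name MutexOnlyReconnectDemo, the connectCalls counter and the run() helper are made up for illustration; the counter only stands in for the real Connection.connect() call, and every worker behaves like a thread that has just caught an IOException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class MutexOnlyReconnectDemo {

    private final Semaphore mutex = new Semaphore(1);
    // Hypothetical counter standing in for calls to Connection.connect().
    private final AtomicInteger connectCalls = new AtomicInteger();

    private void initConnection() throws InterruptedException {
        mutex.acquire();
        try {
            connectCalls.incrementAndGet(); // pretend to reconnect
        } finally {
            mutex.release();
        }
    }

    // Starts the given number of workers, each simulating a failed send,
    // and returns how many reconnects happened in total.
    static int run(int threads) {
        MutexOnlyReconnectDemo demo = new MutexOnlyReconnectDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    demo.initConnection();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return demo.connectCalls.get();
    }

    public static void main(String[] args) {
        // The mutex serializes the reconnects but does not reduce their number.
        System.out.println("reconnects = " + run(5)); // prints "reconnects = 5"
    }
}
```

Every worker reconnects once, so the count always equals the number of threads; the mutex only makes the reconnects happen one after another.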


public class Client {

    private final AtomicReference<Connection> connection = new AtomicReference<Connection>();
    private final Semaphore mutex = new Semaphore(1);
    private final Semaphore mutexStatus = new Semaphore(1);

    ...

    private void initConnection() throws InterruptedException {
        boolean needToReconnect = mutexStatus.tryAcquire();
        try {
            mutex.acquire(); // if this throws, only mutexStatus is released
            try {
                if (!needToReconnect) {
                    return;
                }
                Connection c = new Connection();
                c.connect();
                connection.set(c);
            } finally {
                mutex.release();
            }
        } finally {
            if (needToReconnect) {
                mutexStatus.release();
            }
        }
    }

    ...

}


As a solution, another Semaphore is included to check the status of the mutex, that is, whether some thread is already inside it. The tryAcquire() method can be used for this. If another thread already holds mutexStatus, a reconnection is already underway, so the thread can return as soon as the previous thread releases the mutex (the return statement inside the if block).

With this, if a thread is already initializing the resource, other threads can detect it and wait. One problem is that if the connection initialization fails, the other threads have no way to find out. But that can easily be handled by also checking the status of the connection in the if condition, assuming Connection exposes something like an isConnected() method.


if (!needToReconnect && connection.get().isConnected()) {
     return;
}
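
To check how the two-semaphore approach behaves under load, here is a minimal sketch. The class name GuardedReconnectDemo, the connectCalls counter, the 200 ms sleep and the run() helper are assumptions made for illustration; the counter and the sleep stand in for a slow Connection.connect(). The mutex is acquired outside the try block that releases it, so an interrupted acquire never triggers a release.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class GuardedReconnectDemo {

    private final Semaphore mutex = new Semaphore(1);
    private final Semaphore mutexStatus = new Semaphore(1);
    // Hypothetical counter standing in for calls to Connection.connect().
    private final AtomicInteger connectCalls = new AtomicInteger();

    private void initConnection() throws InterruptedException {
        // Only the thread that wins mutexStatus reconnects; the rest just wait.
        boolean needToReconnect = mutexStatus.tryAcquire();
        try {
            mutex.acquire();
            try {
                if (!needToReconnect) {
                    return; // another thread owns the reconnect
                }
                connectCalls.incrementAndGet();
                Thread.sleep(200); // pretend that connecting is slow
            } finally {
                mutex.release();
            }
        } finally {
            if (needToReconnect) {
                mutexStatus.release();
            }
        }
    }

    // Releases all workers at once and returns the number of reconnects.
    static int run(int threads) {
        GuardedReconnectDemo demo = new GuardedReconnectDemo();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();
                    demo.initConnection();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return demo.connectCalls.get();
    }

    public static void main(String[] args) {
        // Usually prints 1, but the exact number depends on thread timing.
        System.out.println("reconnects = " + run(8));
    }
}
```

A thread that arrives after the reconnecting thread has released both semaphores will reconnect again, so the count is not strictly guaranteed to be one; it is only much lower than with the single mutex.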


This may not be the only way to do this, only the way that came to my mind. Please comment if you have any improvements over this. Thanks.

03 June 2014

Thread usage of CompletableFuture callback functions


Java 8 introduced a long-awaited feature for Java coders around the world: CompletableFuture. With it, a callback function can be supplied to execute on completion of a task. Multiple futures can also be chained together. The blog written by Tomasz Nurkiewicz gives a good overview of its usage.

The intention of this short post is to illustrate an interesting observation I came across while playing with CompletableFuture.

CompletableFuture provides several methods to wire up callback functions, such as whenComplete(), whenCompleteAsync(), thenApply() and thenApplyAsync(). The *Async methods run the callback as a separate task (on the supplied executor, or on ForkJoinPool.commonPool() by default), while the non-async methods typically run it on the thread that completes the stage. According to the API doc:

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

The following code sample illustrates this.


import java.util.concurrent.*;

public class FutureTest {



    public static void main(String[] args) throws ExecutionException, InterruptedException {
        new FutureTest().test();
    }

    private void test() throws ExecutionException, InterruptedException {

        ExecutorService executor = Executors.newFixedThreadPool(3);

        CompletableFuture<TestClass> f = CompletableFuture.supplyAsync(() -> executeTask("T1"), executor);

        System.out.println("Is task completed 1 [" + f.isDone() + "]");

        f.thenApplyAsync(t -> {
            System.out.println("Inside then apply async [" + Thread.currentThread().getName() + "]");
            return "";
        }, executor);

        System.out.println("Is task completed 2 [" + f.isDone() + "]");

        f.whenComplete((t, e) -> {
            System.out.println("Inside when complete [" + Thread.currentThread().getName() + "]");
        });

        System.out.println("Is task completed 3 [" + f.isDone() + "]");

        f.thenApply(t -> {
            System.out.println("Inside then apply [" + Thread.currentThread().getName() + "]");
            return "";
        });


        System.out.println("Main thread completed");
    }

    private TestClass executeTask(String name) {
        System.out.println("Inside task [" + name + "] thread[" + Thread.currentThread().getName() + "]");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task execution complete");
        return new TestClass(name);
    }

}


When executed, the result is as we expected:


Is task completed 1 [false]
Inside task [T1] thread[pool-1-thread-1]
Is task completed 2 [false]
Is task completed 3 [false]
Main thread completed
Task execution complete
Inside then apply [pool-1-thread-1]
Inside when complete [pool-1-thread-1]
Inside then apply async [pool-1-thread-2]


The task uses a thread from the executor service. The thenApply() and whenComplete() callbacks both run on the same thread as the task, pool-1-thread-1. thenApplyAsync() uses another thread from the pool, pool-1-thread-2.
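
The same behaviour can be shown without relying on timing. In this minimal sketch the class name PendingCallbackDemo and the thread name "completer" are made up for illustration: the callback is registered while the future is still pending, and the future is then completed by hand from a named thread.

```java
import java.util.concurrent.CompletableFuture;

class PendingCallbackDemo {

    // Registers thenApply() while the future is still pending, then completes
    // the future from a thread named "completer". Returns the name of the
    // thread that ran the callback.
    static String callbackThread() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> ranOn =
                pending.thenApply(value -> Thread.currentThread().getName());
        new Thread(() -> pending.complete("done"), "completer").start();
        return ranOn.join();
    }

    public static void main(String[] args) {
        System.out.println("callback ran on " + callbackThread()); // callback ran on completer
    }
}
```

Because the callback was attached before completion, it runs on the thread that calls complete(), not on the thread that registered it.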

An interesting question is what happens if the task has already completed. I tested this by simply removing Thread.sleep(1000) from the task.


package ds;

import java.util.concurrent.*;

public class FutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        new FutureTest().test();
    }

    private void test() throws ExecutionException, InterruptedException {

        ExecutorService executor = Executors.newFixedThreadPool(3);

        CompletableFuture<TestClass> f = CompletableFuture.supplyAsync(() -> executeTask("T1"), executor);

        System.out.println("Is task completed 1 [" + f.isDone() + "]");

        f.thenApplyAsync(t -> {
            System.out.println("Inside then apply async [" + Thread.currentThread().getName() + "]");
            return "";
        }, executor);

        System.out.println("Is task completed 2 [" + f.isDone() + "]");

        f.whenComplete((t, e) -> {
            System.out.println("Inside when complete [" + Thread.currentThread().getName() + "]");
        });

        System.out.println("Is task completed 3 [" + f.isDone() + "]");

        f.thenApply(t -> {
            System.out.println("Inside then apply [" + Thread.currentThread().getName() + "]");
            return "";
        });


        System.out.println("Main thread completed");
    }

    private TestClass executeTask(String name) {
        System.out.println("Inside task [" + name + "] thread[" + Thread.currentThread().getName() + "]");
        return new TestClass(name);
    }

}

And the output is:


Is task completed 1 [false]
Inside task [T1] thread[pool-1-thread-1]
Is task completed 2 [true]
Inside then apply async [pool-1-thread-2]
Inside when complete [main]
Is task completed 3 [true]
Inside then apply [main]
Main thread completed

Even though we cannot guarantee when a thread will be executed (or which thread runs first when two or more threads are ready), the results are as expected when we run this simple example.

The task has already completed by the time whenComplete() and thenApply() are called (the second isDone() check prints true). Because of that, both callbacks are executed on the main thread, while thenApplyAsync() is executed on another thread from the thread pool.
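
A compact way to see this in isolation is to start from a future that is already complete. This is only a sketch; the class name CompletedCallbackDemo is made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class CompletedCallbackDemo {

    // Registers thenApply() on a future that is already complete and
    // returns the name of the thread that ran the callback.
    static String callbackThread() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("done");
        return done.thenApply(value -> Thread.currentThread().getName()).join();
    }

    public static void main(String[] args) {
        // When run from main, both lines show the same thread name.
        System.out.println("caller   = " + Thread.currentThread().getName());
        System.out.println("callback = " + callbackThread());
    }
}
```

With nothing left to wait for, the non-async callback runs immediately on whichever thread registers it.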

This is a small observation, but we might need these little pieces of information to understand what is going on in a heavily utilized live application. As I mentioned earlier, we cannot guarantee the order of thread execution. Unexpected execution orders can happen in an application where the number of threads is large.

We might need to consider this especially if the amount of work we put in the thenApply() and whenComplete() callbacks is considerably heavy, as it can block execution of the main thread if the task is already completed, which can give unexpected results.
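
One possible way to keep heavy callbacks off the calling thread is to use the *Async variant with an explicit executor. The sketch below assumes a hypothetical single-thread executor whose thread is named "callback-worker"; the class name AsyncCallbackDemo is also made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncCallbackDemo {

    // Runs the callback of an already completed future through thenApplyAsync()
    // and returns the name of the thread that executed it.
    static String callbackThread() {
        // Hypothetical single worker thread with a recognizable name.
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-worker"));
        try {
            CompletableFuture<String> done = CompletableFuture.completedFuture("done");
            // join() is used here only to read the result for printing.
            return done.thenApplyAsync(v -> Thread.currentThread().getName(), executor).join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("callback = " + callbackThread()); // callback = callback-worker
    }
}
```

Even though the future is already complete, the callback is handed to the executor, so the registering thread is free to continue as long as it does not wait for the result.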