03 March 2023

Spring @ConfigurationProperties and unit tests

 This helps to initiate spring @ConfigurationProperties in a junit test with spring extension.


Spring version - 2.5.2

JDK - 11


@ExtendWith(SpringExtension.class)
@EnableConfigurationProperties(value = CustomConfig.class)
@ContextConfiguration(classes = {ApplicationService.class},
initializers = ConfigDataApplicationContextInitializer.class)
@ActiveProfiles(value = "test")

class ApplicationServiceTest {} 

ConfigDataApplicationContextInitializer.class is needed if you are using yaml format.


21 June 2014

Synchronization for Resource Initialization


There are certain situations where we need to do this type of thing in programming


  1. Initiate a resource - (normally one time)
  2. Keep using the resource
  3. In a fault when using the resource, reinitialize resource
  4. And keep using again


Few examples can be JMX proxy/RMI connection or TCP connection to a legacy system.


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import java.io.IOException;

public class Connection {

    public void connect() {
        //
    }

    public void sendMessage(String message) throws IOException {
        // do send message and throw IOException in case of connection error
    }

}


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.io.IOException;

public class Client {

    private Connection connection;

    public Client() {
        initConnection();
    }

    private void initConnection() {
        connection = new Connection();
        connection.connect();
    }

    public void process() {

        boolean success = false;
        while (!success) {
            try {
                connection.sendMessage("some message");
                success = true;
            } catch (IOException e) {
                initConnection();
            }
        }
    }
}

Connection class encapsulate resource management. Client.process() method use connection class to send an message and in case of an error, will reconnect by calling Connection.connect()

One issue can rise here when Client.process() method accessed concurrently which is not so rare in an event driven systems. When multiple threads call Client.process() method at a time where connection is unavailable, each thread will call connection.connect() method. This is not desirable since it can cause concurrent issues.

One solution is to synchronous access to initConnection() method.


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public class Client {

    private AtomicReference<Connection> connection = new AtomicReference<Connection>();
    private Semaphore mutex = new Semaphore(1);

...

    private void initConnection() throws InterruptedException {
        try {
            mutex.acquire();
            Connection c = new Connection();
            c.connect();
            connection.set(c);
        } finally {
            mutex.release();
        }
    }

 ...
}

Accessing connection object through AtomicReference make sure thread safety when multiple threads modify it.

Now concurrency issues have been dealt with. But practically there will be another problem in a condition of considerable number of threads accesses initConnection() method.

Consider two threads A and B. Thread A will acquire mutex and calling c.connect() method where thread B will wait at line 10. After thread A successfully initiate the connection and set to AtomicReference, thread B will acquire the mutex and do the same thing again. Causing unnecessarily reconnect. This might be noticeable specially as the concurrent access increases.


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Client {

    private final AtomicReference<Connection> connection = new AtomicReference<Connection>();
    private final Semaphore mutex = new Semaphore(1);
    private final Semaphore mutexStatus = new Semaphore(1);

    ...

    private void initConnection() throws InterruptedException {
        boolean needToReconnect = false;
        try {
            needToReconnect = mutexStatus.tryAcquire();
            mutex.acquire();
            if (!needToReconnect) {
                return;
            }
            Connection c = new Connection();
            c.connect();
            connection.set(c);
        } finally {
            mutex.release();
            if (needToReconnect) {
                mutexStatus.release();
            }
        }
    }

    ...

}


As a solution another Semaphore is included to check the status of the mutex - to check whether someone have already inside the mutex. tryAcquire() method can be used in this case. If another thread already acquired the mutex means a reconnection process is already underway so thread can return immediately after previous thread release the mutex (line 15).

With this if a thread is already initializing the resource other threads can wait and detect it. One problem is if connection initialization failed other threads does not have a way to identify that. But that also can be easily dealt with another boolean variable to check the status of the connections at line 14.


1
2
3
if (!needToReconnect && connection.get().isConnected()) {
     return;
}


This may not be the only way to do this - only the way came in to my mind. Please comment if you have any improvements over this. Thanks

03 June 2014

Thread usage of CompletableFuture callback functions


Java 8 has presented a new long waited feature to all the java coders around the world - CompletableFuture. With this, callback function and be given to execute at the completion of task. Also multiple Futures can be stacks. Blog written by "Tomasz Nurkiewicz" give a good overview about the usage.

Intention of this small blog is to illustrate an interesting observation I came across while playing with CompletableFuture.

CompletableFuture provide several methods to wire callback functions such as whenComplete(), whenCompleteAsync(), thenApply(), thenApplyAsync(). Where *Async methods will run in a separate thread and non-async will run in the same thread which will complete the stage. According to API doc

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

Following code sample will illustrate this.


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import java.util.concurrent.*;

public class FutureTest {



    public static void main(String[] args) throws ExecutionException, InterruptedException {
        new FutureTest().test();
    }

    private void test() throws ExecutionException, InterruptedException {

        ExecutorService executor = Executors.newFixedThreadPool(3);

        CompletableFuture<TestClass> f = CompletableFuture.supplyAsync(() -> executeTask("T1"), executor);

        System.out.println("Is task completed 1 [" + f.isDone() + "]");

        f.thenApplyAsync(t -> {
            System.out.println("Inside then apply async [" + Thread.currentThread().getName() + "]");
            return "";
        }, executor);

        System.out.println("Is task completed 2 [" + f.isDone() + "]");

        f.whenComplete((t, e) -> {
            System.out.println("Inside when complete [" + Thread.currentThread().getName() + "]");
        });

        System.out.println("Is task completed 3 [" + f.isDone() + "]");

        f.thenApply(t -> {
            System.out.println("Inside then apply [" + Thread.currentThread().getName() + "]");
            return "";
        });


        System.out.println("Main thread completed");
    }

    private TestClass executeTask(String name) {
        System.out.println("Inside task [" + name + "] thead[" + Thread.currentThread().getName() + "]");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task execution complete");
        return new TestClass(name);
    }

}


When executed result is as we expected


1
2
3
4
5
6
7
8
9
Is task completed 1 [false]
Inside task [T1] thread[pool-1-thread-1]
Is task completed 2 [false]
Is task completed 3 [false]
Main thread completed
Task execution complete
Inside then apply [pool-1-thread-1]
Inside when complete [pool-1-thread-1]
Inside then apply async [pool-1-thread-2]


Task use a thread from executor server. Methods thenApply() and whenComplete() both run on same thread as task - pool-1-thread-1. thenApplyAsync() uses another thread from pool - pool-1-thread-2.

Interesting question is what happens if task is already completed. I did this by simply removing Thread.sleep(1000) in task.


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package ds;

import java.util.concurrent.*;

public class FutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        new FutureTest().test();
    }

    private void test() throws ExecutionException, InterruptedException {

        ExecutorService executor = Executors.newFixedThreadPool(3);

        CompletableFuture<TestClass> f = CompletableFuture.supplyAsync(() -> executeTask("T1"), executor);

        System.out.println("Is task completed 1 [" + f.isDone() + "]");

        f.thenApplyAsync(t -> {
            System.out.println("Inside then apply async [" + Thread.currentThread().getName() + "]");
            return "";
        }, executor);

        System.out.println("Is task completed 2 [" + f.isDone() + "]");

        f.whenComplete((t, e) -> {
            System.out.println("Inside when complete [" + Thread.currentThread().getName() + "]");
        });

        System.out.println("Is task completed 3 [" + f.isDone() + "]");

        f.thenApply(t -> {
            System.out.println("Inside then apply [" + Thread.currentThread().getName() + "]");
            return "";
        });


        System.out.println("Main thread completed");
    }

    private TestClass executeTask(String name) {
        System.out.println("Inside task [" + name + "] thread[" + Thread.currentThread().getName() + "]");
        return new TestClass(name);
    }

}

And output is


1
2
3
4
5
6
7
8
Is task completed 1 [false]
Inside task [T1] thread[pool-1-thread-1]
Is task completed 2 [true]
Inside then apply async [pool-1-thread-2]
Inside when complete [main]
Is task completed 3 [true]
Inside then apply [main]
Main thread completed

Even thought we cannot guarantee when a thread will executed (which thread will execute first if there a two or more thread are in ready state) results are as expected when we run this simple example.

Task is already completed when line 17 is executed. Because of that methods whenComple() and thenApply() both executed on main thread. While thenApplyAsync() is executed on another thread from thread pool.

This is a small observation but we might need to these little information to understand what is going on in a heavily utilized live application. As I mentions earlier we cannot guarantee order of the thread execution. Unexpected execution orders can happen in a application where number of threads are large.

We might need to consider this specially if the amount of work we put on thenApply() and whenComplet() methods are considerably heavy as it can block execution of main thread if the task is already completed. Something which can give unexpected results.



19 July 2012

The Senate Bus Problem


Recently I did a assignment in my MSc in Concurrent programming. The problem was taken from Little Book of Semaphores and the name of the problem was given as "Senate Bus Problem".

The problem in interest is described in the book as follows.

This problem was originally based on the Senate bus at Wellesley College. Riderscome to a bus stop and wait for a bus. When the bus arrives, all the waitingriders invoke boardBus, but anyone who arrives while the bus is boarding hasto wait for the next bus. The capacity of the bus is 50 people; if there are morethan 50 people waiting, some will have to wait for the next bus.When all the waiting riders have boarded, the bus can invoke depart. If thebus arrives when there are no riders, it should depart immediately.Puzzle: Write synchronization code that enforces all of these constraints.



One solution given in book can be implemented in java


1:  public class BusImpl implements Bus {  
2:    private int waiting = 0;  
3:    private Semaphore mutex = new Semaphore(1);  
4:    private Semaphore bus = new Semaphore(0);  
5:    private Semaphore boarded = new Semaphore(0);  
6:    @Override  
7:    public void takeRide() throws InterruptedException {  
8:      mutex.acquire();  
9:      waiting++;  
10:      mutex.release();  
11:      bus.acquire();  
12:      board();  
13:      boarded.release();  
14:    }  
15:    public void board() {  
16:      System.out.println("Rider boarded");  
17:    }  
18:    @Override  
19:    public void arrive() throws InterruptedException {  
20:      mutex.acquire();  
21:      int n = Math.min(waiting, 50);  
22:      for(int i = 0; i < n; i++) {  
23:        bus.release();  
24:        boarded.acquire();  
25:      }  
26:      waiting = Math.max((waiting - 50), 0);  
27:      mutex.release();  
28:      depart();  
29:    }  
30:    public void depart() {  
31:      System.out.println("Bus departed.");  
32:    }  
33:  }  




Both solutions in book of coarse optimized for minimum usage of variables. When considering performance it is obvious that we can do better. One thing noticed was in the line bus.release() it is almost sequential.  For example if number of riders are 3 the minimum context switches needed will be 7 (starting with bus arrival).


I came up this solution :- 



1:  public class MultiDoorBusImpl implements Bus {  
2:    private int waitingRiderCount = 0;  
3:    private int noOfRidersAllowed = 0;  
4:    private int boardedRidersCount = 0;  
5:    private Semaphore mutex1 = new Semaphore(1);  
6:    private Semaphore mutex2 = new Semaphore(1);  
7:    private Semaphore bus = new Semaphore(0);  
8:    private Semaphore allBoarded = new Semaphore(0);  
9:    @Override  
10:    public void takeRide() throws InterruptedException {  
11:      mutex1.acquire();  
12:      waitingRiderCount++;  
13:      mutex1.release();  
14:      bus.acquire();  
15:      board();  
16:      mutex2.acquire();  
17:      boardedRidersCount++;  
18:      if (boardedRidersCount == noOfRidersAllowed) {  
19:        allBoarded.release();  
20:      }  
21:      mutex2.release();  
22:    }  
23:    @Override  
24:    public void board() {  
25:    }  
26:    @Override  
27:    public void arrive() throws InterruptedException {  
28:      mutex1.acquire();  
29:      noOfRidersAllowed = Math.min(50, waitingRiderCount);  
30:      if (noOfRidersAllowed > 0) {  
31:        bus.release(noOfRidersAllowed);  
32:        allBoarded.acquire();  
33:      }  
34:      mutex1.release();  
35:      depart();  
36:    }  
37:    @Override  
38:    public void depart() {  
39:    }  
40:  }  

Where in this case minimum number of context switches needed for 3 riders will be 5 (starting with bus arrival). Of cause it uses more semaphores/variables.



Have to say that in my nearly 5 years of  work I have faced little bit of concurrent issues, but this book is total in different level.






27 June 2012

Dynamic dependency injection with Lift framework


Recently I have been working with scala and Lift.
Among many other features lift provide a convenient way to fordependency injection. Although cake pattern is the way to do dependency injection for scala users, it lakes in certain areas. Specially when working with multiple modules where abstraction and concrete implementation are in separate modules.
Lift Injector comes in handy in this context.
For example think of you have two module project core-logic and repository. core-logic is the place you want to keep all the business logic without infrastructure level details. core-logic will have some abstract  interfaces for repositories. e.g:-
trait UserRepository {
def createUserAccount(ua: UserAccount): Either[ErrorContext, UserAccount]
def findUserAccountByUsername(username: String): Either[ErrorContext, UserAccount]
}
repository is the place you want to keep the infrastructure (e.g:- db layer) level details.
class UserRepositoryImpl extends UserRepository {
      override def createUserAccount(ua: UserAccount): Either[ErrorContext, UserAccount] = {
      …
     }
}
Using static injection method it is difficult to inject this kind of dependency injection
object UserManagementDepInjector extends SimpleInjector {
}
With lift SimpleInjector you can register injections dynamically. This can be done at boot (if you are using life web framework)
class Boot {
   def boot {
      UserManagementDepInjector.registerInjection(() =&gt; {new UserRepositoryImpl()})(Manifest.classType(classOf[UserRepository]))
      …
   }
}
As in above we can register injections and Injector itself will handle injecting the correct concrete implementation.
class UserService {
   val userRepository: UserRepository = UserManagementDepInjector.inject[UserRepository].openTheBox
}
Need to mention that using of Box.openTheBox method may not be the recommended way. But I think in most practical cases application is not useful if that dependency is not available, so why bother !!!