Asynchronous Programming: Advanced Concepts

Parallel Streams

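Java 8 introduced the Stream API, which allows functional-style operations on streams of elements. Parallel streams take advantage of multi-core processors by splitting the source into substreams, processing them concurrently (by default on the common Fork/Join pool), and combining the partial results. The gain is usually only noticeable for large, CPU-bound workloads; for a ten-element list like the one below, the sequential version is typically just as fast.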

Java
import java.util.Arrays;
import java.util.List;

public class ParallelStreamExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Sequential stream
        int sum = numbers.stream()
                         .filter(n -> n % 2 == 0)
                         .mapToInt(Integer::intValue)
                         .sum();
        System.out.println("Sum (sequential): " + sum); // Output: 30

        // Parallel stream
        int parallelSum = numbers.parallelStream()
                                 .filter(n -> n % 2 == 0)
                                 .mapToInt(Integer::intValue)
                                 .sum();
        System.out.println("Sum (parallel): " + parallelSum); // Output: 30
    }
}
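
The splitting and combining described above only gives correct results when the operations are stateless and the reduction is associative. The following is a minimal sketch of that idea, not a benchmark; the class and method names (ParallelStreamSketch, sumOfSquares, doubled) are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelStreamSketch {
    // sum() is an associative reduction, so partial sums from each
    // substream can be combined in any grouping without changing the result.
    static long sumOfSquares(int n) {
        return IntStream.rangeClosed(1, n)
                        .parallel()
                        .mapToLong(i -> (long) i * i)
                        .sum();
    }

    // collect(toList()) preserves encounter order for an ordered source,
    // even though the elements are mapped on several threads.
    static List<Integer> doubled(List<Integer> input) {
        return input.parallelStream()
                    .map(n -> n * 2)
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10));                    // 385
        System.out.println(doubled(Arrays.asList(1, 2, 3, 4)));  // [2, 4, 6, 8]
    }
}
```

Note that forEach on a parallel stream makes no ordering promise; forEachOrdered or a collector is the safer choice when order matters.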

Reactive Programming

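Reactive programming is a paradigm oriented around data flows and the propagation of change. The java.util.concurrent.Flow API, introduced in Java 9, defines the core interfaces for it (Publisher, Subscriber, Subscription, and Processor), and SubmissionPublisher is a ready-made Publisher implementation. A subscriber controls the pace by calling request(n) on its subscription, which is how backpressure is expressed.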

Java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class ReactiveExample {
    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        MySubscriber subscriber = new MySubscriber();
        publisher.subscribe(subscriber);

        System.out.println("Publishing items...");
        publisher.submit("Item 1");
        publisher.submit("Item 2");
        publisher.submit("Item 3");

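        publisher.close(); // Signals onComplete once pending items are delivered
        TimeUnit.SECONDS.sleep(1); // Delivery is asynchronous; give the subscriber time to finish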
    }
}

class MySubscriber implements Flow.Subscriber<String> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // Request one item
    }

    @Override
    public void onNext(String item) {
        System.out.println("Received: " + item);
        subscription.request(1); // Request next item
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("All items received");
    }
}
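
Sleeping for a fixed time is a crude way to wait for asynchronous delivery. One possible alternative, sketched here under the assumption that the caller simply wants to block until onComplete arrives, is to have the subscriber count down a CountDownLatch. The names ReactiveLatchSketch, CollectingSubscriber, and publishAndCollect are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class ReactiveLatchSketch {
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // Ask for the first item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // Ask for the next item
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown(); // Release the waiting caller on failure too
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<String> publishAndCollect(List<String> items) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        // try-with-resources calls close(), which triggers onComplete
        // after the buffered items have been delivered.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        }
        try {
            // Bounded wait so a bug cannot hang the caller forever
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("Item 1", "Item 2", "Item 3")));
    }
}
```

The latch returns as soon as the stream completes, so the caller neither waits longer than necessary nor risks exiting before the last item is handled.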

Interesting Facts and Tips

ThreadLocal

ThreadLocal variables are another way to achieve thread safety by providing each thread with its own isolated instance of a variable.

Java
public class ThreadLocalExample {
    private static final ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);

    public static void main(String[] args) {
        Runnable task = () -> {
            threadLocal.set((int) (Math.random() * 100D));
            // Each thread reads back only the value it set itself
            System.out.println(Thread.currentThread().getName() + " value: " + threadLocal.get());
        };

        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
    }
}
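
To make the isolation more visible, here is a small sketch in which several threads increment the same ThreadLocal counter without any locking, and each ends up with exactly its own count. It also calls remove() when a thread is done, a habit that matters most with pooled threads, where a stale value could otherwise leak into the next task. All names (ThreadLocalCounterSketch, countPerThread, callerValue) are illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ThreadLocalCounterSketch {
    private static final ThreadLocal<Integer> counter = ThreadLocal.withInitial(() -> 0);

    // Starts the given number of threads; each increments its own copy.
    static Map<String, Integer> countPerThread(int threads, int increments) {
        Map<String, Integer> results = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int j = 0; j < increments; j++) {
                        counter.set(counter.get() + 1); // No lock needed: the value is per-thread
                    }
                    results.put(Thread.currentThread().getName(), counter.get());
                } finally {
                    counter.remove(); // Clear the slot before the thread is reused or ends
                }
            }, "worker-" + i);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return results;
    }

    // The calling thread never touched the counter, so it sees the initial value.
    static int callerValue() {
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countPerThread(3, 1000)); // every worker maps to 1000
        System.out.println(callerValue());           // 0
    }
}
```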

Volatile Keyword

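The volatile keyword guarantees that a write to a variable by one thread is visible to subsequent reads of that variable by other threads, instead of being cached in a register or reordered away. It is a good fit for simple flags like the one below. It does not make compound operations such as count++ atomic; those still need synchronization or an atomic class.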

Java
public class VolatileExample {
    private volatile boolean running = true;

    public void stop() {
        running = false;
    }

    public void run() {
        while (running) {
            System.out.println("Running");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        VolatileExample example = new VolatileExample();
        Thread thread = new Thread(example::run);
        thread.start();

        Thread.sleep(1000);
        example.stop();
    }
}
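
The difference between visibility and atomicity can be sketched by incrementing a volatile int and an AtomicInteger from several threads at once. The class and method names here are invented for the illustration, and the volatile total is deliberately left without an expected value because it varies from run to run.

```java
import java.util.concurrent.atomic.AtomicInteger;

class VolatileCounterSketch {
    private volatile int volatileCount = 0;                      // Visible, but ++ is not atomic
    private final AtomicInteger atomicCount = new AtomicInteger(); // Atomic read-modify-write

    // Returns { volatile total, atomic total } after all threads finish.
    static int[] countBoth(int threads, int increments) {
        VolatileCounterSketch sketch = new VolatileCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < increments; j++) {
                    sketch.volatileCount++;               // Read, add, write: updates can be lost
                    sketch.atomicCount.incrementAndGet(); // Single atomic step
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] { sketch.volatileCount, sketch.atomicCount.get() };
    }

    public static void main(String[] args) {
        int[] totals = countBoth(4, 100_000);
        System.out.println("volatile: " + totals[0]); // Often less than 400000
        System.out.println("atomic:   " + totals[1]); // 400000
    }
}
```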

Daemon Threads

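Daemon threads are background service threads, such as the JVM's own garbage collection threads. Daemon status is independent of thread priority: its only effect is that daemon threads do not keep the JVM alive. Once all user (non-daemon) threads have finished, the JVM exits and any remaining daemon threads are stopped abruptly, without running their finally blocks. setDaemon(true) must be called before start().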

Java
public class DaemonThreadExample {
    public static void main(String[] args) {
        Thread daemonThread = new Thread(() -> {
            while (true) {
                System.out.println("Daemon thread running");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Restore the interrupt flag
                    return; // Stop looping once interrupted
                }
            }
        });
        daemonThread.setDaemon(true);
        daemonThread.start();

        // main returns right away, so the JVM exits and the daemon may print once or not at all
        System.out.println("Main thread ending");
    }
}
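
In practice, daemon threads are often created through a ThreadFactory handed to an executor, so that a forgotten shutdown() cannot keep the JVM alive. The following is one way this might look; DaemonExecutorSketch and its methods are hypothetical names, and the thread name "daemon-worker" is arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class DaemonExecutorSketch {
    // Builds a single-thread executor whose worker is a daemon thread.
    static ExecutorService newDaemonExecutor() {
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "daemon-worker");
            thread.setDaemon(true); // Must happen before the thread is started
            return thread;
        };
        return Executors.newSingleThreadExecutor(factory);
    }

    // Runs a task on the executor and reports whether its thread is a daemon.
    static boolean workerIsDaemon() {
        ExecutorService executor = newDaemonExecutor();
        Callable<Boolean> probe = () -> Thread.currentThread().isDaemon();
        try {
            return executor.submit(probe).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown(); // Still good practice, even with daemon workers
        }
    }

    public static void main(String[] args) {
        System.out.println("Worker is daemon: " + workerIsDaemon()); // true
    }
}
```

Because daemon threads can be killed mid-task at JVM exit, this pattern suits work that is safe to abandon, not work that must flush data or release external resources.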

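Fork/Join Common Pool: Java 8 introduced a shared common pool for the Fork/Join framework, available through ForkJoinPool.commonPool(). Parallel streams and the async methods of CompletableFuture use it by default, so long-running or blocking tasks submitted to it can starve unrelated work.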
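
As a rough illustration of the common pool in use, the sketch below sums a range of integers with a RecursiveTask that splits itself until the pieces are small enough to add up directly. The class names and the threshold of 1,000 are arbitrary choices for the example, not recommended settings.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class CommonPoolSketch {
    // Sums the integers in [from, to) by recursive splitting.
    static class RangeSum extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000;
        private final int from; // inclusive
        private final int to;   // exclusive

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = from + (to - from) / 2;
            RangeSum left = new RangeSum(from, mid);
            RangeSum right = new RangeSum(mid, to);
            left.fork();                          // Run the left half asynchronously
            return right.compute() + left.join(); // Compute the right half here, then combine
        }
    }

    static long sumRange(int from, int to) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(from, to));
    }

    public static void main(String[] args) {
        System.out.println("Parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println(sumRange(1, 100_001)); // 5000050000
    }
}
```

The pool's size can be changed with the java.util.concurrent.ForkJoinPool.common.parallelism system property, which affects every user of the common pool in the JVM.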

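CompletableFuture Join: Prefer join() over get() when unwrapping CompletableFuture results inside lambdas. get() throws the checked InterruptedException and ExecutionException, which a stream lambda cannot propagate, while join() throws the unchecked CompletionException (with the original failure as its cause), so it fits directly into stream operations.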
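
A short sketch of this tip: the futures are started first and joined in a second stream pass, so the tasks can overlap instead of running one after another. The names JoinInStreamSketch, doubleAllAsync, and failureMessage are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

class JoinInStreamSketch {
    static List<Integer> doubleAllAsync(List<Integer> inputs) {
        // First pass: start every task without waiting for any of them.
        List<CompletableFuture<Integer>> futures = inputs.stream()
                .map(n -> CompletableFuture.supplyAsync(() -> n * 2))
                .collect(Collectors.toList());

        // Second pass: join() needs no try/catch because it throws unchecked exceptions.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    // Shows how a failure surfaces through join().
    static String failureMessage() {
        CompletableFuture<Integer> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        try {
            failing.join();
            return "no failure";
        } catch (CompletionException e) {
            return e.getCause().getMessage(); // The original exception is the cause
        }
    }

    public static void main(String[] args) {
        System.out.println(doubleAllAsync(Arrays.asList(1, 2, 3))); // [2, 4, 6]
        System.out.println(failureMessage());                       // boom
    }
}
```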

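Avoiding Deadlocks: Always acquire locks in a consistent order to avoid deadlocks. When that is not possible, tryLock() helps: the no-argument form returns immediately instead of blocking, and the timed form tryLock(timeout, unit) waits only for a bounded time. The example below uses the no-argument form, so a thread that cannot obtain both locks releases what it holds and skips its work instead of deadlocking.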

Java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class DeadlockAvoidanceExample {
    private final Lock lock1 = new ReentrantLock();
    private final Lock lock2 = new ReentrantLock();

    public void method1() {
        if (lock1.tryLock()) {
            try {
                if (lock2.tryLock()) {
                    try {
                        System.out.println("Executing method1");
                    } finally {
                        lock2.unlock();
                    }
                }
            } finally {
                lock1.unlock();
            }
        }
    }

    public void method2() {
        // Opposite lock order from method1; safe here only because tryLock() never blocks
        if (lock2.tryLock()) {
            try {
                if (lock1.tryLock()) {
                    try {
                        System.out.println("Executing method2");
                    } finally {
                        lock1.unlock();
                    }
                }
            } finally {
                lock2.unlock();
            }
        }
    }

    public static void main(String[] args) {
        DeadlockAvoidanceExample example = new DeadlockAvoidanceExample();
        Thread t1 = new Thread(example::method1);
        Thread t2 = new Thread(example::method2);
        t1.start();
        t2.start();
    }
}
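
The example above can silently skip its work when a lock is busy. A variant that combines both pieces of advice, a fixed lock order plus a timed tryLock, might look like the following sketch. The class name, the one-second timeout, and the completed counter are assumptions made for the illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class OrderedLockingSketch {
    private final Lock first = new ReentrantLock();
    private final Lock second = new ReentrantLock();
    private int completed = 0; // Guarded by both locks

    // Every caller takes 'first' before 'second', waiting a bounded time for each.
    boolean doWork(long timeoutMillis) {
        try {
            if (!first.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // Gave up instead of blocking forever
            }
            try {
                if (!second.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return false;
                }
                try {
                    completed++;
                    return true;
                } finally {
                    second.unlock();
                }
            } finally {
                first.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Runs the given number of threads and returns how many finished their work.
    static int runWorkers(int threads) {
        OrderedLockingSketch sketch = new OrderedLockingSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> sketch.doWork(1_000));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sketch.completed;
    }

    public static void main(String[] args) {
        System.out.println("Completed: " + runWorkers(4));
    }
}
```

With a consistent order the timeout is a safety net, not the main defense: it bounds the wait if some other code path holds a lock for too long, and the boolean result lets the caller decide whether to retry.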

Conclusion

Multi-threading and asynchronous programming in Java are broad topics. From basic thread creation and synchronization to the Fork/Join framework and reactive programming, Java provides extensive support for concurrent and parallel execution. Understanding these concepts, together with practices such as consistent lock ordering and careful use of volatile and ThreadLocal, helps you write applications that are efficient, robust, and scalable.
