Parallel Streams
Java 8 introduced the Stream API, which allows functional-style operations on streams of elements. Parallel streams leverage multi-core processors by dividing the stream into multiple substreams and processing them in parallel.
import java.util.Arrays;
import java.util.List;
public class ParallelStreamExample {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// Sequential stream
int sum = numbers.stream()
.filter(n -> n % 2 == 0)
.mapToInt(Integer::intValue)
.sum();
System.out.println("Sum (sequential): " + sum); // Output: 30
// Parallel stream
int parallelSum = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.mapToInt(Integer::intValue)
.sum();
System.out.println("Sum (parallel): " + parallelSum); // Output: 30
}
}
Reactive Programming
Reactive programming is a programming paradigm oriented around data flows and the propagation of change. The java.util.concurrent.Flow API, introduced in Java 9, provides the basis for reactive programming.
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
public class ReactiveExample {
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
MySubscriber subscriber = new MySubscriber();
publisher.subscribe(subscriber);
System.out.println("Publishing items...");
publisher.submit("Item 1");
publisher.submit("Item 2");
publisher.submit("Item 3");
publisher.close();
TimeUnit.SECONDS.sleep(1);
}
}
class MySubscriber implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // Request one item
}
@Override
public void onNext(String item) {
System.out.println("Received: " + item);
subscription.request(1); // Request next item
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("All items received");
}
}
Interesting Facts and Tips
ThreadLocal
ThreadLocal variables are another way to achieve thread safety by providing each thread with its own isolated instance of a variable.
public class ThreadLocalExample {
private static final ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
public static void main(String[] args) {
Runnable task = () -> {
threadLocal.set((int) (Math.random() * 100D));
System.out.println(Thread.currentThread().getName() + " initial value: " + threadLocal.get());
};
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
}
}
Volatile Keyword
The volatile keyword in Java ensures that changes to a variable are always visible to all threads. It’s used to signal the JVM that a variable’s value will be modified by different threads.
public class VolatileExample {
private volatile boolean running = true;
public void stop() {
running = false;
}
public void run() {
while (running) {
System.out.println("Running");
}
}
public static void main(String[] args) throws InterruptedException {
VolatileExample example = new VolatileExample();
Thread thread = new Thread(example::run);
thread.start();
Thread.sleep(1000);
example.stop();
}
}
Daemon Threads
Daemon threads are low-priority threads that run in the background to perform tasks such as garbage collection. They do not prevent the JVM from exiting when all user threads finish their execution.
public class DaemonThreadExample {
public static void main(String[] args) {
Thread daemonThread = new Thread(() -> {
while (true) {
System.out.println("Daemon thread running");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
daemonThread.setDaemon(true);
daemonThread.start();
System.out.println("Main thread ending");
}
}
Fork/Join Pool Common Pool: Java 8 introduced a common pool for the Fork/Join framework, which is used by parallel streams and other parallel tasks by default.
CompletableFuture Join: Use join() instead of get() for unwrapping results in CompletableFuture. join() throws unchecked exceptions, making it more suitable for stream operations.
Avoiding Deadlocks: Always acquire locks in a consistent order to avoid deadlocks. Using tryLock() with a timeout can help avoid blocking indefinitely.
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class DeadlockAvoidanceExample {
private final Lock lock1 = new ReentrantLock();
private final Lock lock2 = new ReentrantLock();
public void method1() {
if (lock1.tryLock()) {
try {
if (lock2.tryLock()) {
try {
System.out.println("Executing method1");
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}
}
public void method2() {
if (lock2.tryLock()) {
try {
if (lock1.tryLock()) {
try {
System.out.println("Executing method2");
} finally {
lock1.unlock();
}
}
} finally {
lock2.unlock();
}
}
}
public static void main(String[] args) {
DeadlockAvoidanceExample example = new DeadlockAvoidanceExample();
Thread t1 = new Thread(example::method1);
Thread t2 = new Thread(example::method2);
t1.start();
t2.start();
}
}
Conclusion
Multi-threading and asynchronous programming in Java are vast and powerful topics. From basic thread creation and synchronization to advanced concepts like the Fork/Join framework and reactive programming, Java provides extensive support for concurrent and parallel execution. Understanding these concepts and best practices can help you write efficient, high-performance applications. By exploring advanced features and interesting facts, you can further enhance your concurrency skills and make your Java programs more robust and scalable.