I'm always excited to take on new projects and collaborate with innovative minds.

Email

contact@niteshsynergy.com

Website

https://www.niteshsynergy.com/

Multithreading

Multithreading enables a program to run multiple threads concurrently, improving performance and responsiveness. Each thread runs independently but shares the same memory, making efficient resource utilization possible. It's ideal for tasks like handling user input, background processing, or parallel computations. However, it requires careful synchronization to avoid race conditions and deadlocks.

Multithreading enables a program to run multiple threads concurrently, improving performance and responsiveness. Each thread runs independently but shares the same memory, making efficient resource utilization possible. It's ideal for tasks like handling user input, background processing, or parallel computations. However, it requires careful synchronization to avoid race conditions and deadlocks.

1. What is Multithreading?

Multithreading in Java is a process of executing multiple threads simultaneously to maximize CPU utilization. A thread is the smallest unit of a process that can be scheduled independently.

 

2. Why Multithreading?

  1. Maximizes CPU Utilization: Keeps the CPU busy by performing multiple tasks concurrently.
  2. Improves Performance: Reduces response time for applications like games or server-based systems.
  3. Handles Multiple Tasks: Ideal for real-time use cases like handling concurrent requests in web servers or processing video games.
  4. Responsive Systems: Keeps GUIs responsive (e.g., gaming apps with background music and rendering).

3. Complex Use Case: Gaming (e.g., Free Fire or Racing Games)

In a game:

  • One thread handles user input (keyboard/mouse).
  • Another handles rendering graphics.
  • A third manages background music or network synchronization.

 

Threads vs. Processes

Java executes threads (lightweight processes) within a JVM, sharing memory and resources. No code required here.

 

Thread Lifecycle (States)

Thread states: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED.

 

Thread thread = new Thread(() -> System.out.println("Running"));  
System.out.println(thread.getState()); // NEW  
thread.start();  
System.out.println(thread.getState()); // RUNNABLE  
 

Creating Threads:

1. Using Thread Class:

package com.niteshsynergy.multithreading;

class MyThread extends Thread {
   @Override
   public void run() {
       System.out.println("Thread running using Thread class");
   }
}

public class Main {
   public static void main(String[] args) {
       MyThread thread = new MyThread();
       thread.start();
   }
}

2. Using Runnable Interface:

package com.niteshsynergy.multithreading;

class MyRunnable implements Runnable {
   @Override
   public void run() {
       System.out.println("Thread running using Runnable interface");
   }
}

public class Main {
   public static void main(String[] args) {
       Thread thread = new Thread(new MyRunnable());
       thread.start();
   }
}
 

3. Using Anonymous Inner Class:

package com.niteshsynergy.multithreading;

public class Main {
   public static void main(String[] args) {
       Thread thread = new Thread(new Runnable() {
           @Override
           public void run() {
               System.out.println("Thread running using anonymous inner class");
           }
       });
       thread.start();
   }
}
 

4. Using Lambda Expression (Java 8+):

package com.niteshsynergy.multithreading;

public class Main {
   public static void main(String[] args) {
       Thread thread = new Thread(() -> System.out.println("Thread running using lambda expression"));
       thread.start();
   }
}
 

5. Using ExecutorService:

package com.niteshsynergy.multithreading;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
   public static void main(String[] args) {
       ExecutorService executor = Executors.newSingleThreadExecutor();
       executor.submit(() -> System.out.println("Thread running using ExecutorService"));
       executor.shutdown();
   }
}
 

6. Using Callable with ExecutorService:

package com.niteshsynergy.multithreading;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
   public static void main(String[] args) {
       ExecutorService executor = Executors.newSingleThreadExecutor();

       Callable<String> task = () -> {
           return "Thread running using Callable and ExecutorService";
       };

       try {
           Future<String> future = executor.submit(task);
           System.out.println(future.get());
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           executor.shutdown();
       }
   }
}
 

 

Creating a complete real-time, complex project is extensive, but I can provide a full implementation for a simplified real-time multiplayer gaming application using Java, showcasing multithreading, socket communication, and concurrency.

Here’s an example of a "Multiplayer Tic-Tac-Toe Game":

Overview: Multiplayer Tic-Tac-Toe

  1. Technology Stack:
    • Java Multithreading: Manages multiple players.
    • Socket Communication: Real-time communication between server and clients.
    • Concurrency: Synchronized board access.
  2. Structure:
    • GameServer: Listens for players, creates game sessions.
    • GameClient: Connects players to the server.
    • GameSession: Handles game logic and state for two players.
    • Shared Board: Synchronized for thread-safe operations.
image-183.png

 

Code

1. Constants.java (Utilities)

package utils;

public class Constants {
   public static final int PORT = 5000;
   public static final String HOST = "localhost";
}
 

2. Board.java (Model)

package model;

import java.util.Arrays;

public class Board {
   private char[][] board = new char[3][3];
   private char winner = '\0';

   public Board() {
       for (char[] row : board) {
           Arrays.fill(row, ' ');
       }
   }

   public synchronized boolean makeMove(int row, int col, char symbol) {
       if (row < 0 || row >= 3 || col < 0 || col >= 3 || board[row][col] != ' ') {
           return false;
       }
       board[row][col] = symbol;
       return true;
   }

   public synchronized boolean checkWinner(char symbol) {
       // Check rows, columns, and diagonals for victory
       for (int i = 0; i < 3; i++) {
           if ((board[i][0] == symbol && board[i][1] == symbol && board[i][2] == symbol) ||
               (board[0][i] == symbol && board[1][i] == symbol && board[2][i] == symbol)) {
               winner = symbol;
               return true;
           }
       }
       if ((board[0][0] == symbol && board[1][1] == symbol && board[2][2] == symbol) ||
           (board[0][2] == symbol && board[1][1] == symbol && board[2][0] == symbol)) {
           winner = symbol;
           return true;
       }
       return false;
   }

   public synchronized boolean isFull() {
       for (char[] row : board) {
           for (char cell : row) {
               if (cell == ' ') return false;
           }
       }
       return true;
   }

   public synchronized char getWinner() {
       return winner;
   }

   public synchronized void printBoard() {
       for (char[] row : board) {
           System.out.println(Arrays.toString(row));
       }
   }
}
 

 

3. GameSession.java (Server)

package server;

import model.Board;

import java.io.*;
import java.net.Socket;

public class GameSession implements Runnable {
   private Socket player1;
   private Socket player2;
   private Board board;

   public GameSession(Socket player1, Socket player2) {
       this.player1 = player1;
       this.player2 = player2;
       this.board = new Board();
   }

   @Override
   public void run() {
       try {
           DataInputStream input1 = new DataInputStream(player1.getInputStream());
           DataOutputStream output1 = new DataOutputStream(player1.getOutputStream());
           DataInputStream input2 = new DataInputStream(player2.getInputStream());
           DataOutputStream output2 = new DataOutputStream(player2.getOutputStream());

           output1.writeUTF("Welcome Player 1 (Symbol: X)");
           output2.writeUTF("Welcome Player 2 (Symbol: O)");

           char currentSymbol = 'X';

           while (true) {
               DataInputStream currentInput = (currentSymbol == 'X') ? input1 : input2;
               DataOutputStream currentOutput = (currentSymbol == 'X') ? output1 : output2;

               currentOutput.writeUTF("Your move (row and column): ");
               int row = currentInput.readInt();
               int col = currentInput.readInt();

               if (board.makeMove(row, col, currentSymbol)) {
                   if (board.checkWinner(currentSymbol)) {
                       currentOutput.writeUTF("You win!");
                       break;
                   }
                   if (board.isFull()) {
                       output1.writeUTF("Game is a draw!");
                       output2.writeUTF("Game is a draw!");
                       break;
                   }
                   currentSymbol = (currentSymbol == 'X') ? 'O' : 'X';
               } else {
                   currentOutput.writeUTF("Invalid move. Try again.");
               }
           }
       } catch (IOException e) {
           e.printStackTrace();
       }
   }
}
 

4. GameServer.java (Main Server)

package server;

import utils.Constants;

import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class GameServer {
   public static void main(String[] args) {
       ExecutorService pool = Executors.newFixedThreadPool(10);
       try (ServerSocket serverSocket = new ServerSocket(Constants.PORT)) {
           System.out.println("Game server is running...");

           while (true) {
               Socket player1 = serverSocket.accept();
               System.out.println("Player 1 connected!");

               Socket player2 = serverSocket.accept();
               System.out.println("Player 2 connected!");

               pool.execute(new GameSession(player1, player2));
           }
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}
 

5. GameClient.java (Client)

 

package client;

import utils.Constants;

import java.io.*;
import java.net.Socket;

public class GameClient {
   public static void main(String[] args) {
       try (Socket socket = new Socket(Constants.HOST, Constants.PORT);
            DataInputStream input = new DataInputStream(socket.getInputStream());
            DataOutputStream output = new DataOutputStream(socket.getOutputStream())) {

           BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

           System.out.println(input.readUTF()); // Welcome message

           while (true) {
               System.out.println(input.readUTF()); // Your move or result
               if (input.available() > 0) break;

               System.out.print("Enter row: ");
               int row = Integer.parseInt(reader.readLine());
               System.out.print("Enter col: ");
               int col = Integer.parseInt(reader.readLine());

               output.writeInt(row);
               output.writeInt(col);
           }
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}
 

image-184.png

 

1. start() Method

Purpose: To start a new thread and execute its run() method in a separate call stack.

Use Case: Starting multiple threads for parallel processing.

 

class DownloadTask extends Thread {
   private final String fileName;

   public DownloadTask(String fileName) {
       this.fileName = fileName;
   }

   @Override
   public void run() {
       System.out.println(Thread.currentThread().getName() + " started downloading: " + fileName);
       try {
           Thread.sleep(2000); // Simulate download time
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       System.out.println(Thread.currentThread().getName() + " finished downloading: " + fileName);
   }
}

public class StartExample {
   public static void main(String[] args) {
       Thread t1 = new DownloadTask("File1.zip");
       Thread t2 = new DownloadTask("File2.zip");

       t1.start();
       t2.start();
   }
}
 

2. run() Method

Purpose: Defines the task that the thread will execute when started.

Use Case: Customizing thread behavior.

class DataProcessor implements Runnable {
   @Override
   public void run() {
       System.out.println(Thread.currentThread().getName() + " is processing data...");
   }
}

public class RunExample {
   public static void main(String[] args) {
       DataProcessor task = new DataProcessor();
       Thread thread = new Thread(task, "ProcessorThread");
       thread.start();
   }
}
 

3. sleep() Method

Purpose: Pauses the thread execution for a specified time.

Use Case: Simulating delay in task execution.

class Scheduler extends Thread {
   @Override
   public void run() {
       for (int i = 1; i <= 5; i++) {
           System.out.println("Task " + i + " executed by " + Thread.currentThread().getName());
           try {
               Thread.sleep(1000); // Pause for 1 second
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
}

public class SleepExample {
   public static void main(String[] args) {
       Thread scheduler = new Scheduler();
       scheduler.start();
   }
}
 

4. join() Method

Purpose: Waits for a thread to finish before continuing execution of the main thread.

Use Case: Ensuring thread completion before proceeding.

 

class DatabaseMigration extends Thread {
   private final String dbName;

   public DatabaseMigration(String dbName) {
       this.dbName = dbName;
   }

   @Override
   public void run() {
       System.out.println("Migrating " + dbName);
       try {
           Thread.sleep(3000); // Simulate time for migration
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       System.out.println(dbName + " migration completed.");
   }
}

public class JoinExample {
   public static void main(String[] args) {
       Thread db1 = new DatabaseMigration("DB1");
       Thread db2 = new DatabaseMigration("DB2");

       db1.start();
       db2.start();

       try {
           db1.join(); // Wait for db1 to finish
           db2.join(); // Wait for db2 to finish
       } catch (InterruptedException e) {
           e.printStackTrace();
       }

       System.out.println("All database migrations are completed.");
   }
}
 

5. yield() Method

Purpose: Suggests the thread scheduler to pause and allow other threads of the same priority to execute.

Use Case: Demonstrating cooperative multitasking.

 

class CalculationTask extends Thread {
   @Override
   public void run() {
       for (int i = 0; i < 5; i++) {
           System.out.println(Thread.currentThread().getName() + " calculating...");
           if (i == 2) {
               System.out.println(Thread.currentThread().getName() + " yielding...");
               Thread.yield();
           }
       }
   }
}

public class YieldExample {
   public static void main(String[] args) {
       Thread t1 = new CalculationTask();
       Thread t2 = new CalculationTask();

       t1.start();
       t2.start();
   }
}
 

6. isAlive() Method

Purpose: Checks if a thread is still running.

Use Case: Monitoring the status of a thread.

class FileUploader extends Thread {
   @Override
   public void run() {
       System.out.println(Thread.currentThread().getName() + " uploading file...");
       try {
           Thread.sleep(2000); // Simulate upload time
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       System.out.println(Thread.currentThread().getName() + " upload complete.");
   }
}

public class IsAliveExample {
   public static void main(String[] args) {
       Thread uploader = new FileUploader();
       uploader.start();

       while (uploader.isAlive()) {
           System.out.println("Uploader thread is still running...");
       }

       System.out.println("Uploader thread has finished.");
   }
}
 

7. getName() Method

Purpose: Retrieves the name of the thread.

Use Case: Logging and debugging.

class Task extends Thread {
   public Task(String name) {
       super(name); // Assign a name to the thread
   }

   @Override
   public void run() {
       System.out.println("Thread " + Thread.currentThread().getName() + " is running.");
   }
}

public class GetNameExample {
   public static void main(String[] args) {
       Thread t1 = new Task("Worker-1");
       Thread t2 = new Task("Worker-2");

       t1.start();
       t2.start();
   }
}
 

image-185.png

 

 

image-186.png

 

1. Synchronized Keyword

Purpose: Ensures only one thread accesses a critical section at a time.

Use Case: Prevent data inconsistency during concurrent updates.

class Counter {
   private int count = 0;

   public synchronized void increment() {
       count++;
   }

   public synchronized int getCount() {
       return count;
   }
}

public class SynchronizedExample {
   public static void main(String[] args) {
       Counter counter = new Counter();

       Thread t1 = new Thread(() -> {
           for (int i = 0; i < 1000; i++) counter.increment();
       });

       Thread t2 = new Thread(() -> {
           for (int i = 0; i < 1000; i++) counter.increment();
       });

       t1.start();
       t2.start();

       try {
           t1.join();
           t2.join();
       } catch (InterruptedException e) {
           e.printStackTrace();
       }

       System.out.println("Final Count: " + counter.getCount());
   }
}
 

2. Static Synchronization

Purpose: Synchronizes static methods to ensure thread safety across all instances of the class.

Use Case: Shared resources accessed via static methods.

 

class Database {
   public static synchronized void connect(String threadName) {
       System.out.println(threadName + " connecting to database...");
       try {
           Thread.sleep(2000);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       System.out.println(threadName + " disconnected.");
   }
}

public class StaticSyncExample {
   public static void main(String[] args) {
       Thread t1 = new Thread(() -> Database.connect("Thread-1"));
       Thread t2 = new Thread(() -> Database.connect("Thread-2"));

       t1.start();
       t2.start();
   }
}
 

3. Synchronizing Blocks

Purpose: Synchronizes specific blocks of code instead of the entire method to improve performance.

Use Case: Optimizing thread-safe operations.

class Printer {
   public void print(String message) {
       synchronized (this) {
           System.out.print("[");
           try {
               Thread.sleep(500);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           System.out.println(message + "]");
       }
   }
}

public class SyncBlockExample {
   public static void main(String[] args) {
       Printer printer = new Printer();

       Thread t1 = new Thread(() -> printer.print("Hello"));
       Thread t2 = new Thread(() -> printer.print("World"));

       t1.start();
       t2.start();
   }
}
 

4. Inter-Thread Communication (wait(), notify(), notifyAll())

Purpose: Allows threads to communicate by waiting and notifying when a condition changes.

Use Case: Resource sharing between threads.

 

class Message {
   private String message;
   private boolean hasMessage = false;

   public synchronized void write(String msg) {
       while (hasMessage) {
           try {
               wait(); // Wait until the message is read
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
       message = msg;
       hasMessage = true;
       System.out.println("Message written: " + msg);
       notify();
   }

   public synchronized String read() {
       while (!hasMessage) {
           try {
               wait(); // Wait until the message is written
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
       hasMessage = false;
       notify();
       return message;
   }
}

public class InterThreadCommExample {
   public static void main(String[] args) {
       Message msg = new Message();

       Thread writer = new Thread(() -> msg.write("Hello World!"));
       Thread reader = new Thread(() -> System.out.println("Message read: " + msg.read()));

       writer.start();
       reader.start();
   }
}
 

5. Producer-Consumer Problem

Purpose: Demonstrates coordination between producer and consumer threads.

 

import java.util.LinkedList;

class ProducerConsumer {
   private LinkedList<Integer> list = new LinkedList<>();
   private final int CAPACITY = 5;

   public void produce() throws InterruptedException {
       int value = 0;
       while (true) {
           synchronized (this) {
               while (list.size() == CAPACITY) {
                   wait();
               }
               System.out.println("Produced: " + value);
               list.add(value++);
               notify();
               Thread.sleep(500);
           }
       }
   }

   public void consume() throws InterruptedException {
       while (true) {
           synchronized (this) {
               while (list.isEmpty()) {
                   wait();
               }
               int value = list.removeFirst();
               System.out.println("Consumed: " + value);
               notify();
               Thread.sleep(500);
           }
       }
   }
}

public class ProducerConsumerExample {
   public static void main(String[] args) {
       ProducerConsumer pc = new ProducerConsumer();

       Thread producer = new Thread(() -> {
           try {
               pc.produce();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       });

       Thread consumer = new Thread(() -> {
           try {
               pc.consume();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       });

       producer.start();
       consumer.start();
   }
}
 

 

6. Thread Priorities

Purpose: Assign thread execution priority.

 

public class ThreadPriorityExample {
   public static void main(String[] args) {
       Thread t1 = new Thread(() -> System.out.println("Thread 1"));
       Thread t2 = new Thread(() -> System.out.println("Thread 2"));
       Thread t3 = new Thread(() -> System.out.println("Thread 3"));

       t1.setPriority(Thread.MIN_PRIORITY); // 1
       t2.setPriority(Thread.MAX_PRIORITY); // 10
       t3.setPriority(Thread.NORM_PRIORITY); // 5

       t1.start();
       t2.start();
       t3.start();
   }
}
 

7. Thread Groups

Purpose: Organizes threads into groups for easier management.

public class ThreadGroupExample {
   public static void main(String[] args) {
       ThreadGroup group = new ThreadGroup("Group-A");

       Thread t1 = new Thread(group, () -> {
           System.out.println(Thread.currentThread().getName() + " running.");
       }, "Thread-1");

       Thread t2 = new Thread(group, () -> {
           System.out.println(Thread.currentThread().getName() + " running.");
       }, "Thread-2");

       t1.start();
       t2.start();

       System.out.println("Active threads in group: " + group.activeCount());
       group.list();
   }
}
 

image-187.png

 

In Java, locks are mechanisms used to enforce thread synchronization. There are two main types of locks:

  1. Class-Level Lock
  2. Object-Level Lock  

    Note: From Nitesh Synergy side this question is not valid ( From our side we say it static member level & instance level lock)

1. Object-Level Lock

Definition:
An object-level lock is a mechanism that synchronizes access to an instance of a class. It ensures that only one thread can execute synchronized instance methods of an object at a time.

Key Points:

  • Acquired when synchronized is used on an instance method or block.
  • It applies only to the specific object instance, meaning different threads can access synchronized methods of different instances simultaneously.

 

class SharedResource {
   public synchronized void print() {
       System.out.println(Thread.currentThread().getName() + " is executing print()");
       try {
           Thread.sleep(1000); // Simulate some work
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       System.out.println(Thread.currentThread().getName() + " has finished execution.");
   }
}

public class ObjectLevelLockExample {
   public static void main(String[] args) {
       SharedResource resource1 = new SharedResource();
       SharedResource resource2 = new SharedResource();

       // Two threads working on the same object
       Thread t1 = new Thread(resource1::print, "Thread-1");
       Thread t2 = new Thread(resource1::print, "Thread-2");

       // A thread working on a different object
       Thread t3 = new Thread(resource2::print, "Thread-3");

       t1.start();
       t2.start();
       t3.start();
   }
}
 

Behavior: Thread-1 and Thread-2 will synchronize on the same object resource1, so only one will execute the print() method at a time. Thread-3 operates on resource2 independently since it’s a different instance.

 

 

2. Class-Level Lock

Definition:
A class-level lock is a mechanism that synchronizes access to static methods or blocks. It ensures that only one thread can execute synchronized static methods or blocks of the class at a time, across all instances.

Key Points:

. Acquired when synchronized is used on a static method or block.

.It applies to the class, not to specific instances. Hence, it affects all threads interacting with static synchronized methods, regardless of the object instance.

 

class SharedResource {
   public static synchronized void printStatic() {
       System.out.println(Thread.currentThread().getName() + " is executing printStatic()");
       try {
           Thread.sleep(1000); // Simulate some work
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       System.out.println(Thread.currentThread().getName() + " has finished execution.");
   }

   public void printInstance() {
       synchronized (SharedResource.class) { // Class-level lock
           System.out.println(Thread.currentThread().getName() + " is executing printInstance() with class lock.");
           try {
               Thread.sleep(1000); // Simulate some work
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           System.out.println(Thread.currentThread().getName() + " has finished execution.");
       }
   }
}

public class ClassLevelLockExample {
   public static void main(String[] args) {
       SharedResource resource1 = new SharedResource();
       SharedResource resource2 = new SharedResource();

       // Threads accessing static synchronized method (class-level lock)
       Thread t1 = new Thread(SharedResource::printStatic, "Thread-1");
       Thread t2 = new Thread(SharedResource::printStatic, "Thread-2");

       // Threads using synchronized block with class-level lock
       Thread t3 = new Thread(resource1::printInstance, "Thread-3");
       Thread t4 = new Thread(resource2::printInstance, "Thread-4");

       t1.start();
       t2.start();
       t3.start();
       t4.start();
   }
}
 

Behavior: Thread-1 and Thread-2 will synchronize on the class-level lock because printStatic() is a static synchronized method. Thread-3 and Thread-4 will also synchronize on the class-level lock, even though they use different instances, because the synchronized block explicitly locks on the SharedResource.class.

 

image-188.png
  • Use object-level locks when synchronizing access to instance-specific data.
  • Use class-level locks when synchronizing access to shared resources or static data across all instances.

 

Race Condition and Deadlock

Both race condition and deadlock are common issues in multi-threaded programming, where multiple threads interact with shared resources. These issues can result in incorrect behavior or performance degradation. Here's a breakdown of these concepts:

 

1. Race Condition

Definition:
A race condition occurs when two or more threads attempt to modify shared data simultaneously, leading to unpredictable results. The final outcome depends on the timing and order in which the threads execute, which is usually not under the control of the developer.

Example: Consider a simple bank account system where multiple threads update the balance of an account at the same time.

 

class BankAccount {
   private int balance = 0;

   public void deposit(int amount) {
       int temp = balance;
       temp = temp + amount; // Simulating some operation
       balance = temp;
   }

   public int getBalance() {
       return balance;
   }
}

public class RaceConditionExample {
   public static void main(String[] args) {
       BankAccount account = new BankAccount();

       // Thread 1 deposits 100
       Thread t1 = new Thread(() -> account.deposit(100));
       // Thread 2 deposits 200
       Thread t2 = new Thread(() -> account.deposit(200));

       t1.start();
       t2.start();

       try {
           t1.join();
           t2.join();
       } catch (InterruptedException e) {
           e.printStackTrace();
       }

       System.out.println("Final Account Balance: " + account.getBalance());
   }
}
 

Problem: In the above example, both threads are attempting to modify the balance at the same time. If both threads read the balance value before it is updated, the result will be incorrect.

 

How to Solve Race Conditions Solution: 

To solve a race condition, we can use synchronization mechanisms such as: synchronized keyword - Ensure that only one thread can access critical sections of code.

 Atomic classes - Use classes from java.util.concurrent.atomic (e.g., AtomicInteger, AtomicLong), which provide thread-safe operations. 

Example Solution (using synchronized):

class BankAccount {
   private int balance = 0;

   public synchronized void deposit(int amount) {
       int temp = balance;
       temp = temp + amount;
       balance = temp;
   }

   public int getBalance() {
       return balance;
   }
}

public class SynchronizedRaceConditionExample {
   public static void main(String[] args) {
       BankAccount account = new BankAccount();

       // Thread 1 deposits 100
       Thread t1 = new Thread(() -> account.deposit(100));
       // Thread 2 deposits 200
       Thread t2 = new Thread(() -> account.deposit(200));

       t1.start();
       t2.start();

       try {
           t1.join();
           t2.join();
       } catch (InterruptedException e) {
           e.printStackTrace();
       }

       System.out.println("Final Account Balance: " + account.getBalance());
   }
}
 

In this solution, the deposit method is synchronized, ensuring that only one thread can execute it at a time, thus preventing the race condition.

 

 

2. Deadlock

Definition:
Deadlock occurs when two or more threads are blocked forever because they are each waiting for the other to release a resource. This leads to a situation where none of the threads can proceed, resulting in a "deadlock."

Example: Consider two threads attempting to lock two resources (e.g., Resource A and Resource B). If Thread 1 locks Resource A and waits for Resource B, while Thread 2 locks Resource B and waits for Resource A, both threads will be stuck.

 

class ResourceA {
   synchronized void methodA(ResourceB b) {
       System.out.println("Thread 1 locked ResourceA");
       b.last();
   }

   synchronized void last() {
       System.out.println("Thread 1 unlocked ResourceA");
   }
}

class ResourceB {
   synchronized void methodB(ResourceA a) {
       System.out.println("Thread 2 locked ResourceB");
       a.last();
   }

   synchronized void last() {
       System.out.println("Thread 2 unlocked ResourceB");
   }
}

public class DeadlockExample {
   public static void main(String[] args) {
       ResourceA resourceA = new ResourceA();
       ResourceB resourceB = new ResourceB();

       Thread t1 = new Thread(() -> resourceA.methodA(resourceB));
       Thread t2 = new Thread(() -> resourceB.methodB(resourceA));

       t1.start();
       t2.start();
   }
}
 

 

Problem: Thread 1 locks Resource A and waits for Resource B. Thread 2 locks Resource B and waits for Resource A. Both threads are stuck, causing a deadlock.

 

How to Solve Deadlock

  1. Avoid Nested Locks: Minimize the number of locks that need to be acquired at the same time.
  2. Lock Ordering: Establish a consistent order in which locks are acquired to avoid circular dependencies.
  3. Timeouts: Use timeouts when trying to acquire locks, allowing threads to back out if they cannot acquire all required resources.
  4. Deadlock Detection: Periodically check if a deadlock situation exists and try to recover from it.

Example Solution (Lock Ordering):

 

class ResourceA {
   synchronized void methodA(ResourceB b) {
       System.out.println("Thread 1 locked ResourceA");
       b.last();
   }

   synchronized void last() {
       System.out.println("Thread 1 unlocked ResourceA");
   }
}

class ResourceB {
   synchronized void methodB(ResourceA a) {
       System.out.println("Thread 2 locked ResourceB");
       a.last();
   }

   synchronized void last() {
       System.out.println("Thread 2 unlocked ResourceB");
   }
}

public class DeadlockResolved {
   public static void main(String[] args) {
       ResourceA resourceA = new ResourceA();
       ResourceB resourceB = new ResourceB();

       // Fix by acquiring locks in a specific order
       Thread t1 = new Thread(() -> {
           synchronized (resourceA) {
               System.out.println("Thread 1 locked ResourceA");
               synchronized (resourceB) {
                   resourceB.last();
               }
           }
       });

       Thread t2 = new Thread(() -> {
           synchronized (resourceB) {
               System.out.println("Thread 2 locked ResourceB");
               synchronized (resourceA) {
                   resourceA.last();
               }
           }
       });

       t1.start();
       t2.start();
   }
}
 

In this example, we ensure that both threads acquire the resources in the same order (ResourceA then ResourceB), which prevents the circular dependency that causes deadlock.

 

Summary of Solutions Race Condition: Use synchronization techniques like synchronized methods/blocks or atomic operations to avoid concurrent access to shared resources. 

Deadlock: To avoid deadlocks, you can: Follow a lock ordering strategy. Avoid holding multiple locks. Implement timeout mechanisms.

 

 

2. What is the difference between wait(), notify(), and notifyAll() in Java?

Explanation: wait(): Causes the current thread to wait until another thread sends a signal via notify() or notifyAll(). notify(): Wakes up one thread that is waiting on the object’s monitor. notifyAll(): Wakes up all threads that are waiting on the object’s monitor.

 

class SharedResource {
   private int value = 0;

   // Producer
   public synchronized void produce() throws InterruptedException {
       while (value == 1) {
           wait();  // Wait if value is 1
       }
       value = 1;  // Produce
       System.out.println("Produced: " + value);
       notify();  // Notify consumer
   }

   // Consumer
   public synchronized void consume() throws InterruptedException {
       while (value == 0) {
           wait();  // Wait if value is 0
       }
       value = 0;  // Consume
       System.out.println("Consumed: " + value);
       notify();  // Notify producer
   }
}

public class WaitNotifyExample {
   public static void main(String[] args) {
       SharedResource resource = new SharedResource();

       Thread producer = new Thread(() -> {
           try {
               while (true) {
                   resource.produce();
                   Thread.sleep(1000);
               }
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       });

       Thread consumer = new Thread(() -> {
           try {
               while (true) {
                   resource.consume();
                   Thread.sleep(1000);
               }
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       });

       producer.start();
       consumer.start();
   }
}

 

Executors (Executor, ExecutorService)

The Executor interface provides a higher-level replacement for managing threads manually. It abstracts thread management, making it easier to manage tasks.

  • ExecutorService extends Executor and provides methods for managing the lifecycle of tasks, like shutting down the executor or retrieving results.
     

package com.niteshsynergy.multithreading;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorExample {
   public static void main(String[] args) {
       ExecutorService executorService = Executors.newFixedThreadPool(2);
       executorService.submit(() -> System.out.println("Task 1 executed by: " + Thread.currentThread().getName()));
       executorService.submit(() -> System.out.println("Task 2 executed by: " + Thread.currentThread().getName()));
       executorService.shutdown();
   }
}
 

2. Thread pools (FixedThreadPool, CachedThreadPool)

  • FixedThreadPool: Creates a thread pool with a fixed number of threads.
  • CachedThreadPool: Creates a pool that creates new threads as needed but reuses previously constructed threads when available.

package com.niteshsynergy.multithreading;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample {
   public static void main(String[] args) {
       // FixedThreadPool Example
       ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
       fixedThreadPool.submit(() -> System.out.println("Fixed ThreadPool: " + Thread.currentThread().getName()));

       // CachedThreadPool Example
       ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
       cachedThreadPool.submit(() -> System.out.println("Cached ThreadPool: " + Thread.currentThread().getName()));

       fixedThreadPool.shutdown();
       cachedThreadPool.shutdown();
   }
}
 

3. Callable and Future:


Callable is similar to Runnable, but it can return a result. 

The Future interface allows you to retrieve the result of the task execution.

package com.niteshsynergy.multithreading;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableFutureExample {
   public static void main(String[] args) throws Exception {
       ExecutorService executorService = Executors.newCachedThreadPool();
       Callable<Integer> task = () -> {
           System.out.println("Task executed by: " + Thread.currentThread().getName());
           return 42;
       };
       Future<Integer> future = executorService.submit(task);
       System.out.println("Result: " + future.get()); // Retrieve result
       executorService.shutdown();
   }
}
 

4. Locks (ReentrantLock, ReadWriteLock)

  • ReentrantLock allows a thread to lock a resource, and if it already holds the lock, it can reenter the same lock.
  • ReadWriteLock allows multiple threads to read a resource simultaneously, but only one can write.

package com.niteshsynergy.multithreading;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockExample {
   private static final Lock lock = new ReentrantLock();

   public static void main(String[] args) {
       Runnable task = () -> {
           lock.lock();
           try {
               System.out.println("Task executed by: " + Thread.currentThread().getName());
           } finally {
               lock.unlock();
           }
       };
       
       new Thread(task).start();
       new Thread(task).start();
   }
}
 

 

5. Condition for signaling

Conditions allow threads to communicate with each other when they are waiting for a certain condition to be met.

 

package com.niteshsynergy.multithreading;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionExample {
   private static final Lock lock = new ReentrantLock();
   private static final Condition condition = lock.newCondition();

   public static void main(String[] args) {
       Runnable task = () -> {
           try {
               lock.lock();
               System.out.println(Thread.currentThread().getName() + " waiting...");
               condition.await(); // Thread waits until notified
               System.out.println(Thread.currentThread().getName() + " resumed");
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           } finally {
               lock.unlock();
           }
       };
       
       Thread t1 = new Thread(task);
       Thread t2 = new Thread(task);
       t1.start();
       t2.start();
       
       try {
           Thread.sleep(2000); // Simulate some work
           lock.lock();
           condition.signalAll(); // Notify waiting threads
       } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
       } finally {
           lock.unlock();
       }
   }
}
 

6. Atomic Variables

Atomic variables are used for performing thread-safe operations without the need for locks.

 

package com.niteshsynergy.multithreading;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicVariableExample {
   private static final AtomicInteger atomicInt = new AtomicInteger(0);

   public static void main(String[] args) {
       Runnable task = () -> {
           int oldValue = atomicInt.getAndIncrement();
           System.out.println("Old value: " + oldValue + ", New value: " + atomicInt.get());
       };
       
       new Thread(task).start();
       new Thread(task).start();
   }
}
 

 

7. Concurrent Collections (ConcurrentHashMap, CopyOnWriteArrayList)

  • ConcurrentHashMap allows thread-safe operations without locking the entire map.
  • CopyOnWriteArrayList creates a new copy of the list each time a modification is made.

package com.niteshsynergy.multithreading;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentCollectionExample {
   public static void main(String[] args) {
       ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
       map.put("1", "One");
       map.put("2", "Two");
       System.out.println(map);

       List<String> list = new CopyOnWriteArrayList<>();
       list.add("A");
       list.add("B");
       System.out.println(list);
   }
}
 

8. Fork/Join Framework

The Fork/Join framework is designed for parallel tasks that can be broken down into smaller subtasks.

 

package com.niteshsynergy.multithreading;

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;

public class ForkJoinExample {
   static class FibonacciTask extends RecursiveTask<Integer> {
       private final int n;

       public FibonacciTask(int n) {
           this.n = n;
       }

       @Override
       protected Integer compute() {
           if (n <= 1) return n;
           FibonacciTask f1 = new FibonacciTask(n - 1);
           f1.fork();
           FibonacciTask f2 = new FibonacciTask(n - 2);
           return f2.compute() + f1.join();
       }
   }

   public static void main(String[] args) {
       ForkJoinPool pool = new ForkJoinPool();
       FibonacciTask task = new FibonacciTask(10);
       System.out.println("Fibonacci result: " + pool.invoke(task));
   }
}
 

9. Phaser, CyclicBarrier, CountDownLatch

These are advanced synchronization aids to coordinate threads:

  • Phaser: For coordination of threads for phases.
  • CyclicBarrier: Allows threads to wait for each other at a common barrier.
  • CountDownLatch: Allows threads to wait until a certain number of operations are completed.

 

package com.niteshsynergy.multithreading;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
   public static void main(String[] args) throws InterruptedException {
       CyclicBarrier barrier = new CyclicBarrier(2, () -> System.out.println("Barrier reached, starting tasks"));

       Runnable task = () -> {
           try {
               System.out.println(Thread.currentThread().getName() + " waiting at barrier");
               barrier.await();
               System.out.println(Thread.currentThread().getName() + " started task after barrier");
           } catch (Exception e) {
               Thread.currentThread().interrupt();
           }
       };

       new Thread(task).start();
       new Thread(task).start();
   }
}
 

10. Deadlock, Livelock, and Starvation

  • Deadlock: Occurs when two or more threads are blocked forever, each waiting for the other to release a resource.
  • Livelock: Threads are not blocked but continuously change states, still not making progress.
  • Starvation: When a thread is perpetually denied access to resources.

 

1. Deadlock

Deadlock occurs when two or more threads are blocked forever, waiting for each other to release locks.

 

package com.niteshsynergy.multithreading;

public class DeadlockExample {
   private static final Object LOCK1 = new Object();
   private static final Object LOCK2 = new Object();

   public static void main(String[] args) {
       Thread thread1 = new Thread(() -> {
           synchronized (LOCK1) {
               System.out.println("Thread 1: Holding LOCK1...");
               try { Thread.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
               synchronized (LOCK2) {
                   System.out.println("Thread 1: Acquired LOCK2...");
               }
           }
       });

       Thread thread2 = new Thread(() -> {
           synchronized (LOCK2) {
               System.out.println("Thread 2: Holding LOCK2...");
               try { Thread.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
               synchronized (LOCK1) {
                   System.out.println("Thread 2: Acquired LOCK1...");
               }
           }
       });

       thread1.start();
       thread2.start();
   }
}
 

2. Livelock

Livelock occurs when threads keep responding to each other's actions but cannot make progress. They are not blocked but are unable to proceed.

package com.niteshsynergy.multithreading;

import java.util.concurrent.atomic.AtomicBoolean;

public class LivelockExample {
   static class SharedResource {
       private final AtomicBoolean isLocked = new AtomicBoolean(false);

       public void tryLock(String threadName) {
           while (true) {
               if (!isLocked.get()) {
                   isLocked.set(true);
                   System.out.println(threadName + " locked the resource.");
                   break;
               } else {
                   System.out.println(threadName + " waiting to lock the resource...");
               }
           }
       }

       public void unlock(String threadName) {
           isLocked.set(false);
           System.out.println(threadName + " unlocked the resource.");
       }
   }

   public static void main(String[] args) {
       SharedResource resource = new SharedResource();

       Runnable task1 = () -> {
           while (true) {
               resource.tryLock("Thread 1");
               resource.unlock("Thread 1");
           }
       };

       Runnable task2 = () -> {
           while (true) {
               resource.tryLock("Thread 2");
               resource.unlock("Thread 2");
           }
       };

       new Thread(task1).start();
       new Thread(task2).start();
   }
}
 

3. Starvation

Starvation occurs when a thread is unable to gain regular access to shared resources and is perpetually delayed.

package com.niteshsynergy.multithreading;

import java.util.concurrent.locks.ReentrantLock;

public class StarvationExample {
   private static final ReentrantLock lock = new ReentrantLock(true); // Fair lock

   public static void main(String[] args) {
       Runnable task = () -> {
           while (true) {
               if (lock.tryLock()) {
                   try {
                       System.out.println(Thread.currentThread().getName() + " acquired the lock");
                       Thread.sleep(100);
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   } finally {
                       System.out.println(Thread.currentThread().getName() + " released the lock");
                       lock.unlock();
                   }
                   break;
               } else {
                   System.out.println(Thread.currentThread().getName() + " waiting for the lock...");
               }
           }
       };

       Thread thread1 = new Thread(task);
       Thread thread2 = new Thread(task);
       Thread thread3 = new Thread(task);

       thread1.start();
       thread2.start();
       thread3.start();
   }
}
 

Key Takeaways:

  • Deadlock: Ensure proper ordering of lock acquisition to avoid cyclic dependency.
  • Livelock: Introduce mechanisms like timeouts to avoid infinite state changes.
  • Starvation: Use fair locking (like ReentrantLock(true)) to ensure all threads get a fair chance.

 

The provided content explains advanced topics in Java's concurrency framework along with code examples for each concept. Here's a quick recap and why it's important:

  1. Executors and Thread Pools: Simplifies thread management, enabling developers to execute tasks efficiently without dealing directly with threads.
  2. Callable and Future: Supports returning results or handling exceptions from a task.
  3. Locks and Conditions: Provides fine-grained control over synchronization beyond the synchronized keyword, supporting complex thread coordination.
  4. Atomic Variables: Ensures lock-free thread-safe updates, boosting performance in concurrent environments.
  5. Concurrent Collections: Optimized for high-concurrency scenarios, avoiding full synchronization on data structures.
  6. Fork/Join Framework: Facilitates parallelism by recursively breaking tasks into subtasks.
  7. Synchronization Aids:
    • Phaser, CyclicBarrier, CountDownLatch: Coordinate and manage thread execution phases or barriers.
  8. Deadlock, Livelock, and Starvation: Understanding and avoiding common pitfalls in concurrent programming.
  9. ThreadLocal: Provides thread-specific variables, ensuring isolation across threads.

 

ThreadLocal: Isolated Variables for Each Thread

Isolated Variables for Each Thread The ThreadLocal class in Java allows each thread to have its own independently initialized variable. This is useful when you want to avoid sharing variables across threads, ensuring thread isolation.

Key Points:

Each thread accessing a ThreadLocal variable has its own, isolated value. Useful for scenarios like maintaining user sessions, request contexts, or database connections.

 

package com.niteshsynergy.multithreading;

public class ThreadLocalExample {
   // Define a ThreadLocal variable
   private static final ThreadLocal<String> threadLocal = ThreadLocal.withInitial(() -> "Default Value");

   public static void main(String[] args) {
       Runnable task1 = () -> {
           System.out.println(Thread.currentThread().getName() + " Initial Value: " + threadLocal.get());
           threadLocal.set("Task 1 Value");
           try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
           System.out.println(Thread.currentThread().getName() + " Updated Value: " + threadLocal.get());
       };

       Runnable task2 = () -> {
           System.out.println(Thread.currentThread().getName() + " Initial Value: " + threadLocal.get());
           threadLocal.set("Task 2 Value");
           try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
           System.out.println(Thread.currentThread().getName() + " Updated Value: " + threadLocal.get());
       };

       Thread thread1 = new Thread(task1, "Thread-1");
       Thread thread2 = new Thread(task2, "Thread-2");

       thread1.start();
       thread2.start();
   }
}
 

Output Explanation

Each thread starts with the default value (Default Value) defined in ThreadLocal.withInitial(). 

Each thread sets its own value (Task 1 Value or Task 2 Value). 

The value is isolated per thread, ensuring no interference between threads.

Use Case Example: User Sessions

Imagine a web server where each user request is handled by a separate thread. ThreadLocal can be used to store user-specific data (like session information).

package com.niteshsynergy.multithreading;

public class UserSessionExample {
   private static final ThreadLocal<String> userSession = ThreadLocal.withInitial(() -> "Guest");

   public static void main(String[] args) {
       Runnable userRequest1 = () -> {
           System.out.println(Thread.currentThread().getName() + " Session: " + userSession.get());
           userSession.set("User1");
           System.out.println(Thread.currentThread().getName() + " Updated Session: " + userSession.get());
       };

       Runnable userRequest2 = () -> {
           System.out.println(Thread.currentThread().getName() + " Session: " + userSession.get());
           userSession.set("User2");
           System.out.println(Thread.currentThread().getName() + " Updated Session: " + userSession.get());
       };

       new Thread(userRequest1, "Request-1").start();
       new Thread(userRequest2, "Request-2").start();
   }
}
 

ThreadLocal API

  • ThreadLocal.get(): Retrieves the value for the current thread.
  • ThreadLocal.set(T value): Sets the value for the current thread.
  • ThreadLocal.remove(): Removes the value for the current thread, freeing memory.

Advantages

  1. Avoids synchronization since each thread has its own value.
  2. Simplifies code when thread-specific data is needed.

 

 

Here’s a detailed example covering advanced concurrency topics in Java, including thread pools, callable/future, locks, atomic variables, and other concurrency aids. This example simulates a real-world scenario where multiple threads are responsible for processing tasks, communicating through a CountDownLatch and a CyclicBarrier to coordinate their execution. Additionally, we’ll use atomic variables and thread-local storage for thread-safe operations.

Scenario:

We are building a multi-threaded system to process orders in a restaurant, where multiple chefs (threads) prepare dishes, waiters (threads) serve them, and the system tracks the orders efficiently using synchronization mechanisms.

 

package com.niteshsynergy.multithreading;

import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;

class RestaurantOrderSystem {

   // Atomic variables for counting prepared dishes
   private static AtomicInteger preparedDishes = new AtomicInteger(0);
   private static AtomicInteger servedDishes = new AtomicInteger(0);

   // Lock to synchronize on preparing and serving dishes
   private static final ReentrantLock lock = new ReentrantLock();

   // ThreadLocal for thread-specific data
   private static ThreadLocal<String> threadSpecificData = ThreadLocal.withInitial(() -> "Initial Thread Data");

   // Executor service for handling chef and waiter tasks
   private static ExecutorService chefService = Executors.newFixedThreadPool(5);
   private static ExecutorService waiterService = Executors.newCachedThreadPool();

   // CountDownLatch to wait until all chefs are done preparing dishes
   private static CountDownLatch chefLatch = new CountDownLatch(5);

   // CyclicBarrier to synchronize the chefs and waiters for coordinated actions
   private static CyclicBarrier barrier = new CyclicBarrier(5);

   public static void main(String[] args) throws InterruptedException {
       
       // Simulate 5 chefs preparing dishes
       for (int i = 0; i < 5; i++) {
           chefService.submit(new ChefTask());
       }

       // Simulate 5 waiters serving dishes
       for (int i = 0; i < 5; i++) {
           waiterService.submit(new WaiterTask());
       }

       // Wait for chefs to finish preparing
       chefLatch.await();
       
       // Shutdown services
       chefService.shutdown();
       waiterService.shutdown();
   }

   // Chef task (simulates cooking a dish)
   static class ChefTask implements Runnable {
       @Override
       public void run() {
           try {
               // Prepare the dish (simulated with sleep)
               System.out.println(Thread.currentThread().getName() + " is preparing a dish...");
               Thread.sleep(1000);
               
               // Use atomic variables for thread-safe counting
               preparedDishes.incrementAndGet();
               
               // Using lock for synchronized dish preparation
               lock.lock();
               try {
                   System.out.println(Thread.currentThread().getName() + " has finished preparing a dish.");
               } finally {
                   lock.unlock();
               }

               // Wait at the barrier to sync with waiters
               barrier.await();
               
               // After all chefs are ready, decrement the latch
               chefLatch.countDown();
               
           } catch (InterruptedException | BrokenBarrierException e) {
               Thread.currentThread().interrupt();
           }
       }
   }

   // Waiter task (simulates serving a dish)
   static class WaiterTask implements Runnable {
       @Override
       public void run() {
           try {
               // Wait at the barrier until all chefs are done
               barrier.await();
               
               // Serve the dish
               System.out.println(Thread.currentThread().getName() + " is serving a dish.");
               servedDishes.incrementAndGet();
               
               // Using atomic variables to track served dishes
               System.out.println("Total served dishes: " + servedDishes.get());
               
               // Accessing thread-specific data
               System.out.println(Thread.currentThread().getName() + " thread-specific data: " + threadSpecificData.get());
               
           } catch (InterruptedException | BrokenBarrierException e) {
               Thread.currentThread().interrupt();
           }
       }
   }
}
 

 

Key Concepts Demonstrated:

 

ExecutorService and Thread Pools: 

FixedThreadPool for chefs (to limit the number of concurrent chefs).

 CachedThreadPool for waiters (since they are more dynamic and may vary in number). 

Callable and Future: Although not explicitly used here, if tasks needed to return a result, you could use Callable (instead of Runnable) and Future to handle the results asynchronously. 

Locks: ReentrantLock is used to protect critical sections, ensuring that only one thread at a time can update shared resources

Atomic Variables: AtomicInteger is used to safely count the prepared and served dishes across multiple threads without the need for synchronization. 

ThreadLocal: ThreadLocal provides each thread with its own independent copy of data, which is useful for storing thread-specific information, like in this case storing data related to each waiter. 

CyclicBarrier: CyclicBarrier synchronizes a group of threads so that they all must reach a common barrier point before any can proceed, which ensures that all chefs finish preparing their dishes before any waiter serves. 

CountDownLatch: CountDownLatch is used to wait for all chefs to finish before the main program continues, ensuring proper sequencing of tasks. 

Advanced Synchronization Aids: 

Deadlock, Livelock, and Starvation: In this code, we've avoided deadlock by ensuring that locks are acquired and released in a timely manner. Starvation and livelock are avoided by using a proper coordination mechanism (e.g., CountDownLatch, CyclicBarrier). 

Thread Coordination: Using CyclicBarrier, chefs must wait for others to finish preparing before they can serve the dishes. The CountDownLatch ensures that the main method waits for all chefs to finish before continuing.

 

 

. Virtual Threads (Project Loom) - Lightweight Threads for Scalability

Use Case: Handle 1 million lightweight tasks concurrently without overloading system resources.

 

package com.niteshsynergy.multithreading;

public class VirtualThreadsExample {
   public static void main(String[] args) {
       for (int i = 0; i < 1_000_000; i++) {
           Thread.startVirtualThread(() -> System.out.println("Processing task in: " + Thread.currentThread()));
       }
   }
}
 

Explanation:
Virtual threads (Java 19+) are lightweight and scale better than traditional threads. Here, we create 1 million virtual threads without exhausting the system.

 

2. Reactive Programming with Reactor

Use Case: Asynchronous processing of a stream of tasks like fetching user data from a service.

package com.niteshsynergy.multithreading;

import reactor.core.publisher.Flux;

public class ReactiveProgrammingExample {
   public static void main(String[] args) {
       Flux<Integer> numbers = Flux.range(1, 10); // Asynchronous stream
       numbers.map(num -> num * 2)
              .filter(num -> num % 4 == 0)
              .subscribe(System.out::println);
   }
}
 

Explanation:
Using Project Reactor, tasks like data transformation and filtering are handled asynchronously, making it ideal for high-throughput systems.

 

3. Performance Optimization - Thread Dumps and Profiling

Use Case: Analyze thread behavior in a deadlock scenario.

package com.niteshsynergy.multithreading;

public class ThreadDumpExample {
   static final Object lock1 = new Object();
   static final Object lock2 = new Object();

   public static void main(String[] args) {
       new Thread(() -> {
           synchronized (lock1) {
               try { Thread.sleep(100); } catch (InterruptedException ignored) {}
               synchronized (lock2) { System.out.println("Thread 1 acquired lock2"); }
           }
       }).start();

       new Thread(() -> {
           synchronized (lock2) {
               try { Thread.sleep(100); } catch (InterruptedException ignored) {}
               synchronized (lock1) { System.out.println("Thread 2 acquired lock1"); }
           }
       }).start();
   }
}
 

Explanation: Run this code and generate a thread dump (jstack <pid>) to analyze the deadlock. Use profiling tools like VisualVM or JProfiler to pinpoint performance bottlenecks.

 

4. JVM Tuning for Multithreaded Applications

Use Case: Optimize heap size and garbage collection for a multithreaded app.

package com.niteshsynergy.multithreading;

public class JvmTuningExample {
   public static void main(String[] args) {
       Runnable task = () -> {
           for (int i = 0; i < 1_000_000; i++) {
               String temp = new String("Object " + i); // Frequent object creation
           }
       };

       for (int i = 0; i < 10; i++) {
           new Thread(task).start();
       }

       System.out.println("Threads started. Monitor memory usage.");
   }
}
 

Explanation: Run the program with JVM options like -Xms512m -Xmx1024m -XX:+UseG1GC. Monitor garbage collection and memory usage to tune parameters.

 

 

5. Memory Consistency Model - Happens-before Relationship

Use Case: Ensure visibility of shared variables across threads.

package com.niteshsynergy.multithreading;

public class MemoryConsistencyExample {
   private static volatile boolean flag = false;

   public static void main(String[] args) {
       new Thread(() -> {
           while (!flag) {
               // Busy wait
           }
           System.out.println("Flag is set to true");
       }).start();

       new Thread(() -> {
           try { Thread.sleep(100); } catch (InterruptedException ignored) {}
           flag = true;
           System.out.println("Flag updated to true");
       }).start();
   }
}
 

Explanation: The volatile keyword ensures visibility of flag across threads. Without it, the change to flag may not be visible to the waiting thread.

6. Volatile Keyword and Visibility

Use Case: Safely increment a counter without locking.

package com.niteshsynergy.multithreading;

import java.util.concurrent.atomic.AtomicInteger;

public class VolatileVisibilityExample {
   private static AtomicInteger counter = new AtomicInteger(0);

   public static void main(String[] args) {
       Runnable task = () -> {
           for (int i = 0; i < 1000; i++) {
               counter.incrementAndGet();
           }
       };

       Thread t1 = new Thread(task);
       Thread t2 = new Thread(task);
       t1.start();
       t2.start();

       try {
           t1.join();
           t2.join();
       } catch (InterruptedException e) {
           e.printStackTrace();
       }

       System.out.println("Final Counter Value: " + counter.get());
   }
}
 

Explanation: Atomic variables like AtomicInteger eliminate the need for explicit synchronization, ensuring thread-safe operations.

 

Expert-Level Multithreading FAQ

1. What is the difference between a Thread and a Virtual Thread?

Answer:

  • A Thread is a lightweight process managed by the OS and JVM, sharing memory with other threads.
  • A Virtual Thread (introduced in Java 19 via Project Loom) is a lightweight, user-managed thread that runs on platform threads but is cheaper to create and manage, allowing millions of threads concurrently.

 

2. How do you avoid deadlocks in multithreaded applications?

Answer:
To avoid deadlocks:

Acquire locks in a consistent order. Use tryLock() from ReentrantLock to avoid blocking indefinitely. Minimize lock scope. Prefer higher-level concurrency constructs like ConcurrentHashMap.

image-189.png

 

4. What is a ThreadLocal, and when should it be used?

Answer:

ThreadLocal is used to store variables specific to a thread, ensuring no data sharing between threads. Use it for thread-safe operations requiring thread-specific states, such as user session management in web apps.

5. What is the role of volatile in Java multithreading? 

Answer: The volatile keyword ensures: Visibility: Changes to a variable are immediately visible to all threads. Atomicity for read/write: No caching or reordering by JVM or CPU.

 

6. How does ForkJoinPool work, and when should it be used? 

Answer: ForkJoinPool is designed for divide-and-conquer tasks. It splits tasks into smaller sub-tasks using RecursiveTask or RecursiveAction, processes them in parallel, and combines results. Use Case: Parallel processing of large data sets like matrix operations.

 

7. What are some advanced concurrency utilities in java.util.concurrent? 

Answer: Locks: ReentrantLock, ReadWriteLock. Synchronization Aids: Semaphore, CountDownLatch, CyclicBarrier, Phaser. Collections: ConcurrentHashMap, CopyOnWriteArrayList. Executors: Thread pools like FixedThreadPool, ScheduledThreadPool.

 

8. How do you debug a deadlock in a Java application? 

Answer: Use jstack <pid> to generate a thread dump. Look for BLOCKED threads and their locks. Analyze the cycle of dependencies. Fix the order of lock acquisition or use tryLock().

 

9. What is the difference between Callable and Runnable? 

Answer: Runnable: Returns void and cannot throw checked exceptions. Callable: Returns a result (Future<V>) and allows throwing checked exceptions.

 

10. How do you ensure thread safety in high-performance applications?

Answer: Use concurrent collections (ConcurrentHashMap, CopyOnWriteArrayList). Avoid excessive synchronization; use Lock for finer control. Apply immutability principles. Use ThreadLocal for thread-specific data.

 

image-190.png

 

image-191.png

 

 

image-192.png

 

 

image-193.png

 

 

image-194.png

 

 

                   Finally one final code with real time project

Real-Time Complex Multithreading Project: Multi-Stage Order Processing System

Project Overview

This project simulates a multi-threaded Order Processing System used in e-commerce platforms. The system handles multiple orders concurrently and applies multi-stage processing:

  1. Order Validation: Validates orders for correctness.
  2. Payment Processing: Simulates payment transactions.
  3. Inventory Check: Ensures ordered items are in stock.
  4. Order Confirmation: Finalizes and confirms the order.

Features:

  • Thread Pools: Efficient management of threads.
  • Synchronization: Coordination between stages.
  • Phaser: Used for phase-based task completion.
  • CompletableFuture: To handle asynchronous tasks.
  • Virtual Threads (Optional): Lightweight concurrency for handling many orders.

 

Code: Multi-Stage Order Processing System

package com.niteshsynergy.multithreading;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class OrderProcessingSystem {
   
   // Thread pool for managing threads
   private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
   private static final Phaser phaser = new Phaser(1); // Main thread registered
   private static final AtomicInteger orderIdCounter = new AtomicInteger(1);

   // Order class
   static class Order {
       int orderId;
       String product;
       int quantity;
       boolean isValid = false;
       boolean isPaid = false;
       boolean isConfirmed = false;

       public Order(String product, int quantity) {
           this.orderId = orderIdCounter.getAndIncrement();
           this.product = product;
           this.quantity = quantity;
       }

       @Override
       public String toString() {
           return "Order{" +
                   "orderId=" + orderId +
                   ", product='" + product + '\'' +
                   ", quantity=" + quantity +
                   ", isValid=" + isValid +
                   ", isPaid=" + isPaid +
                   ", isConfirmed=" + isConfirmed +
                   '}';
       }
   }

   // Order Validation Task
   static class OrderValidationTask implements Runnable {
       private final Order order;

       public OrderValidationTask(Order order) {
           this.order = order;
       }

       @Override
       public void run() {
           System.out.println("Validating Order: " + order.orderId);
           // Simulate validation
           sleep(1000);
           order.isValid = true;
           System.out.println("Order Validated: " + order.orderId);
           phaser.arriveAndDeregister();
       }
   }

   // Payment Processing Task
   static class PaymentProcessingTask implements Runnable {
       private final Order order;

       public PaymentProcessingTask(Order order) {
           this.order = order;
       }

       @Override
       public void run() {
           System.out.println("Processing Payment for Order: " + order.orderId);
           // Simulate payment processing
           sleep(1500);
           order.isPaid = true;
           System.out.println("Payment Processed for Order: " + order.orderId);
           phaser.arriveAndDeregister();
       }
   }

   // Inventory Check Task
   static class InventoryCheckTask implements Runnable {
       private final Order order;

       public InventoryCheckTask(Order order) {
           this.order = order;
       }

       @Override
       public void run() {
           System.out.println("Checking Inventory for Order: " + order.orderId);
           // Simulate inventory check
           sleep(1200);
           System.out.println("Inventory Available for Order: " + order.orderId);
           phaser.arriveAndDeregister();
       }
   }

   // Order Confirmation Task
   static class OrderConfirmationTask implements Runnable {
       private final Order order;

       public OrderConfirmationTask(Order order) {
           this.order = order;
       }

       @Override
       public void run() {
           System.out.println("Confirming Order: " + order.orderId);
           // Simulate order confirmation
           sleep(800);
           order.isConfirmed = true;
           System.out.println("Order Confirmed: " + order.orderId);
           phaser.arriveAndDeregister();
       }
   }

   public static void main(String[] args) {
       // Example orders
       Order[] orders = {
               new Order("Laptop", 1),
               new Order("Mobile", 2),
               new Order("Headphones", 5)
       };

       for (Order order : orders) {
           phaser.bulkRegister(4); // Register tasks for each order
           
           // Submit tasks to thread pool
           threadPool.submit(new OrderValidationTask(order));
           threadPool.submit(new PaymentProcessingTask(order));
           threadPool.submit(new InventoryCheckTask(order));
           threadPool.submit(new OrderConfirmationTask(order));
       }

       // Wait for all tasks to complete
       phaser.awaitAdvance(phaser.getPhase());
       System.out.println("All orders processed successfully!");

       // Shut down thread pool
       threadPool.shutdown();
   }

   private static void sleep(int millis) {
       try {
           Thread.sleep(millis);
       } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
       }
   }
}
 

Key Multithreading Concepts Used in the Code

 Thread Pool (ExecutorService): Efficient thread management to avoid creating/destroying threads repeatedly.

Phaser: Phase-based synchronization for coordinating tasks in multi-stage processing. 

Synchronization (sleep/custom logic): Used to mimic processing delays. 

Atomic Variables: AtomicInteger ensures thread-safe order ID generation. 

Asynchronous Processing: Each order stage runs independently in parallel threads.

 

 

Execution Flow

  1. Create multiple orders and submit them to the thread pool.
  2. Each order goes through 4 stages: Validation, Payment, Inventory Check, and Confirmation.
  3. Phaser ensures all tasks are completed for an order before finalizing it.
  4. Results are printed in the console, showing all orders processed concurrently.

 

Customizations Virtual Threads: Replace Executors.newFixedThreadPool(10) with a virtual thread pool (Java 19+): java Copy code ExecutorService threadPool = Executors.newVirtualThreadPerTaskExecutor(); Error Handling: Enhance the code with exception handling to simulate payment failures or inventory issues. Logging: Use a logging framework like Log4j or SLF4J for detailed logs.

 

 

Focusing on multithreading for a chat messaging system. We'll simulate the operations related to user registration, message sending, receiving messages, and asynchronous database interactions.

This will cover:

  1. User Registration
  2. Message Sending and Receiving
  3. Database interaction via CompletableFuture
  4. Concurrency management using ExecutorService and Phaser for synchronizing tasks.

 

package com.niteshsynergy.multithreading;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.logging.*;

public class ChatMessagingSystem {

   private static final Logger logger = Logger.getLogger(ChatMessagingSystem.class.getName());
   private static final ExecutorService threadPool = Executors.newVirtualThreadPerTaskExecutor(); // Virtual threads for scalability
   private static final ConcurrentHashMap<String, User> registeredUsers = new ConcurrentHashMap<>();
   private static final AtomicInteger messageCount = new AtomicInteger(0); // To track the number of messages sent
   private static final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
   private static final Phaser phaser = new Phaser(1); // Main thread registered for synchronization

   // User class representing each user in the system
   static class User {
       String username;
       String password;
       boolean isOnline;
       BlockingQueue<String> userInbox;

       public User(String username, String password) {
           this.username = username;
           this.password = password;
           this.isOnline = false;
           this.userInbox = new LinkedBlockingQueue<>();
       }
   }

   // User Registration and Login Task
   static class UserRegistrationTask implements Runnable {
       private final String username;
       private final String password;

       public UserRegistrationTask(String username, String password) {
           this.username = username;
           this.password = password;
       }

       @Override
       public void run() {
           try {
               if (!registeredUsers.containsKey(username)) {
                   registeredUsers.put(username, new User(username, password));
                   logger.info("User registered successfully: " + username);
               } else {
                   logger.warning("User already exists: " + username);
               }
               phaser.arriveAndDeregister();
           } catch (Exception e) {
               logger.severe("Error during registration: " + e.getMessage());
               phaser.arriveAndDeregister();
           }
       }
   }

   // Task for sending messages
   static class SendMessageTask implements Runnable {
       private final String sender;
       private final String receiver;
       private final String message;

       public SendMessageTask(String sender, String receiver, String message) {
           this.sender = sender;
           this.receiver = receiver;
           this.message = message;
       }

       @Override
       public void run() {
           try {
               logger.info(sender + " sending message to " + receiver + ": " + message);
               User receiverUser = registeredUsers.get(receiver);
               if (receiverUser != null && receiverUser.isOnline) {
                   receiverUser.userInbox.put(message); // Message is put in receiver's inbox
                   logger.info("Message delivered to " + receiver);
               } else {
                   logger.warning(receiver + " is offline. Message queued.");
                   messageQueue.put(message); // If the user is offline, queue the message
               }
               phaser.arriveAndDeregister();
           } catch (InterruptedException e) {
               logger.severe("Error sending message: " + e.getMessage());
               phaser.arriveAndDeregister();
           }
       }
   }

   // Task for receiving messages
   static class ReceiveMessageTask implements Runnable {
       private final String username;

       public ReceiveMessageTask(String username) {
           this.username = username;
       }

       @Override
       public void run() {
           try {
               User user = registeredUsers.get(username);
               if (user != null && user.isOnline) {
                   String message = user.userInbox.take(); // Get the next message from inbox
                   logger.info(username + " received message: " + message);
               }
               phaser.arriveAndDeregister();
           } catch (InterruptedException e) {
               logger.severe("Error receiving message: " + e.getMessage());
               phaser.arriveAndDeregister();
           }
       }
   }

   // Task to simulate asynchronous database save
   static class AsyncDatabaseSaveTask implements Runnable {
       private final String data;

       public AsyncDatabaseSaveTask(String data) {
           this.data = data;
       }

       @Override
       public void run() {
           try {
               CompletableFuture<Void> dbSave = CompletableFuture.runAsync(() -> {
                   // Simulate database save with delay
                   try {
                       Thread.sleep(500);
                   } catch (InterruptedException e) {
                       logger.severe("Database save interrupted: " + e.getMessage());
                   }
                   logger.info("Data saved to database: " + data);
               }, threadPool);
               dbSave.join(); // Wait for completion
               phaser.arriveAndDeregister();
           } catch (Exception e) {
               logger.severe("Error saving to database: " + e.getMessage());
               phaser.arriveAndDeregister();
           }
       }
   }

   // Main method to simulate users sending and receiving messages
   public static void main(String[] args) throws InterruptedException {
       Scanner scanner = new Scanner(System.in);

       // Registering users
       threadPool.submit(new UserRegistrationTask("alice", "password123"));
       threadPool.submit(new UserRegistrationTask("bob", "password456"));
       threadPool.submit(new UserRegistrationTask("charlie", "password789"));
       threadPool.submit(new UserRegistrationTask("david", "password000"));
       threadPool.submit(new UserRegistrationTask("eve", "password111"));

       phaser.arriveAndAwaitAdvance();

       // Simulating login and messaging
       registeredUsers.get("alice").isOnline = true;
       registeredUsers.get("bob").isOnline = true;

       threadPool.submit(new SendMessageTask("alice", "bob", "Hello Bob!"));
       threadPool.submit(new SendMessageTask("bob", "alice", "Hello Alice!"));
       threadPool.submit(new SendMessageTask("charlie", "bob", "Hey Bob!"));
       threadPool.submit(new SendMessageTask("bob", "charlie", "Hi Charlie!"));
       threadPool.submit(new SendMessageTask("david", "alice", "Good Morning Alice!"));
       threadPool.submit(new SendMessageTask("eve", "alice", "How are you Alice?"));

       phaser.arriveAndAwaitAdvance();

       // Simulate receiving messages
       threadPool.submit(new ReceiveMessageTask("bob"));
       threadPool.submit(new ReceiveMessageTask("alice"));
       threadPool.submit(new ReceiveMessageTask("charlie"));
       threadPool.submit(new ReceiveMessageTask("david"));
       threadPool.submit(new ReceiveMessageTask("eve"));

       phaser.arriveAndAwaitAdvance();

       // Simulate async database save
       threadPool.submit(new AsyncDatabaseSaveTask("User data for Alice"));
       threadPool.submit(new AsyncDatabaseSaveTask("User data for Bob"));
       threadPool.submit(new AsyncDatabaseSaveTask("User data for Charlie"));
       threadPool.submit(new AsyncDatabaseSaveTask("User data for David"));
       threadPool.submit(new AsyncDatabaseSaveTask("User data for Eve"));

       phaser.arriveAndAwaitAdvance();

       // Shutdown the thread pool after all tasks are completed
       threadPool.shutdown();
   }
}
 

 

Key Features of the Code:
Virtual Threads for Scalability:
The use of Executors.newVirtualThreadPerTaskExecutor() enables creating a large number of lightweight threads to handle concurrent tasks efficiently without overwhelming system resources.

Concurrent User Registration:
Multiple users can register simultaneously through the UserRegistrationTask class, which validates user uniqueness and adds them to the system.

Message Sending and Receiving:
The SendMessageTask and ReceiveMessageTask classes handle the sending and receiving of messages. Messages are placed into a BlockingQueue for the receiver to consume. If the receiver is offline, messages are queued.

Asynchronous Database Operations:
The AsyncDatabaseSaveTask simulates saving data asynchronously to a database using CompletableFuture. It showcases how to handle I/O-bound tasks without blocking the main execution thread.

Concurrency Synchronization:
The Phaser is used to synchronize tasks across multiple threads. It ensures that the main thread waits for the completion of all the subtasks before proceeding further.

Real-Time Messaging:
This code simulates a real-time messaging system where messages are delivered as soon as the recipient is online. The system also handles message queuing for users who are offline.

 

Additional Points:
Scalability:
The system can be scaled by adding more users, messages, and tasks.

Real-time Features:
Simulates a real-time messaging environment with message delivery, notifications, and database updates.

Logging:
The logging system helps track and debug the operations, such as user registration, message sending, and database saving.

This code fully uses multithreading to simulate real-world scenarios of messaging systems.

 

6. Callable and ExecutorService
6.1 Callable
Callable returns a result and can throw exceptions.

6.2 ExecutorService
Manages thread pool efficiently.
Executes tasks asynchronously.
Methods of ExecutorService
submit(Callable/Runnable): Submits a task.
invokeAll(tasks): Executes all tasks and returns Future objects.
shutdown(): Initiates an orderly shutdown.

 

7. Why ExecutorService is Crucial

  1. Efficient Resource Management: Reuses threads, minimizing overhead.
  2. Task Lifecycle Management: Simplifies scheduling and execution.
  3. Advanced Control: Supports advanced thread pool configurations.

 

Real-Time Stock Market Analysis

 

1. Fetch Stock Data

package com.niteshsynergy.multithreading;

import java.util.Random;
import java.util.concurrent.Callable;

public class FetchStockData implements Callable<String> {

   private String stockSymbol;

   public FetchStockData(String stockSymbol) {
       this.stockSymbol = stockSymbol;
   }

   @Override
   public String call() throws Exception {
       // Simulate delay in fetching stock data (e.g., network call)
       Thread.sleep(1000);

       // Random stock price generation for demonstration
       double stockPrice = 100 + (new Random().nextDouble() * 50); // Price between 100 to 150
       String stockData = "Stock: " + stockSymbol + ", Price: " + stockPrice;
       
       // Return the stock data fetched
       return stockData;
   }
}
 

2. Analyze Stock Patterns

package com.niteshsynergy.multithreading;

import java.util.concurrent.Callable;

public class AnalyzeStockPatterns implements Callable<String> {

   private String stockData;

   public AnalyzeStockPatterns(String stockData) {
       this.stockData = stockData;
   }

   @Override
   public String call() throws Exception {
       // Simulate analysis delay
       Thread.sleep(2000);

       // Analyze the fetched stock data
       // In this case, we simply check if the price is above a certain threshold
       String[] data = stockData.split(",");
       double price = Double.parseDouble(data[1].split(":")[1].trim());

       String analysisResult = "Analysis for " + data[0].split(":")[1].trim() + ": ";
       if (price > 120) {
           analysisResult += "Bullish trend detected!";
       } else {
           analysisResult += "Bearish trend detected.";
       }
       
       // Return the analysis result
       return analysisResult;
   }
}
 

3.Notify Users

package com.niteshsynergy.multithreading;

import java.util.concurrent.Callable;

public class NotifyUsers implements Callable<String> {

   private String analysisResult;

   public NotifyUsers(String analysisResult) {
       this.analysisResult = analysisResult;
   }

   @Override
   public String call() throws Exception {
       // Simulate user notification delay
       Thread.sleep(500);

       // Simulate sending notifications to users based on analysis result
       return "Notification sent to users: " + analysisResult;
   }
}
 

4. ExecutorService to Manage Threads

package com.niteshsynergy.multithreading;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class StockMarketAnalysis {

   public static void main(String[] args) {
       // Executor service with a fixed thread pool of size 3
       ExecutorService executorService = Executors.newFixedThreadPool(3);

       // Step 1: Fetch Stock Data
       Callable<String> fetchStockData1 = new FetchStockData("AAPL");
       Callable<String> fetchStockData2 = new FetchStockData("MSFT");
       Callable<String> fetchStockData3 = new FetchStockData("GOOG");

       List<Callable<String>> tasks = new ArrayList<>();
       tasks.add(fetchStockData1);
       tasks.add(fetchStockData2);
       tasks.add(fetchStockData3);

       try {
           // Step 2: Execute all tasks and collect the results
           List<Future<String>> stockDataResults = executorService.invokeAll(tasks);
           
           // Process the fetched stock data results
           for (Future<String> result : stockDataResults) {
               String stockData = result.get(); // Fetch stock data
               System.out.println("Fetched Data: " + stockData);
               
               // Step 3: Analyze Patterns based on stock data
               Callable<String> analyzePattern = new AnalyzeStockPatterns(stockData);
               Future<String> analysisResult = executorService.submit(analyzePattern);
               
               String analysis = analysisResult.get(); // Get analysis result
               System.out.println("Analysis Result: " + analysis);
               
               // Step 4: Notify Users based on the analysis
               Callable<String> notifyUsers = new NotifyUsers(analysis);
               Future<String> notificationResult = executorService.submit(notifyUsers);
               
               String notification = notificationResult.get(); // Get notification result
               System.out.println(notification);
           }
       } catch (InterruptedException | ExecutionException e) {
           e.printStackTrace();
       } finally {
           // Shutdown the executor service gracefully
           executorService.shutdown();
       }
   }
}
 

Explanation of Code Components:

FetchStockData (Callable):

Fetches stock data from an external source (simulated).
Returns a string containing the stock symbol and a randomly generated price.
AnalyzeStockPatterns (Callable):

Analyzes the fetched stock data.
It checks if the stock price is above a certain threshold (120 in this case) to determine if the trend is bullish or bearish.
NotifyUsers (Callable):

Simulates sending notifications to users based on the analysis.
In a real-world scenario, this could be an API call to send emails, push notifications, or SMS.
ExecutorService:

We use an ExecutorService with a fixed thread pool of size 3 to handle three tasks concurrently (fetching data, analyzing it, and notifying users).
The invokeAll method is used to execute all tasks at once and gather the results.
The submit method is used to submit individual tasks like pattern analysis and user notifications.

 

Key Features of This Code:
Callable and ExecutorService: Using Callable allows each task to return a result, and ExecutorService manages the execution of these tasks in parallel.
Concurrency and Parallelism: Tasks like fetching data, analyzing patterns, and notifying users are executed concurrently, simulating a real-time system that handles multiple tasks in parallel.
Future Handling: The use of Future objects allows us to wait for the completion of each task and process the results in order.

 

Why This Is A Complex Example:

  • Simulating Multiple Real-Time Processes: This example mimics how a real-time stock market system would handle multiple independent tasks like data fetching, pattern analysis, and user notifications concurrently.
    ExecutorService & Callable Usage: Demonstrates advanced use of ExecutorService and Callable for efficient parallel task management.
    Scalable Architecture: The design can easily be scaled to handle more stock symbols or additional tasks like stock trading, by simply adding more tasks to the thread pool.

 

Thanks for reading multithreading blog from Nitesh Synergy!…..

 

84 min read
Nov 21, 2024
By Nitesh Synergy
Share