CustomThreadPoolExecutor in Java Executor Framework

Executors Manage thread execution. At the top of the executor, hierarchy is the Executor interface, which is used to initiate a thread. ExecutorService Extends Executor and provides methods that manage execution. There are three implementations of ExecutorService: ThreadPoolExecutor, ScheduledThreadPoolExecutor, and ForkJoinPool. java.util.concurrent also defines the Executors utility class, which includes some static methods that simplify the creation of various executors. Related to executors are the Future and Callable interfaces. A Future contains a value that is returned by a thread after it executes. Thus, its value becomes defined βin the future,β when the thread terminates. Callable defines a thread that returns a value. In this article, we are going to learn about Custom ThreadPoolExecutor in java.
First, let us discuss two concepts been aggressively used here namely thread pool and blocking queue. Β
- ThreadPool is a container in which contains some numbers of threads. These threads are given some tasks. When one thread completes its task next task is given to it. While working in a multi-threading environment itβs not practical to create new individual threads for each new task, because creating a new thread is overhead for the operating system.
- A blocking queue is a queue that blocks when you try to dequeue from it and the queue is empty, If you try to enqueue items to t and the queue is already full. All operations in the blocking queue are thread-safe.
Also, the important specific methods that are to be implemented are as follows:
Method 1: execute()
This method is contained in the Executor interface. This function executes the given task at some time in the future. It returns nothing hence the return type of this method is void.
Method 2: myNewFixedThreadPool()
This is a factory method of Executors class. It is used to create a fixed number of threads in the thread pool.
- Parameter: int number of threads
- Return type: ExecutorServiceΒ
Procedure:
- Create an interface in which we will create a execute method. This method will execute the task given to it.
- In the code above generated, we have implemented a runnable interface. We are printing the current name of the thread with a delay of 1000 milliseconds. These are the tasks which we are going to execute.
- MyExecutor class provides a static method myNewFixedThreadPool in which we will pass the number of threads we want to create. This method tells the thread pool that how many threads are going to be there in the thread pool. These threads will execute tasks till all tasks get completed.
- This is the custom thread pool class. This class is the heart of the whole mechanism. It uses two important concepts LinkedBlockingQueue and Execution class. The execution class is explained further. This class receives the thread count from the myNewFixedThreadPool method. All the tasks we submit are stored in the queue. All the threads will fetch the tasks from the queue. We submit the task by using the execute method of the MyExecuorService.
- Execution class performs the very important task of adding creating the number of threads that we want in our thread pool. This class is where we are defining how to fetch the task from LinkedBlockingQueue.
- Finally, in this class, we are gathering all the pieces together and our custom thread pool is ready.
Implementation: Here we are passing some threads as 3. The number of tasks is 20 and executing them by using execute method.Β
Java
// Java Program to illustrate Concept of// CustomThreadPoolExecutor Executor FrameworkΒ
// Importing LinkedBlockingQueue class from java.util// packageimport java.util.concurrent.LinkedBlockingQueue;Β
// Interface// Custom interface for which contains execute methodinterface MyExecutorService {Β
Β Β Β Β // MethodΒ Β Β Β void execute(Runnable r);}Β
// Class 1// Helper classclass MyExecutors {Β
Β Β Β Β // Member variables of this classΒ Β Β Β int capacity;Β
Β Β Β Β // Passing the number of threads thatΒ Β Β Β // will be in the thread poolΒ Β Β Β static MyExecutorServiceΒ Β Β Β myNewFixedThreadPool(int capacity)Β Β Β Β {Β
Β Β Β Β Β Β Β Β return new MyThreadPool(capacity);Β Β Β Β }}Β
// Class 2// Helper class extending to MyExecutorService interfaceclass MyThreadPool implements MyExecutorService {Β
Β Β Β Β // Member variables of this classΒ Β Β Β static int capacity;Β Β Β Β static int currentCapacity;Β
Β Β Β Β // Creating object of LinkedBlockingQueue classΒ Β Β Β // Declaring object of type RunnableΒ Β Β Β static LinkedBlockingQueue<Runnable>Β Β Β Β Β Β Β Β linkedTaskBlockingQueue;Β
Β Β Β Β // Member variables of this classΒ Β Β Β Execution e;Β
Β Β Β Β // Method 1Β Β Β Β public MyThreadPool(int capacity)Β Β Β Β {Β
Β Β Β Β Β Β Β Β // Member variables of this classΒ
Β Β Β Β Β Β Β Β // this keyword refers to current instance itselfΒ Β Β Β Β Β Β Β this.capacity = capacity;Β Β Β Β Β Β Β Β currentCapacity = 0;Β
Β Β Β Β Β Β Β Β // Creating a linked blocking queue which will blockΒ Β Β Β Β Β Β Β // if its emptyΒ Β Β Β Β Β Β Β // and it will perform thread safe operation.Β Β Β Β Β Β Β Β linkedTaskBlockingQueueΒ Β Β Β Β Β Β Β Β Β Β Β = new LinkedBlockingQueue<Runnable>();Β
Β Β Β Β Β Β Β Β // Creating the object of execution classΒ Β Β Β Β Β Β Β e = new Execution();Β Β Β Β }Β
Β Β Β Β // Method 2Β Β Β Β // @OverrideΒ Β Β Β public void execute(Runnable r)Β Β Β Β {Β
Β Β Β Β Β Β Β Β // Declaring and adding tasks toΒ Β Β Β Β Β Β Β // blocking queue using add() methodΒ Β Β Β Β Β Β Β linkedTaskBlockingQueue.add(r);Β
Β Β Β Β Β Β Β Β // executeMyMethod() method of Execution classΒ Β Β Β Β Β Β Β // which will execute the tasksΒ Β Β Β Β Β Β Β e.executeMyMethod();Β Β Β Β }}Β
// Class 3// Helper class extending Runnable interfaceclass Execution implements Runnable {Β
Β Β Β Β // Method 1 ofΒ this classΒ Β Β Β void executeMyMethod()Β Β Β Β {Β
Β Β Β Β Β Β Β Β // At start the current capacity will be 0Β Β Β Β Β Β Β Β // The another capacity is the number of threads weΒ Β Β Β Β Β Β Β // want to create so we will increase the currentΒ Β Β Β Β Β Β Β // capacity count after creating each thread itΒ Β Β Β Β Β Β Β // means that we will create the threads if theΒ Β Β Β Β Β Β Β // current capacity is less than capacity passed byΒ Β Β Β Β Β Β Β // us i.e number of threads we want to create.Β
Β Β Β Β Β Β Β Β // In this case 3 threads will get createdΒ Β Β Β Β Β Β Β if (MyThreadPool.currentCapacityΒ Β Β Β Β Β Β Β Β Β Β Β < MyThreadPool.capacity) {Β Β Β Β Β Β Β Β Β Β Β Β MyThreadPool.currentCapacity++;Β
Β Β Β Β Β Β Β Β Β Β Β Β // Creating object of Thread classΒ Β Β Β Β Β Β Β Β Β Β Β Thread t = new Thread(new Execution());Β
Β Β Β Β Β Β Β Β Β Β Β Β // Starting the threadΒ Β Β Β Β Β Β Β Β Β Β Β t.start();Β Β Β Β Β Β Β Β }Β Β Β Β }Β
Β Β Β Β // Method 2 of this classΒ Β Β Β // @OverrideΒ Β Β Β public void run()Β Β Β Β {Β
Β Β Β Β Β Β Β Β // Till it is trueΒ Β Β Β Β Β Β Β while (true) {Β
Β Β Β Β Β Β Β Β Β Β Β Β // Here we are fetching the tasks from theΒ Β Β Β Β Β Β Β Β Β Β Β // linkedblocking queueΒ Β Β Β Β Β Β Β Β Β Β Β // which we have submitted using execute methodΒ Β Β Β Β Β Β Β Β Β Β Β // and executing themΒ Β Β Β Β Β Β Β Β Β Β Β if (MyThreadPool.linkedTaskBlockingQueue.size()Β Β Β Β Β Β Β Β Β Β Β Β Β Β Β Β != 0) {Β Β Β Β Β Β Β Β Β Β Β Β Β Β Β Β MyThreadPool.linkedTaskBlockingQueue.poll()Β Β Β Β Β Β Β Β Β Β Β Β Β Β Β Β Β Β Β Β .run();Β Β Β Β Β Β Β Β Β Β Β Β }Β Β Β Β Β Β Β Β }Β Β Β Β }}Β
// Class 4// Helper class// Here we are creating a simple task// which is printing current thread nameclass Mytask implements Runnable {Β
Β Β Β Β // Method 1 of this classΒ Β Β Β // @OverrideΒ Β Β Β public void run()Β Β Β Β {Β
Β Β Β Β Β Β Β Β // Try block to check for exceptionsΒ Β Β Β Β Β Β Β try {Β
Β Β Β Β Β Β Β Β Β Β Β Β // Making thread to pause for a secondΒ Β Β Β Β Β Β Β Β Β Β Β // using sleep() methodΒ Β Β Β Β Β Β Β Β Β Β Β Thread.sleep(1000);Β Β Β Β Β Β Β Β }Β
Β Β Β Β Β Β Β Β // Catch block to check for exceptionsΒ Β Β Β Β Β Β Β catch (InterruptedException e) {Β
Β Β Β Β Β Β Β Β Β Β Β Β // Print the exception scaling ith line numberΒ Β Β Β Β Β Β Β Β Β Β Β // using printStackTrace() methodΒ Β Β Β Β Β Β Β Β Β Β Β e.printStackTrace();Β Β Β Β Β Β Β Β }Β
Β Β Β Β Β Β Β Β // Print and display the current thread usingΒ Β Β Β Β Β Β Β // currentThread() method by getting thread nameΒ Β Β Β Β Β Β Β // using getName() methodΒ Β Β Β Β Β Β Β System.out.println(Β Β Β Β Β Β Β Β Β Β Β Β "Current Thread :-> "Β Β Β Β Β Β Β Β Β Β Β Β + Thread.currentThread().getName());Β Β Β Β }}Β
// Class 5// Main Classpublic class ExecutorServiceCustom {Β Β Β Β // Main driver methodΒ Β Β Β public static void main(String[] args)Β Β Β Β {Β Β Β Β Β Β Β Β // Getting the object of MyExcutorService by usingΒ Β Β Β Β Β Β Β //Β the factory method myNewFixedThreadPoolΒ
Β Β Β Β Β Β Β Β // Passing number of threads as 3Β Β Β Β Β Β Β Β MyExecutorService serviceΒ Β Β Β Β Β Β Β Β Β Β Β = MyExecutors.myNewFixedThreadPool(3);Β
Β Β Β Β Β Β Β Β for (int i = 0; i < 20; i++) {Β
Β Β Β Β Β Β Β Β Β Β Β Β // Creating 20 tasks and passing them to executeΒ Β Β Β Β Β Β Β Β Β Β Β service.execute(new Mytask());Β Β Β Β Β Β Β Β }Β
Β Β Β Β Β Β Β Β Runnable runnableTask = null;Β Β Β Β }} |
Output:Β
Current Thread :-> Thread-0 Current Thread :-> Thread-1 Current Thread :-> Thread-2 Current Thread :-> Thread-0 Current Thread :-> Thread-1 Current Thread :-> Thread-2 Current Thread :-> Thread-0 Current Thread :-> Thread-1 Current Thread :-> Thread-2 Current Thread :-> Thread-0 Current Thread :-> Thread-1 Current Thread :-> Thread-2 Current Thread :-> Thread-0 Current Thread :-> Thread-1 Current Thread :-> Thread-2 Current Thread :-> Thread-0 Current Thread :-> Thread-1 Current Thread :-> Thread-2 Current Thread :-> Thread-0 Current Thread :-> Thread-1
Note: In the above output, we have printed the thread name as defined in the runnable 20 times as we have submitted 20 tasks which is visually described through a video below
Β



