Non-Repeating Elements of a given array using Multithreaded program

Given an array arr[] of size N and an integer T representing the count of threads, the task is to find all non-repeating array elements using multithreading.
Examples:
Input: arr[] = { 1, 0, 5, 5, 2}, T = 3Â
Output: 0 1 2Â
Explanation:Â
The frequency of 0 in the array arr[] is 1.Â
The frequency of 1 in the array arr[] is 1.Â
The frequency of 2 in the array arr[] is 1.Â
Therefore, the required output is 0 1 2Input: arr[] = { 1, 1, 5, 5, 2, 4 }, T = 3Â
Output: 2 4Â
Explanation:Â
The frequency of 2 in the array arr[] is 1.Â
The frequency of 4 in the array arr[] is 1.Â
Therefore, the required output is 2 4
Approach: The idea is to use the pthread library available in C++ to create multiple threads for concurrent process flow and perform multiple operations( pthread create, pthread join , lock, etc) in multithreaded program. Follow the steps below to solve the problem:
- Divide the array into T subarrays, such that each subarray of size N / T will be executed in a single thread.
- Initialize a Map, say mp, to store the frequencies of each array element.
- Create a pthread_mutex_lock, say lock1, to ensure that all threads do not trip over each other and corrupt the Map container.
- Define a function func() for executing the body of a thread. This function is often called the kernel of the thread and is provided during thread creation.
- Lock the current thread using pthread_mutex_lock() so that it does not overlap with other threads
- Traverse through the given range as an argument to the function func() in the array arr[] and increment the frequency of the array element which is encountered.
- Release the current thread using the function pthread_mutex_unlock().
- Initialize an array, say thread[], of type pthread_t for storing the threads.
- Iterate over the range [0, T] and create a thread by calling pthread_create() function and store it in the thread[i]
- While each thread performs their individual tasks, the main() function will need to wait till each of the threads finish their work.Â
- Use pthread_join() function for waiting till each thread finishes executing function func()
- Iterate over the range [0, T] and call pthread_create() function for each thread[i]
- Finally, traverse the map mp and print the element occurring only once.
Below is the implementation of the above approach:
C++
// C++ program to implement// the above approachÂ
#include <bits/stdc++.h>#include <pthread.h>using namespace std;Â
// Structure of subarray // of the arraystruct range_info {Â
    // Stores start index    // of the subarray    int start;Â
    // Stores end index    // of the subarray    int end;Â
    // Stores pointer to the    // first array element    int* a;};Â
Â
// Declaring map, and mutex for// thread exclusion(lock)map<int, int> mp;pthread_mutex_t lock1;Â
Â
// Function performed by every thread to// count the frequency of array elements// in current subarrayvoid* func(void* arg){    // Taking a lock so that threads    // do not overlap each other    pthread_mutex_lock(&lock1);              // Initialize range_info     // for each thread    struct range_info* rptr     = (struct range_info*)arg;Â
Â
    // Thread is going through the array    // to check and update the map    for (int i = rptr->start;             i <= rptr->end; i++) {                                      // Stores iterator to the map                map<int, int>::iterator it;                          // Update it        it = mp.find(rptr->a[i]);                          // If rptr->a[i] not found        // in map mp        if (it == mp.end()) {                                      // Insert rptr->a[i] with             // frequency 1            mp.insert({ rptr->a[i], 1 });        }        else {                                      // Update frequency of            // current element            it->second++;        }    }     Â
    // Thread releases the lock    pthread_mutex_unlock(&lock1);    return NULL;}Â
Â
// Function to find the unique// numbers in the arrayvoid numbers_occuring_once(int arr[],                        int N, int T){    // Stores all threads    pthread_t threads[T];Â
    // Stores start index of    // first thread    int spos = 0;Â
    // Stores last index     // of the first thread    int epos = spos + (N / T) - 1;Â
    // Traverse each thread    for (int i = 0; i < T; i++) {Â
        // Initialize range_info for        // current thread        struct range_info* rptr            = (struct range_info*)malloc(                sizeof(struct range_info));Â
        // Update start index of        // current subarray            rptr->start = spos;Â
        // Stores end index of        // current subarray        rptr->end = epos;Â
        // Update pointer to the first        // element of the array        rptr->a = arr;                          // creating each thread with         // appropriate parameters        int a        = pthread_create(&threads[i], NULL,                        func, (void*)(rptr));                                                          // updating the parameters         // for the next thread        spos = epos + 1;        epos = spos + (N / T) - 1;        if (i == T - 2) {            epos = N - 1;        }    }Â
    // Waiting for threads to finish    for (int i = 0; i < T; i++) {        int rc         = pthread_join(threads[i], NULL);    }Â
    // Traverse the map    for (auto it: mp) {                                                                           // If frequency of current     // element is 1                                    if (it.second == 1) {Â
            // Print the element            cout << it.first << " ";        }    }}Â
Â
// Drivers Codeint main(){    // initializing the mutex lock     pthread_mutex_init(&lock1, NULL);    int T = 3;    int arr[] = { 1, 0, 5, 5, 2, 6 };    int N = sizeof(arr) / sizeof(arr[0]);    numbers_occuring_once(arr, N, T);} |
Java
import java.util.HashMap;import java.util.Map;Â
public class Main {Â
  // Structure of subarray of the array  static class RangeInfo {Â
    // Stores start index of the subarray    int start;Â
    // Stores end index of the subarray    int end;Â
    // Stores pointer to the first array element    int[] a;Â
    RangeInfo(int start, int end, int[] a)    {      this.start = start;      this.end = end;      this.a = a;    }  }Â
  // Declaring map for thread exclusion(lock)  static Map<Integer, Integer> mp = new HashMap<>();Â
  // Function performed by every thread to  // count the frequency of array elements  // in current subarray  static void func(RangeInfo rptr)  {    // Initialize range_info for each threadÂ
    // Thread is going through the array    // to check and update the map    for (int i = rptr.start; i <= rptr.end; i++) {Â
      // Stores value of the current element      int curr = rptr.a[i];Â
      // Synchronize access to the map      synchronized (mp)      {        // Stores the frequency of the current        // element        Integer freq = mp.get(curr);Â
        // If curr not found in map mp        if (freq == null) {          // Insert curr with frequency 1          mp.put(curr, 1);        }        else {          // Update frequency of current element          mp.put(curr, freq + 1);        }      }    }  }Â
  // Function to find the unique numbers in the array  static void numbersOccurringOnce(int[] arr, int n,                                   int t)  {    // Stores all threads    Thread[] threads = new Thread[t];Â
    // Stores start index of first thread    final int spos = 0;Â
    // Stores last index of the first thread    final int epos = spos + (n / t) - 1;Â
    // Traverse each thread    for (int i = 0; i < t; i++) {Â
      // Update start index of current subarray      final int start = spos + (n / t) * i;Â
      // Stores end index of current subarray      final int end        = i == t - 1 ? n - 1        : spos + (n / t) * (i + 1) - 1;Â
      // Initialize range_info for current thread      RangeInfo rptr = new RangeInfo(start, end, arr);Â
      // creating each thread with appropriate      // parameters      threads[i] = new Thread(() -> func(rptr));    }Â
    // Starting all threads    for (int i = 0; i < t; i++) {      threads[i].start();    }Â
    // Waiting for threads to finish    for (int i = 0; i < t; i++) {      try {        threads[i].join();      }      catch (InterruptedException e) {        e.printStackTrace();      }    }Â
    // Traverse the map    for (Map.Entry<Integer, Integer> it :         mp.entrySet()) {      // If frequency of current element is 1      if (it.getValue() == 1) {        // Print the element        System.out.print(it.getKey() + " ");      }    }  }Â
  // Driver's Code  public static void main(String[] args)    throws InterruptedException  {    // initializing the mutex lock    Object lock1 = new Object();Â
    int T = 3;    int[] arr = { 1, 0, 5, 5, 2, 6 };    int N = arr.length;    numbersOccurringOnce(arr, N, T);  }} |
C#
using System;using System.Collections.Generic;using System.Threading;Â
// Structure of subarray of the arrayclass RangeInfo {    // Stores start index of the subarray    public int start;Â
    // Stores end index of the subarray    public int end;Â
    // Stores pointer to the first array element    public int[] a;Â
    public RangeInfo(int start, int end, int[] a)    {        this.start = start;        this.end = end;        this.a = a;    }}Â
public class GFG {Â
    // Declaring map for thread exclusion(lock)    static Dictionary<int, int> mp        = new Dictionary<int, int>();Â
    // Function performed by every thread to    // count the frequency of array elements    // in current subarray    static void func(RangeInfo rptr)    {        // Thread is going through the array        // to check and update the map        for (int i = rptr.start; i <= rptr.end; i++) {            // Stores value of the current element            int curr = rptr.a[i];Â
            // Synchronize access to the map            lock(mp)            {                // Stores the frequency of the current                // element                int freq;                mp.TryGetValue(curr, out freq);Â
                // If curr not found in map mp                if (freq == 0) {                    // Insert curr with frequency 1                    mp.Add(curr, 1);                }                else {                    // Update frequency of current element                    mp[curr] = freq + 1;                }            }        }    }Â
    // Function to find the unique numbers in the array    static void numbersOccurringOnce(int[] arr, int n,                                     int t)    {        // Stores all threads        Thread[] threads = new Thread[t];Â
        // Stores start index of first thread        int spos = 0;Â
        // Stores last index of the first thread        int epos = spos + (n / t) - 1;Â
        // Traverse each thread        for (int i = 0; i < t; i++) {            // Update start index of current subarray            int start = spos + (n / t) * i;Â
            // Stores end index of current subarray            int end = i == t - 1                          ? n - 1                          : spos + (n / t) * (i + 1) - 1;Â
            // Initialize range_info for current thread            RangeInfo rptr = new RangeInfo(start, end, arr);Â
            // creating each thread with appropriate            // parameters            threads[i] = new Thread(() => func(rptr));        }Â
        // Starting all threads        for (int i = 0; i < t; i++) {            threads[i].Start();        }Â
        // Waiting for threads to finish        for (int i = 0; i < t; i++) {            threads[i].Join();        }Â
        // Traverse the map        foreach(KeyValuePair<int, int> entry in mp)        {            // If frequency of current element is 1            if (entry.Value == 1) {                // Print the element                Console.Write(entry.Key + " ");            }        }    }Â
    // Driver's Code    public static void Main(string[] args)    {        int T = 3;        int[] arr = { 1, 0, 5, 5, 2, 6 };        int N = arr.Length;Â
        numbersOccurringOnce(arr, N, T);    }} |
Python3
import threadingÂ
# Structure of subarray of the arrayclass RangeInfo:    # Stores start index of the subarray    def __init__(self, start, end, a):        self.start = start        self.end = end        self.a = aÂ
# Declaring map for thread exclusion(lock)mp = {}Â
# Function performed by every thread to# count the frequency of array elements# in current subarraydef func(rptr):    # Thread is going through the array    # to check and update the map    for i in range(rptr.start, rptr.end + 1):        # Stores value of the current element        curr = rptr.a[i]Â
        # Synchronize access to the map        with threading.Lock():            # Stores the frequency of the current            # element            freq = mp.get(curr, 0)Â
            # If curr not found in map mp            if freq == 0:                # Insert curr with frequency 1                mp[curr] = 1            else:                # Update frequency of current element                mp[curr] = freq + 1Â
# Function to find the unique numbers in the arraydef numbersOccurringOnce(arr, n, t):    # Stores all threads    threads = []Â
    # Stores start index of first thread    spos = 0Â
    # Stores last index of the first thread    epos = spos + (n // t) - 1Â
    # Traverse each thread    for i in range(t):        # Update start index of current subarray        start = spos + (n // t) * iÂ
        # Stores end index of current subarray        end = n - 1 if i == t - 1 else spos + (n // t) * (i + 1) - 1Â
        # Initialize range_info for current thread        rptr = RangeInfo(start, end, arr)Â
        # creating each thread with appropriate        # parameters        threads.append(threading.Thread(target=func, args=(rptr,)))Â
    # Starting all threads    for thread in threads:        thread.start()Â
    # Waiting for threads to finish    for thread in threads:        thread.join()Â
    # Traverse the map    for key, value in mp.items():        # If frequency of current element is 1        if value == 1:            # Print the element            print(key, end=" ")Â
# Drivers Codeif __name__ == "__main__":Â Â Â Â T = 3Â Â Â Â arr = [1, 0, 5, 5, 2, 6]Â Â Â Â N = len(arr)Â
    numbersOccurringOnce(arr, N, T) |
Time Complexity: O(N * log(N))
Auxiliary Space: O(N)
Note: It is recommended to execute the program in a Linux based system using the following command:
g++ -pthread program_name.cpp
Ready to dive in? Explore our Free Demo Content and join our DSA course, trusted by over 100,000 zambiatek!



