Producer consumer problem using counting semaphore
Note: This solution is available from C++20
Introduction:
The intention of this post to provide the solution for producer consumer problem using counting semaphore.
A counting semaphore is a special semaphore with a counter bigger than zero. The counter is initialized in the constructor. Acquiring the semaphore decreases the counter, and releasing the semaphore increases the counter. If a thread tries to acquire the semaphore when the counter is zero, the thread will block until another thread increments the counter by releasing the semaphore.
It supports the two operations wait
and signal
. wait
acquires the semaphore and decreases the counter; it blocks the thread acquiring it if the counter is zero. signal
releases the semaphore and increases the counter. Blocked threads are added to the queue to avoid starvation.
Github link: sourcecode/producerconsumerusingcountingsemaphore.h
Implementation:
const uint8_t MAX_SLOTS = 10;
std::counting_semaphore<MAX_SLOTS>
availableSlots{MAX_SLOTS},
filledSlots{ 0 };
Mutex:
std::mutex resourceMutex;
Shared resource:
//Shared memory resource
queue<int> cBuffer;
Producer thread function:
//producer thread function
void producerThreadFn()
{
//this for loop is only for testing purpose to limit the number of cycles
for (uint8_t idx = 0; idx < MAX_SLOTS; idx++)
{
//1. produce the data
gdata++;
//2.Check for the free slot
availableSlots.acquire();
{
//once the semaphore acquired, create lock_guard before accessing the shared data.
std::lock_guard<std::mutex> lock(resourceMutex);
cBuffer.push(gdata);
}
filledSlots.release();
}
}
Consumer thread function:
//consumer thread function
void consumerThreadFn()
{
//this for loop is only for testing purpose to limit the number of cycles
for (uint8_t idx = 0; idx < 2; idx++)
{
//1.check for the empty buffer
filledSlots.acquire();
{
//once the semaphore acquired, create lock_guard before accessing the shared data.
std::lock_guard<std::mutex> lock(resourceMutex);
cout << "Consumed data = " << cBuffer.front() << endl;
cBuffer.pop();
//3. Inform the producer about the free slot
availableSlots.release();
}
}
}
Thread creation:
//test function
void test()
{
//create threads
thread producer(&producerThreadFn);
thread consumer1(&consumerThreadFn);
thread consumer2(&consumerThreadFn);
thread consumer3(&consumerThreadFn);
thread consumer4(&consumerThreadFn);
//join threads, consumers first
consumer1.join();
consumer2.join();
consumer3.join();
consumer4.join();
producer.join();
}