Producer Consumer Problem using condition_variable
Producer Consumer Problem:
Introduction:
Producer consumer problem is also called as bounded-buffer problem. This is the classic example for
thread synchronization.
The task of the Producer is to produce the item, put it into the memory buffer, and again start producing items. Whereas the task of the Consumer is to consume the item from the memory buffer. The same memory buffer is shared by both producers and consumers which is of fixed-size.
The intention of the solution is to provide the shared memory resource between producer and consumer threads, and synchronization between these threads , and exclusive access to the shared buffer.
Key points:
- The producer should produce data only when the buffer is not full. If buffer is full, the producer is not allowed to store any data into the memory buffer until free space available.
- Consumer can consume the data only when the buffer is not empty. If buffer is empty, the consumer is not allowed to use any data from the memory buffer until some data is available in the buffer.
- Accessing memory buffer should not be allowed to producer and consumer at the same time.
Implementation:
Complete source code link: https://github.com/krishnaKSA/Multithreading/blob/main/sourcecode/producerconsumerusingcondvar.h
Semaphore class using std::condition_variable:
Here, we are going to implement custom semaphore class which has the functionality of wait, notifyall, notifyone using std::condition_variable.
class semaphore
{
private:
mutable std::mutex data_mutex; //mutex. mutuable since used in the getSize() const function
std::condition_variable data_condVariable; //event handling across threads
uint8_t data_available; //data availability
public:
//constructor
explicit semaphore(uint8_t available);
void wait();
void notifyall();
void notifyone();
uint8_t getsize() const;
};
std::condition_variable is synchronization primitive that allows thread to wait until certain condition occurs. It has two operations wait and notify. Its used with mutex.Constructor:
While creating the instance of semaphore class provide number of slots available. This is referred as number of free slots available to fill the data by producer, and number of filled data by consumer.
//constructor
explicit semaphore(uint8_t available)
{
data_available = available;
}
Creating two instance of semaphore class. available is used by producer and filled is used by consumer.semaphore available(10); //number of free positions
semaphore filled(0); //number of filled positions
Wait:
This wait function called by both producer and consumer before accessing the shared memory buffer.
Producer calls this function to know the free slots available to store the produced data. Producer calls avaialble.wait(). In this case, available.data_available variable will be evaluated to see the free space.
Consumer calls this function to know any data available to consume. Consumer calls filled.wait(). In this case, filled.data_available will be evaluated.
void wait()
{
//Get the unique lock, so that other threads can't get the lock
std::unique_lock<std::mutex> lock(data_mutex);
//if no data/space available to consume or store , wait for it until its available
while (data_available == 0)
{
//this conditional_variable is used to signaling between the threads
data_condVariable.wait(lock); //this internally release the lock, and wait for the notification.
}
//once, notified get the lock again , and reduce the data_available.
--data_available;
//unique lock is released here
}
Notify:
When the certain condition occurs, producer/consumer invokes the notify function.
Here, there are two notify functions.
1. notify_one() -> Notifies to only one thread
2. notify_all() -> Notifies all the threads.
Once the producer stores the data into the buffer, consumer thread is notified (filled.notifyall()) about the stored data.
Once the consumer takes the data from the buffer, producer thread is notified (available.notifyall()) about the free slots.
void notifyall()
{
//since we are going to notify, lockguard is sufficient
std::lock_guard<std::mutex> lock(data_mutex); //similar to data_mutex.lock() but no unlock requires here
//if data available, notify to all
//increment the data_available
if (data_available++ == 0)
{
//this conditional_variable is used to signaling between the threads
data_condVariable.notify_all();
}
//lock is released here
}
void notifyone()
{
//since we are going to notify, lockguard is sufficient
std::lock_guard<std::mutex> lock(data_mutex); //similar to data_mutex.lock() but no unlock requires here
//if data available, notify to all
if (data_available++ == 0)
{
//this conditional_variable is used to signaling between the threads
data_condVariable.notify_one();
}
//lock is released here
}
Buffer:
Here, I have used circularBuffer as shared resource between producer and consumer.
CCircularBuffer* cBuffer = CCircularBuffer::getCircularBuffer();
Producer thread function:
1. Produce the data
2. Check the free slots to store the data
3. Invokes wait function to check free slots. If buffer is full, thread will be blocked.
4. Once the free slot is available , create std::lock_guard using mutex, and push the data to the buffer.
5. Notify the consumer thread about the filled data.
Added for loop just to limit the number of times to produce the data.
//thread function
void operator()()
{
for (uint8_t index = 0; index < num_of_trails; index++)
{
//produce data
++produced_data;
//check the available slot to fill the data from producer side
available.wait();
{
std::lock_guard<std::mutex> lock(l_mutex);
//push to the circular buffer
cBuffer->push(produced_data);
}
//call notifyall to the filled object.
filled.notifyall();
}
}
Consumer thread function:
1. Invokes wait function to check the data availability
2. If buffer is empty, this thread will be blocked.
3. Once data is available, thread will be unblocked.
4. Create std::lock_guard using mutex, and consume the data from the buffer
5. Notify the producer thread about the free slot.
//thread function
void operator()()
{
for (uint8_t index = 0; index < num_of_trails; index++)
{
//check any data available to consume
filled.wait();
{
std::lock_guard<std::mutex> lock(l_mutex);
printf("consumerId: %d", consumerId);
cout <<" consumed data : " << cBuffer->pop() << endl;
}
//call notifyall to the available object.
available.notifyall();
}
}
Thread creation:
Here, I created one producer thread , and three consumer threads.
//create instance for producer and consumer
CProducer producer(12);
CConsumer consumer1(3, 0);
CConsumer consumer2(5, 1);
CConsumer consumer3(2, 2);
//create threads
//start consumer first then producer
thread cons1{ consumer1 };
thread cons2{ consumer2 };
thread cons3{ consumer3 };
thread prod{ producer };
//join the threads
prod.join();
cons1.join();
cons2.join();
cons3.join();