FRI-OS
example/posix/threads-producer_consumer.cpp

An example of the producer / consumer problem

//LDFLAGS = -lpthread
#include <iostream>
#include <cstdlib>
#include <vector>
#include <queue>
#include <algorithm>
#include <cassert>
#include <frios/posix.hpp>
// Queue with added synchronization
class interlocked_queue
{
private:
// the standard unsynchronizaed queue
std::queue<int> _queue;
// mutex for synchronizing access to the queue
public:
interlocked_queue(void) = default;
// push a new value to the queue
void push(int value)
{
// ensure that the mutex is locked
// push the value to the queue
_queue.push(value);
}
// pop a value from the queue
int pop(void)
{
// ensure that the mutex is locked
// get the "oldest" element in the queue
int result = _queue.front();
// remove it from the queue
_queue.pop();
// return the value
return result;
}
// check if the queue is empty
bool empty(void)
{
// ensure that the mutex is locked
// delegate the call to the internal queue
return _queue.empty();
}
};
// custom implementation of an atomic boolean value
class atomic_bool
{
private:
// the value
bool _value;
// the mutex for synchronization of access to _value
public:
// initialized to false by default
atomic_bool(void)
: _value(false)
{ }
// interlocked conversion to bool (getter)
operator bool (void)
{
// ensure that the mutex is locked
// return the value
return _value;
}
// interlocked assignment (setter)
atomic_bool& operator = (bool value)
{
// ensure that the mutex is locked
// set the value
_value = value;
return *this;
}
};
// the data shared by the worker threads
struct shared_data
{
// the count of numbers to be generated by the generator
std::size_t count;
// the range from which to generate random numbers
int range;
// indicates that the generator has finished generating
atomic_bool done_generating;
// indicates that the tester has finished testing the numbers
atomic_bool done_testing;
// a queue for numbers generated by the generator
// and tested by the tester
interlocked_queue generated;
// queues for prime and non-prime numbers as tested
// by the tester and printed by the printer
interlocked_queue primes;
interlocked_queue non_primes;
};
// the main function of the generator thread
void* generator_main(void* raw_ptr)
{
// the data pointer must be non-null
assert(raw_ptr);
// cast it to the appropriate type and dereference it
shared_data& data = *static_cast<shared_data*>(raw_ptr);
// generate data.count random numbers from range <0, data.range>
// and push them into the data.generated queue
for(std::size_t i=0; i!=data.count; ++i)
data.generated.push(1 + std::rand() % data.range);
// indicate that the generating has finished
data.done_generating = true;
return nullptr;
}
// function that checks if p is divisible by the numbers in primes.
// the primes vector is assumed to contain prime all numbers
// smaller or equal to p
bool is_prime(const std::vector<int>& primes, int p)
{
// if p is 1 it is considered prime
if(p <= 1) return true;
// if we have not found any number that divides p
// and is not equal to p then p is considered prime
return std::find_if(
primes.begin(),
primes.end(),
[p](int div) -> bool
{
return (p != div) && (p % div == 0);
}
) == primes.end();
}
// update the vector primes to contain all primes
// that are smaller than or equal to n
void update_primes(std::vector<int>& primes, int n)
{
int p = primes.back()+1;
while(true)
{
if(is_prime(primes, p))
{
primes.push_back(p);
if(p >= n) break;
}
++p;
}
}
// the main function of the tester thread which pops numbers
// generated by generator and tests if they are prime
// and pushes them to the queues read by the printer thread
void* tester_main(void* raw_ptr)
{
// check, cast and dereference the pointer
assert(raw_ptr);
shared_data& data = *static_cast<shared_data*>(raw_ptr);
// initialize the local vector of prime numbers
std::vector<int> primes = {2, 3, 5, 7};
// while the generator is not done and we have
// some numbers in data.generated
while(!data.done_generating || !data.generated.empty())
{
// pop a new candidate from the queue
int candidate = data.generated.pop();
// if the biggest prime we have stored
// is smaller than the candidate
// update the local list of primes
if(primes.back() < candidate)
update_primes(primes, candidate);
// check if the candidate is a prime
// and push it to the appropriate queue
// based on the result
if(is_prime(primes, candidate))
data.primes.push(candidate);
else data.non_primes.push(candidate);
}
// indicate that the tester thread has finished
data.done_testing = true;
return nullptr;
}
// main function of the printer thread, which pops the prime
// and non-prime numbers from the tester, does some statistics
// and prints the results
void* printer_main(void* raw_ptr)
{
// check, cast and derefenrece the pointer
assert(raw_ptr);
shared_data& data = *static_cast<shared_data*>(raw_ptr);
// initialize the statistic values
int p, n;
int count_p = 0, count_n = 0;
int min_p = data.range, max_p = 0;
int min_n = data.range, max_n = 0;
bool has_p = false, has_n = false;
// while the tester is still tersing
// or we have some primes or non-primes
// in the queues
while(
(!data.done_testing) ||
(has_p=!data.primes.empty()) ||
(has_n=!data.non_primes.empty())
)
{
// if we have a prime
if(has_p)
{
// pop and process it
p = data.primes.pop();
if(min_p > p) min_p = p;
if(max_p < p) max_p = p;
++count_p;
}
// if we have a non-prime
if(has_n)
{
// pop and process it
n = data.non_primes.pop();
if(min_n > n) min_n = n;
if(max_n < n) max_n = n;
++count_n;
}
}
// print the stats
std::cout << "Generated " << data.count << " numbers ";
std::cout << "in range <1, " << data.range << ">" << std::endl;
std::cout << "Primes: " << count_p;
std::cout << " (" << ((1000 * count_p)/data.count)*0.1 << "%)";
std::cout << ", min = " << min_p << ", max = " << max_p << std::endl;
std::cout << "Non-primes: " << count_n;
std::cout << " (" << ((1000 * count_n)/data.count)*0.1 << "%)";
std::cout << ", min = " << min_n << ", max = " << max_n << std::endl;
return nullptr;
}
int main(int argc, const char* argv[])
{
std::srand(::getpid());
// initialize the shared data
shared_data data;
// get the count of generated numbers from the first
// argument or use the default
data.count = (argc>1)?std::atoi(argv[1]):0;
if(data.count <= 0) data.count = 1000;
// get the range of generated numbers from the second
// argument or use the default
data.range = (argc>2)?std::atoi(argv[2]):0;
if(data.range <= 0) data.range = 10000;
// start the threads
frios::posix::thread printer(printer_main, &data);
frios::posix::thread tester(tester_main, &data);
frios::posix::thread generator(generator_main, &data);
// wait for the threads
generator.wait_for();
tester.wait_for();
printer.wait_for();
return 0;
}