Wednesday, 5 March 2014

Part (2/3): A general purpose thread-safe internally synchronized message queue.

When it's time to end the relationship.

So here is the code for the single-lock, general-purpose message queue as the obviously correct base queue type, but with one important addition...

The ability to close() the message queue.

But why?

TL;DR github.com/ifknot/libfbp



In multiprocessor programming, as in life, it is important to know when a relationship is over, and the producer-consumer relationships of the AO, FBP and CSP approaches are no different.

Because an empty message queue does not necessarily mean that the relationship between producer and consumer is over, another way of signalling that the producer(s) have finished is required.

A common approach is to send a specialised termination message, as in Sutter's done message [1]: the equivalent of the terminating 0 in null-terminated strings or the EOF marker for file streams.
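
For illustration only, a minimal sketch of that done-message idea; the DONE sentinel and the process() handler are hypothetical stand-ins, not part of libfbp:
// Hypothetical sketch of the "done message" approach: a reserved sentinel
// value tells the consumer that production is finished.
const int DONE = -1;                        // assumed sentinel, like '\0' or EOF

void consume(fbp::gpcg_message_queue<int>& q) {
    for (;;) {
        int message = q.dequeue();          // blocks while the queue is empty
        if (message == DONE) break;         // any one producer can end the conversation
        process(message);                   // hypothetical message handler
    }
}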

But this permits any one producer to stop the communication whilst others may still be active.

An alternative approach is to allow the message queue to be closed, with any subsequent attempt to enqueue or dequeue resulting in an exception being thrown.

The consumer can then carry on working inside a try-catch block until the last producer has completed and detached itself, closing the message queue. After that, any further attempt at consuming results in the queue-specific runtime exception being caught and the consumer shutting down gracefully:
try {
    // process the message queue
}
catch (const queue_exception& e) {
    // the queue has been closed - time to stop consuming and shut down
}
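The producer side can mirror this pattern, since enqueue() also throws once the queue has been closed; a sketch only, where has_work() and produce_next() are hypothetical stand-ins:
// Illustrative producer counterpart: enqueue() likewise throws after the
// queue is closed, so the same try-catch guard applies.
try {
    while (has_work()) {                    // hypothetical supply of work
        q.enqueue(produce_next());          // hypothetical message factory
    }
}
catch (const queue_exception& e) {
    // the queue was closed under us - stop producing and shut down
}
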
The general-purpose requirement of the message_queue means that it must offer both bounded and unbounded behaviour, with bounded try_ member functions as well as unbounded, wait-on-empty member functions that permit dual usage:
fbp::gpcg_message_queue<int> bounded_q{10};
int i = 0;
while (bounded_q.try_enqueue(++i));         // fill until the bound of 10 is hit
while (bounded_q.try_dequeue(i))            // drain until empty
    std::cout << i << ",";
std::cout << std::endl;

fbp::gpcg_message_queue<int> unbounded_q;
while (i-- >= 0)
    unbounded_q.enqueue(i);                 // unbounded: never refuses a message
do {
    i = unbounded_q.dequeue();              // waits if the queue is empty
    std::cout << i << ",";
} while (i != 0);
std::cout << std::endl;
Which results in the output:
1,2,3,4,5,6,7,8,9,10,
9,8,7,6,5,4,3,2,1,0,
So how do the bounded member functions work given the single queue mutex q_mutex?
bool try_enqueue(const T& message) {
    if (this->is_open()) {
        std::lock_guard<std::mutex> lock(q_mutex);
        if (q.size() < q_max) {             // bounded: refuse to grow past q_max
            q.push(message);
            return true;
        }
        else {
            return false;                   // full - the caller may retry later
        }
    }
    else {
        throw queue_closed_exception(doh::FAIL + doh::RESOURCE_CLOSED);
    }
}

bool try_dequeue(T& message) {
    if (this->is_open()) {
        std::lock_guard<std::mutex> lock(q_mutex);
        if (!q.empty()) {
            message = std::move(q.front());
            q.pop();
            return true;
        }
        else {
            return false;                   // empty - nothing to consume yet
        }
    }
    else {
        throw queue_closed_exception(doh::FAIL + doh::RESOURCE_CLOSED);
    }
}

The unbounded enqueue grows the queue automatically and the unbounded dequeue uses the single condition variable q_cv to wait if the queue is empty.
N.B. The q_cv.wait() is guarded by a while (q.empty()) loop to absorb the spurious wake-ups that can happen:
virtual void enqueue(T&& item) override {
    if (this->is_open()) {
        std::lock_guard<std::mutex> lock(q_mutex);
        q.push(std::move(item));            // unbounded: the queue simply grows
    }
    else {
        throw queue_closed_exception(doh::FAIL + doh::RESOURCE_CLOSED);
    }
    q_cv.notify_one();                      // wake one waiting consumer
}

virtual T dequeue() override {
    std::unique_lock<std::mutex> lock(q_mutex);
    while (q.empty()) {                     // loop guards against spurious wake-ups
        if (this->is_open()) {
            q_cv.wait(lock);                // release the lock and sleep until notified
        }
        else {
            throw queue_closed_exception(doh::FAIL + doh::RESOURCE_CLOSED);
        }
    }
    auto item = std::move(q.front());
    q.pop();
    lock.unlock();
    q_cv.notify_one();
    return item;
}
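
The close() member itself is not shown here, but given the q_mutex, q_cv and is_open() members above, a minimal sketch might look like this (the open flag behind is_open() is an assumption):
// Sketch of close(), assuming a boolean flag backs is_open().
void close() {
    {
        std::lock_guard<std::mutex> lock(q_mutex);
        open = false;                       // assumed flag behind is_open()
    }
    q_cv.notify_all();                      // wake every waiting dequeue() so it can
                                            // observe the closed queue and throw
}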

How can one now safely share this message queue between multiple producers and consumers, using an RAII idiom that automatically closes the queue when there are either no more interested consumers or no more interested producers?

What a good topic for the next blog entry...

References:
[1] Herb Sutter, "Prefer Using Active Objects Instead of Naked Threads", Dr. Dobb's Journal, 2010.
