
Monday, 17 March 2014

Part (3/3): A general purpose thread-safe internally synchronized message queue.

Proxies: Who's wearing the trousers? 

TL;DR I want to be able to do this:

producer_active_object(enqueuer_proxy(concurrent_queue));
consumer_active_object(dequeuer_proxy(concurrent_queue));

Today's reference will mostly be:
The proxy pattern[1] provides restricted or modified access to hidden or protected resources. There are a few subsets of proxies:
  • remote proxy
  • virtual proxy
  • protection proxy
  • smart reference
The kind of proxy that the message_queue type needs is a combination of the last two, one that offers:
  • Access to the head and tail of the message queue via its various private enqueue and dequeue member functions.
  • Counting the number of references to the message queue and closing it down when either there are no more interested consumer(s) or the producer(s) have finished producing. 
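A minimal sketch of that combination, assuming nothing about libfbp itself: guarded_queue, tail_proxy and closes_with_last_proxy are hypothetical names invented for illustration. The resource keeps everything useful private, the proxy gets in via friendship (the protection proxy part) and counts itself in and out (the smart reference part), and the last proxy to leave closes the resource.

```cpp
#include <atomic>
#include <cassert>
#include <queue>

// Hypothetical stand-in for the hidden resource: everything useful is private.
class guarded_queue {
    friend class tail_proxy;   // only the proxy may reach the private members
public:
    bool is_open() const { return open.load(); }
private:
    void push(int v) { items.push(v); }
    void attach() { ++refs; }
    void detach() { if (--refs == 0) open = false; }  // last reference closes the resource
    std::queue<int> items;
    std::atomic<int> refs{0};
    std::atomic<bool> open{true};
};

// Protection proxy (exposes only push) plus smart reference (counts itself in and out).
class tail_proxy {
public:
    explicit tail_proxy(guarded_queue& q) : q(q) { q.attach(); }
    ~tail_proxy() { q.detach(); }
    tail_proxy(const tail_proxy&) = delete;             // a copy would detach twice
    tail_proxy& operator=(const tail_proxy&) = delete;
    void push(int v) { q.push(v); }
private:
    guarded_queue& q;
};

// Returns true if the resource stayed open while referenced and closed afterwards.
bool closes_with_last_proxy() {
    guarded_queue q;
    bool open_while_referenced = false;
    {
        tail_proxy a(q);
        tail_proxy b(q);
        a.push(1);
        open_while_referenced = q.is_open();
    }   // a and b are destroyed here; the second detach closes q
    return open_while_referenced && !q.is_open();
}
```

Calling closes_with_last_proxy() should return true. Note that this sketch only closes when the count reaches zero; it does not model separate producer and consumer counts.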
Okay so the design choices are to either:
  1. Have a zoo of some_message_queue types, each with member functions for generating its own proxies, that use the extremely useful C++ friendship language feature[2].
  2. Select a specific some_message_queue policy from the zoo and wrap it in a generic message_queue type which dishes out the head and tail proxies.
Well, at least there is no goat in this problem, so in true Monty Hall style:
"Let's see what's behind door number 2..." 

Okay so here is the design description:
  • A concurrent_queue wrapper, shared_queue, locked up tighter than a chastity belt.
  • 2 proxies, enqueuer and dequeuer, each templated on the queue type and befriended by shared_queue, so both hold the key of friendship to shared_queue's chastity.
  • 3(+) queue interfaces (for wait, try & timed wait queue access) carrying queue closure semantics.
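To make the three access styles and the closure semantics concrete, here is a rough sketch on top of a plain mutex and condition variable. It is not the libfbp implementation: sketch_queue, dequeue_for, the exception message and the choice to throw when enqueuing on a closed queue are all assumptions made for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <utility>

// Hypothetical minimal queue; NOT the libfbp implementation.
template<typename T>
class sketch_queue {
public:
    using value_type = T;

    void enqueue(T item) {
        {
            std::lock_guard<std::mutex> lock(m);
            if (!open) throw std::runtime_error("queue is closed");  // assumed policy
            items.push(std::move(item));
        }
        ready.notify_one();
    }

    // Wait: block until an item arrives; throw once the queue is closed and drained.
    T dequeue() {
        std::unique_lock<std::mutex> lock(m);
        ready.wait(lock, [this] { return !items.empty() || !open; });
        if (items.empty()) throw std::runtime_error("queue is closed");
        return pop_locked();
    }

    // Try: never block; false means nothing is available right now.
    bool try_dequeue(T& item) {
        std::lock_guard<std::mutex> lock(m);
        if (items.empty()) return false;
        item = pop_locked();
        return true;
    }

    // Timed wait: block for at most `timeout`; false on timeout or closed-and-empty.
    template<typename Rep, typename Period>
    bool dequeue_for(T& item, std::chrono::duration<Rep, Period> timeout) {
        std::unique_lock<std::mutex> lock(m);
        ready.wait_for(lock, timeout, [this] { return !items.empty() || !open; });
        if (items.empty()) return false;
        item = pop_locked();
        return true;
    }

    // Closing wakes every waiter; items already queued can still be drained.
    void close() {
        {
            std::lock_guard<std::mutex> lock(m);
            open = false;
        }
        ready.notify_all();
    }

    bool is_open() const {
        std::lock_guard<std::mutex> lock(m);
        return open;
    }

private:
    // Caller must hold the mutex and have checked that items is not empty.
    T pop_locked() {
        T item = std::move(items.front());
        items.pop();
        return item;
    }
    mutable std::mutex m;
    std::condition_variable ready;
    std::queue<T> items;
    bool open = true;
};
```

The design choice worth noticing is that close() does not discard anything: consumers can keep dequeuing a closed queue until it is empty, and only then does the waiting dequeue throw.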
And the class diagram that results:
[Figure: libfbp queue package class diagram (created with GenMyModel)]
Here is how to use these classes:
//select some_concurrent_queue and wrap it in the sharer
using queue_type = que::shared_queue<que::gpcg_concurrent_queue<int>>;
queue_type unbounded_q; //construct as an unbounded queue
{
    que::enqueuer<queue_type> nq(unbounded_q); //get access via an enqueuer proxy
    que::dequeuer<queue_type> dq(unbounded_q); //and a dequeuer proxy
    assert(nq.is_open() && dq.is_open()); //ensure both proxies see the queue as open
    int i = 0;
    while (i++ < 10)
        nq.enqueue(i); //enqueue 10 sequential ints
    while (i-- > 6)
        std::cout << dq.dequeue() << ","; //dequeue & display half of them
    std::cout << std::endl;
} //the queue will be closed as nq & dq go out of scope
que::dequeuer<queue_type> dq2(unbounded_q);
assert(!dq2.is_open()); //this dequeuer proxy should find the queue closed
try {
    while (true)
        std::cout << dq2.dequeue() << ","; //but, rightly, be able to dequeue until...
    std::cout << std::endl;
}
catch(std::runtime_error& e) {
    std::cout << e.what() << std::endl; //...it is empty
}

Producing this output:

1,2,3,4,5,

6,7,8,9,10, (°□°)╯︵ ┻━┻ Access to resource is closed!

The queue-sharing shared_queue class, which manages the dequeuer(s) and enqueuer(s) and closes the underlying queue as appropriate:
namespace que {
    template<typename T>
    class shared_queue {
        friend enqueuer<shared_queue<T>>;
        friend dequeuer<shared_queue<T>>;
        using value_type = typename T::value_type;
    public:
        shared_queue(): nqs(0), dqs(0) {}
        shared_queue(unsigned int max_size): nqs(0), dqs(0), q(T(max_size)) {}
        void close() {
            q.close();
        }
    private:
        void register_dequeuer() {
            ++dqs;
        }
        void deregister_dequeuer() {
            if (!--dqs)
                close();
        }
        void register_enqueuer() {
            ++nqs;
        }
        void deregister_enqueuer() {
            if (!--nqs)
                close();
        }
        std::atomic<int> nqs;
        std::atomic<int> dqs;
        T q;
    };
}

The dequeuer class:
namespace que {
    template<typename T>
    class dequeuer final {
        using value_type = typename T::value_type;
    public:
        dequeuer(T& parent): parent(parent) {
            parent.register_dequeuer();
        }
        value_type dequeue() {
            return parent.q.dequeue();
        }
        bool try_dequeue(value_type& item) {
            return parent.q.dequeue(item);
        }
        inline bool is_open() {
            return parent.q.is_open();
        }
        ~dequeuer() {
            parent.deregister_dequeuer();
        }
    private:
        T& parent;
    };
}
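
The enqueuer is not listed in this post, so the following is only a guess at its shape, written as the mirror image of the dequeuer above; the real class is in the repository. The member names follow the register_enqueuer/deregister_enqueuer calls in shared_queue, while toy_queue and toy_shared_queue are hypothetical stand-ins that exist purely so the sketch can be exercised on its own. Deleting the copy operations is my addition, not something the original is known to do.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <queue>

namespace que {
    // Hypothetical mirror image of the dequeuer; not taken from libfbp.
    template<typename T>
    class enqueuer final {
        using value_type = typename T::value_type;
    public:
        explicit enqueuer(T& parent): parent(parent) {
            parent.register_enqueuer();
        }
        ~enqueuer() {
            parent.deregister_enqueuer();
        }
        enqueuer(const enqueuer&) = delete;             // a copy would deregister twice
        enqueuer& operator=(const enqueuer&) = delete;
        void enqueue(const value_type& item) {
            parent.q.enqueue(item);
        }
        bool is_open() {
            return parent.q.is_open();
        }
    private:
        T& parent;
    };
}

// Stand-in underlying queue, single-threaded, for exercising the sketch only.
struct toy_queue {
    void enqueue(int v) { items.push(v); }
    void close() { open = false; }
    bool is_open() const { return open; }
    std::queue<int> items;
    bool open = true;
};

// Stand-in parent with the same private registration hooks as shared_queue.
class toy_shared_queue {
    friend que::enqueuer<toy_shared_queue>;
public:
    using value_type = int;
    std::size_t size() const { return q.items.size(); }
    bool is_open() const { return q.is_open(); }
private:
    void register_enqueuer() { ++nqs; }
    void deregister_enqueuer() { if (!--nqs) q.close(); }
    std::atomic<int> nqs{0};
    toy_queue q;
};
```

Constructing a que::enqueuer<toy_shared_queue> in a nested scope, enqueuing a couple of ints and then leaving the scope should leave the items in place and the stand-in queue closed.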

Full code for all the interacting queue classes at:  github.com/ifknot/libfbp

References:
[1] Wikipedia Proxy Pattern
[2] When should you use 'friend' in C++?
