Metaprogramming putting meat on the bones.
TL;DR Using an enumerated type as a behaviour selector at compile time by using template overloading[1] to select either total or partial queue access[2] behaviour whilst maintaining the same queue ADT interface
The libfbp design goal is for a uniform queue interface as per the prescribed Abstract Data Type definition here using only enqueue and dequeue access class functions instead of having separate class function method calls for either partial or total access behaviour i.e. to get rid of try_enqueue and try_dequeue.
A queue can either be bounded or unbounded.
- Bounded: holding only a limited number of items set as its capacity.
- Unbounded: being able to hold a, logically at least, unlimited amount of items - but physical storage limitations would ultimately apply.
Further, the queue access can either be partial or total. (Or for that matter synchronous but that's a different story.)
- Partial: Access calls will wait for certain conditions to hold true before returning e.g. a dequeue request will, if the queue is empty, wait for the condition that it is not empty to be true before removing an item and returning it.
- Total: by contrast access calls will not wait until they can perform the request but, rather, will return immediately with either a failure code or will throw an exception e.g. a dequeue request will, if the queue is empty, throw a queue empty exception.
Traditionally, selecting this behaviour has been done by having separate classes for unbounded/bounded and different class function names for partial/total, typically dequeue/enqueue and try_dequeue/try_enqueue.
However, by using template metaprogramming C++ no such idioms are required and behaviour can be selected at compile time.
Take for instance the enumerated data type used to select the kind of queue for the job:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//using enum in que namespace | |
enum specialize {partial, total, synchronous}; | |
//senario 1 | |
//compile time select total queue access behaviour and use the only available constructor to set the capacity | |
que::gpcg_concurrent_queue<int, que::total> bounded_q{10}; | |
int i = 0; | |
try { //wrap total access behaviour in try-catch block | |
while (true) bounded_q.enqueue(++i); //enqueue stuff | |
} | |
catch (que::queue_full_exception& e) { //catch by reference still optimal in C++11 move semantics? | |
std::cerr << e.what() << std::endl; | |
} | |
try { //wrap total access behaviour in try-catch block | |
while (true) { | |
i = bounded_q.dequeue(); //dequeue stuff | |
std::cout << i << ","; | |
} | |
} | |
catch (que::queue_empty_exception& e) { //catch the inevitable | |
std::cerr << e.what() << std::endl; | |
} | |
//sceanrio 2 | |
//compile time select partial access behaviour which only comes with an unbounded constructor | |
que::gpcg_concurrent_queue<int, que::partial> unbounded_q; | |
while (i-- >= 0) | |
unbounded_q.enqueue(i); //don't need to catch anything because will wait if full | |
do { | |
i = unbounded_q.dequeue(); //don't need to catch anything because will wait if empty | |
std::cout << i << ","; | |
} while (i != 0); |
This is implemented by the queue as follows which, although not mutually exclusive, only permits unbounded construction for partial access and bounded construction for total access. The scenario of total access for an unbounded queue is a bit of an oddity in that a total dequeue request can throw a queue empty exception but an unbounded queue total enqueue request is always fulfilled!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <queue> | |
#include <mutex> | |
#include <condition_variable> | |
#include <atomic> | |
#include "queue_policies.h" //enum specialize {partial, total, synchronous}; | |
namespace que { | |
//The template construction file must contain all the overloaded options which can lead to verbose and repeated sections of code but such is the nature of template metaprogramming | |
template<typename T, specialize S> //this is the partial access unbounded template | |
class gpcg_concurrent_queue { | |
public: | |
//whilst not concerned with capacity access to current size is relevant for unbounded queue | |
using size_type = typename std::queue<T>::size_type; | |
typedef T value_type; | |
gpcg_concurrent_queue(): open(true) {} //unbounded constructor no capacity required | |
//C++11 copy, move and assignment constructors | |
gpcg_concurrent_queue(const gpcg_concurrent_queue& other): open(other.open), q(other.q) {} | |
gpcg_concurrent_queue(gpcg_concurrent_queue&& other): open(other.open), q(other.q) {} | |
gpcg_concurrent_queue& operator=(gpcg_concurrent_queue&& other) { | |
if (this != &other) { | |
open = other.open; | |
q =other.q; | |
} | |
return *this; | |
} | |
//partial enqueue will complete until physical storage limits | |
void enqueue(const T& item) { | |
if (is_open()) { | |
std::lock_guard<std::mutex> lock(q_mutex); | |
q.push(item); | |
} | |
else { | |
throw queue_closed_exception(doh::FAIL + doh::RESOURCE_CLOSED); | |
} | |
q_cv.notify_one(); | |
} | |
//partial move enqueue will complete until physical storage limits | |
void enqueue(T&& item) { | |
if (is_open()) { | |
std::lock_guard<std::mutex> lock(q_mutex); | |
q.push(std::move(item)); | |
} | |
else { | |
throw queue_closed_exception(doh::FAIL + doh::RESOURCE_CLOSED); | |
} | |
q_cv.notify_one(); | |
} | |
//partial dequeue will wait if empty | |
T dequeue() { | |
std::unique_lock<std::mutex> lock(q_mutex); | |
while (q.empty()) { | |
if (is_open()) { | |
q_cv.wait(lock); | |
} | |
else { | |
throw queue_closed_exception(doh::FAIL + doh::RESOURCE_CLOSED); | |
} | |
} | |
auto item = std::move(q.front()); | |
q.pop(); | |
lock.unlock(); | |
q_cv.notify_one(); | |
return item; | |
} | |
bool is_open() { | |
return open; | |
} | |
size_type size() { | |
return q.size(); | |
} | |
bool empty() { | |
return q.empty(); | |
} | |
void close() { | |
open = false; | |
q_cv.notify_all(); | |
} | |
~gpcg_concurrent_queue() { | |
close(); | |
} | |
private: | |
std::atomic<bool> open; | |
std::mutex q_mutex; | |
std::condition_variable q_cv; | |
std::queue<T> q; | |
}; | |
template<typename T> //this is the total access bounded queue template | |
class gpcg_concurrent_queue<T, total> { | |
public: | |
//size and capacity are relevant for bounded queue | |
using size_type = typename std::queue<T>::size_type; | |
typedef T value_type; | |
//specify a capacity for this bounded queue | |
gpcg_concurrent_queue(size_type capacity): open(true), capacity_(capacity) {} | |
//C++11 copy, move and assignment constructors | |
gpcg_concurrent_queue(const gpcg_concurrent_queue& other): open(true), capacity_(other.capacity_), q(other.q) {} | |
gpcg_concurrent_queue(const gpcg_concurrent_queue&& other): open(true), capacity_(other.capacity_), q(other.q) {} | |
gpcg_concurrent_queue& operator= (const gpcg_concurrent_queue& rhs) { | |
if (this != &rhs) { | |
open.store(rhs.open.load()); | |
capacity_ = rhs.capacity_; | |
q = rhs.q; | |
} | |
return *this; | |
} | |
//total enqueue will throw queue full exception | |
void enqueue(const T& item) { | |
if(is_open()) { | |
std::lock_guard<std::mutex> lock(q_mutex); | |
if(q.size() < capacity()) { | |
q.push(item); | |
} | |
else { | |
throw queue_full_exception(doh::FAIL + doh::RESOURCE_FULL); | |
} | |
} | |
else { | |
throw queue_closed_exception(doh::FAIL + doh::RESOURCE_CLOSED); | |
} | |
} | |
//total move enqueue will also throw queue full exception | |
void enqueue(T&& item) { | |
if(is_open()) { | |
std::lock_guard<std::mutex> lock(q_mutex); | |
if(q.size() < capacity()) { | |
q.push(std::move(item)); | |
} | |
else { | |
throw queue_full_exception(doh::FAIL + doh::RESOURCE_FULL); | |
} | |
} | |
else { | |
throw queue_closed_exception(doh::FAIL + doh::RESOURCE_CLOSED); | |
} | |
} | |
//total dequeue will throw queue empty exception | |
T dequeue() { | |
std::lock_guard<std::mutex> lock(q_mutex); | |
if (!q.empty()) { | |
if (is_open()) { | |
auto item = std::move(q.front()); | |
q.pop(); | |
return item; | |
} | |
else { | |
throw queue_closed_exception(doh::FAIL + doh::RESOURCE_CLOSED); | |
} | |
} | |
else { | |
throw queue_empty_exception(doh::FAIL + doh::RESOURCE_EMPTY); | |
} | |
} | |
bool is_open() { | |
return open; | |
} | |
size_type size() { | |
return q.size(); | |
} | |
bool empty() { | |
return q.empty(); | |
} | |
//capacity is relevant | |
size_type capacity() { | |
return capacity_; | |
} | |
void close() { | |
open = false; | |
} | |
~gpcg_concurrent_queue() { | |
close(); | |
} | |
private: | |
std::atomic<bool> open; | |
size_type capacity_; | |
std::mutex q_mutex; | |
std::queue<T> q; | |
}; | |
} |
I normally trail the next blog entry here in some "humorously" prescient manner that is supposed to betray some sort of ongoing plan for delivery. But none exists and I have no idea what the next blog entry will be until I have written some code in my Tom Lehrer-esque copious free time.
References:
[2] Maurice Herlihy and Nir Shavit. 2008. The Art of Multiprocessor Programming. Morgan Kaufmann Publishers Inc., San Francisco, CA, USA.
No comments:
Post a Comment