本章完成 Java 四种 BlockingQueue 的 C++ 实现。
# 原有 BlockingQueue
ArrayBlockingQueue
:基于数组的有界阻塞队列。队列按 FIFO 原则对元素进行排序,队列头部是在队列中存活时间最长的元素,队尾则是存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。 这是一个典型的 “有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

LinkedBlockingQueue
:基于链表的无界阻塞队列。与 ArrayBlockingQueue 一样采用 FIFO 原则对元素进行排序。基于链表的队列吞吐量通常要高于基于数组的队列。

SynchronousQueue
:同步的阻塞队列。其中每个插入操作必须等待另一个线程的对应移除操作,等待过程一直处于阻塞状态;同理,每一个移除操作必须等到另一个线程的对应插入操作。SynchronousQueue 没有任何容量。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素。

PriorityBlockingQueue
:基于优先级的无界阻塞队列。优先级队列的元素按照其自然顺序进行排序,或者根据构造队列时提供的 Comparator 进行排序,具体取决于所使用的构造方法。
# 分别实现
# ArrayBlockingQueue
通过预先定义固定容量的数组,通过索引确定队首和队尾位置。由于队列满,队首和队尾索引会重叠,所以需要一个标志位 full 确定是否队列已满。
template<typename E> | |
class ArrayBlockingQueue : public BlockingQueue<E> { | |
public: | |
explicit ArrayBlockingQueue(int capacity) | |
: queue(capacity), capacity(capacity) { | |
} | |
~ArrayBlockingQueue() override = default; | |
bool Offer(const E &e, long long timeout, TimeUnit unit) override { | |
std::unique_lock lock(mtx); | |
bool success = cvFull.wait_for(lock, std::chrono::nanoseconds(NANOSECONDS(timeout, unit)), [&] { | |
return !full; | |
}); | |
if (!success) { | |
return false; | |
} | |
queue[endIdx++] = e; | |
endIdx %= capacity; | |
if (endIdx == startIdx) { | |
full = true; | |
} | |
cvEmpty.notify_one(); | |
return true; | |
} | |
void Put(const E &e) override { | |
std::unique_lock lock(mtx); | |
cvFull.wait(lock, [&] { return !full; }); | |
queue[endIdx++] = e; | |
endIdx %= capacity; | |
if (endIdx == startIdx) { | |
full = true; | |
} | |
cvEmpty.notify_one(); | |
} | |
bool Poll(E &e, long long timeout, TimeUnit unit) override { | |
std::unique_lock lock(mtx); | |
bool success = cvEmpty.wait_for(lock, std::chrono::nanoseconds(NANOSECONDS(timeout, unit)), [&] { | |
return startIdx != endIdx || full; | |
}); | |
if (!success) { | |
return false; | |
} | |
e = queue[startIdx++]; | |
startIdx %= capacity; | |
full = false; | |
cvFull.notify_one(); | |
return true; | |
} | |
E Take() override { | |
std::unique_lock lock(mtx); | |
cvEmpty.wait(lock, [&] { return startIdx != endIdx || full; }); | |
auto e = queue[startIdx++]; | |
startIdx %= capacity; | |
full = false; | |
cvFull.notify_one(); | |
return e; | |
} | |
int RemainingCapacity() override { | |
std::lock_guard guard(mtx); | |
return full ? 0 : (endIdx + capacity - startIdx); | |
} | |
private: | |
std::vector<E> queue; | |
int startIdx = 0; | |
int endIdx = 0; | |
bool full = false; | |
std::mutex mtx; | |
std::condition_variable cvEmpty; | |
std::condition_variable cvFull; | |
int capacity; | |
}; |
# LinkedBlockingQueue
LinkedBlockingQueue 与 ArrayBlockingQueue 相比,主要少了容量限制,以及存储数据的结构体由数组改成了 list
template<typename E> | |
class LinkedBlockingQueue : public BlockingQueue<E> { | |
public: | |
LinkedBlockingQueue() = default; | |
~LinkedBlockingQueue() override = default; | |
bool Offer(const E &e, long long timeout, TimeUnit unit) override { | |
std::lock_guard guard(mtx); | |
queue.push_back(e); | |
cvEmpty.notify_one(); | |
return true; | |
} | |
void Put(const E &e) override { | |
std::lock_guard lock(mtx); | |
queue.push_back(e); | |
cvEmpty.notify_one(); | |
} | |
bool Poll(E &e, long long timeout, TimeUnit unit) override { | |
std::unique_lock lock(mtx); | |
bool success = cvEmpty.wait_for(lock, NANOSECONDS(timeout, unit), [&] { | |
return !queue.empty(); | |
}); | |
if (!success) { | |
return false; | |
} | |
e = queue.front(); | |
queue.pop_front(); | |
cvFull.notify_one(); | |
return true; | |
} | |
E Take() override { | |
std::unique_lock lock(mtx); | |
cvEmpty.wait(lock, [&] { return !queue.empty(); }); | |
auto e = queue.front(); | |
queue.pop_front(); | |
cvFull.notify_one(); | |
return e; | |
} | |
int RemainingCapacity() override { | |
return INT_MAX; | |
} | |
private: | |
std::list<E> queue; | |
std::mutex mtx; | |
std::condition_variable cvEmpty; | |
std::condition_variable cvFull; | |
}; |
# SynchronousQueue
SynchronousQueue 作为同步队列,出队操作只需要监听队列有数据事件即可;入队则需要在入队前监听队列无数据,入队后监听数据被处理完成。
// Zero-capacity rendezvous queue: every Put must pair with a Take on another
// thread. The hand-off is implemented with three binary semaphores.
template<typename E>
class SynchronousQueue : public BlockingQueue<E> {
    // Single-slot hand-off between one producer (Post) and one consumer (Request).
    class Transferer {
    public:
        // Consumer side: blocks until a producer has published a value into
        // `tmp`, copies it out, then signals the producer that the hand-off
        // is complete.
        // NOTE(review): `checkOnce` is accepted but never read here, so a
        // non-blocking / timed receive is not actually supported.
        bool Request(E &e, bool checkOnce = false) {
            hasValSem.acquire();
            e = tmp;
            doneSem.release();
            return true;
        }
        // Producer side: serializes producers via postSem, publishes the
        // value, then waits until a consumer has taken it before releasing
        // the producer slot.
        // NOTE(review): even with checkOnce == true the call still blocks on
        // doneSem until a consumer arrives — only the producer-slot
        // acquisition is non-blocking.
        bool Post(const E &e, bool checkOnce = false) {
            if (checkOnce) {
                if (!postSem.try_acquire()) {
                    return false;
                }
            } else {
                postSem.acquire();
            }
            tmp = e; // stash the value for the consumer
            hasValSem.release();
            doneSem.acquire();
            postSem.release();
            return true;
        }
    private:
        E tmp;                               // the value currently being handed off
        std::binary_semaphore postSem{1};    // at most one producer in the hand-off
        std::binary_semaphore hasValSem{0};  // signalled when tmp holds a value
        std::binary_semaphore doneSem{0};    // signalled once the consumer took tmp
    };
public:
    SynchronousQueue() = default;
    ~SynchronousQueue() override = default;
    // NOTE(review): timed insert is not implemented — always reports failure
    // without attempting a hand-off; the timeout parameters are ignored.
    bool Offer(const E &e, long long timeout, TimeUnit unit) override {
        return false;
    }
    // Blocks until a consumer takes `e`.
    void Put(const E &e) override {
        transferer.Post(e, false);
    }
    // NOTE(review): timed receive is not implemented — always reports failure
    // without attempting a hand-off; the timeout parameters are ignored.
    bool Poll(E &e, long long timeout, TimeUnit unit) override {
        return false;
    }
    // Blocks until a producer hands over a value, then returns it.
    E Take() override {
        E e;
        transferer.Request(e, false);
        return e;
    }
    // A synchronous queue holds no elements, so capacity is always 0.
    int RemainingCapacity() override {
        return 0;
    }
private:
    Transferer transferer;
};
# 测试用例
// Stress test: `threadNum` producers each push `range` ints into `queue`,
// five consumers drain them into a result queue, and the elapsed wall-clock
// time plus the consumed count are printed under `tag`.
void ut_BlockingQueue(Concurrency::BlockingQueue<int> &queue, const std::string &tag) {
    auto start = std::chrono::high_resolution_clock::now();
    auto result = std::make_shared<Concurrency::LinkedBlockingQueue<int> >();
    std::atomic cnt = 0;   // total items consumed across all consumer threads
    int range = 100000;    // items produced per producer thread
    int threadNum = 10;    // number of producer threads
    auto producer = [&](int n) {
        for (int i = 0; i < n; i++) {
            queue.Put(i);
        }
    };
    std::mutex m;
    std::condition_variable cv;
    auto consumer = [&]() {
        while (true) {
            result->Put(queue.Take());
            ++cnt;
            // Only the consumer that observes the final count exits the loop
            // and wakes the main thread.
            // NOTE(review): once the queue is drained, the other consumers
            // stay blocked in Take() forever; they are detached below, so the
            // process can still finish, but those threads leak.
            if (cnt.load() >= range * threadNum) {
                cv.notify_one();
                break;
            }
        }
    };
    std::vector<std::shared_ptr<std::thread> > ts;
    for (int i = 0; i < threadNum; i++) {
        ts.push_back(std::make_shared<std::thread>(producer, range));
    }
    for (int i = 0; i < 5; i++) {
        ts.push_back(std::make_shared<std::thread>(consumer));
    }
    for (const auto &t: ts) {
        // Detached because the permanently-blocked consumers could never be
        // joined.
        t->detach();
    }
    std::unique_lock<std::mutex> lock(m);
    // Safe against a notify that fires before we start waiting: the predicate
    // is re-checked before blocking.
    cv.wait(lock, [&] {
        return cnt.load() == range * threadNum;
    });
    std::cout << cnt << std::endl;
    auto stop = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(stop - start).count();
    std::cout << tag << " Elapsed time: " << duration << " ms\n";
    std::cout << "cnt: " << cnt << std::endl;
}
int main() { | |
auto q1 = Concurrency::ArrayBlockingQueue<int>(1000000); | |
ut_BlockingQueue(q1, "ArrayBlockingQueue"); | |
auto q2 = Concurrency::LinkedBlockingQueue<int>(); | |
ut_BlockingQueue(q2, "LinkedBlockingQueue"); | |
auto q3 = Concurrency::SynchronousQueue<int>(); | |
ut_BlockingQueue(q3, "SynchronousQueue"); | |
return 0; | |
} |
测试用例输出结果
1000000
ArrayBlockingQueue Elapsed time: 825 ms
cnt: 1000000
1000000
LinkedBlockingQueue Elapsed time: 938 ms
cnt: 1000000
1000000
SynchronousQueue Elapsed time: 919 ms
cnt: 1000000