本章完成 java 四种 BlockingQueue 的 C++ 实现

# 原有 BlockingQueue

  • ArrayBlockingQueue :基于数组的有界阻塞队列。队列按 FIFO 原则对元素进行排序,队列头部是在队列中存活时间最长的元素,队尾则是存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。 这是一个典型的 “有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
  • LinkedBlockingQueue :基于链表的无界阻塞队列。与 ArrayBlockingQueue 一样采用 FIFO 原则对元素进行排序。基于链表的队列吞吐量通常要高于基于数组的队列。
  • SynchronousQueue :同步的阻塞队列。其中每个插入操作必须等待另一个线程的对应移除操作,等待过程一直处于阻塞状态,同理,每一个移除操作必须等到另一个线程的对应插入操作。SynchronousQueue 没有任何容量。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素。
  • PriorityBlockingQueue :基于优先级的无界阻塞队列。优先级队列的元素按照其自然顺序进行排序,或者根据构造队列时提供的 Comparator 进行排序,具体取决于所使用的构造方法。

# 分别实现

# ArrayBlockingQueue

通过预先定义固定容量的数组,通过索引确定队首和队尾位置。由于队列满,队首和队尾索引会重叠,所以需要一个标志位 full 确定是否队列已满。

template<typename E>
class ArrayBlockingQueue : public BlockingQueue<E> {
public:
    explicit ArrayBlockingQueue(int capacity)
        : queue(capacity), capacity(capacity) {
    }
    ~ArrayBlockingQueue() override = default;
    bool Offer(const E &e, long long timeout, TimeUnit unit) override {
        std::unique_lock lock(mtx);
        bool success = cvFull.wait_for(lock, std::chrono::nanoseconds(NANOSECONDS(timeout, unit)), [&] {
            return !full;
        });
        if (!success) {
            return false;
        }
        queue[endIdx++] = e;
        endIdx %= capacity;
        if (endIdx == startIdx) {
            full = true;
        }
        cvEmpty.notify_one();
        return true;
    }
    void Put(const E &e) override {
        std::unique_lock lock(mtx);
        cvFull.wait(lock, [&] { return !full; });
        queue[endIdx++] = e;
        endIdx %= capacity;
        if (endIdx == startIdx) {
            full = true;
        }
        cvEmpty.notify_one();
    }
    bool Poll(E &e, long long timeout, TimeUnit unit) override {
        std::unique_lock lock(mtx);
        bool success = cvEmpty.wait_for(lock, std::chrono::nanoseconds(NANOSECONDS(timeout, unit)), [&] {
            return startIdx != endIdx || full;
        });
        if (!success) {
            return false;
        }
        e = queue[startIdx++];
        startIdx %= capacity;
        full = false;
        cvFull.notify_one();
        return true;
    }
    E Take() override {
        std::unique_lock lock(mtx);
        cvEmpty.wait(lock, [&] { return startIdx != endIdx || full; });
        auto e = queue[startIdx++];
        startIdx %= capacity;
        full = false;
        cvFull.notify_one();
        return e;
    }
    int RemainingCapacity() override {
        std::lock_guard guard(mtx);
        return full ? 0 : (endIdx + capacity - startIdx);
    }
private:
    std::vector<E> queue;
    int startIdx = 0;
    int endIdx = 0;
    bool full = false;
    std::mutex mtx;
    std::condition_variable cvEmpty;
    std::condition_variable cvFull;
    int capacity;
};

# LinkedBlockingQueue

LinkedBlockingQueue 与 ArrayBlockingQueue 相比,主要少了容量限制,以及存储数据的结构体由数组改成了 list

template<typename E>
class LinkedBlockingQueue : public BlockingQueue<E> {
public:
    LinkedBlockingQueue() = default;
    ~LinkedBlockingQueue() override = default;
    bool Offer(const E &e, long long timeout, TimeUnit unit) override {
        std::lock_guard guard(mtx);
        queue.push_back(e);
        cvEmpty.notify_one();
        return true;
    }
    void Put(const E &e) override {
        std::lock_guard lock(mtx);
        queue.push_back(e);
        cvEmpty.notify_one();
    }
    bool Poll(E &e, long long timeout, TimeUnit unit) override {
        std::unique_lock lock(mtx);
        bool success = cvEmpty.wait_for(lock, NANOSECONDS(timeout, unit), [&] {
            return !queue.empty();
        });
        if (!success) {
            return false;
        }
        e = queue.front();
        queue.pop_front();
        cvFull.notify_one();
        return true;
    }
    E Take() override {
        std::unique_lock lock(mtx);
        cvEmpty.wait(lock, [&] { return !queue.empty(); });
        auto e = queue.front();
        queue.pop_front();
        cvFull.notify_one();
        return e;
    }
    int RemainingCapacity() override {
        return INT_MAX;
    }
private:
    std::list<E> queue;
    std::mutex mtx;
    std::condition_variable cvEmpty;
    std::condition_variable cvFull;
};

# SynchronousQueue

SynchronousQueue 作为同步队列,出队操作只需要监听队列有数据事件即可,入队需要入队前监听无数据,入队后监听数据被处理完成

template<typename E>
class SynchronousQueue : public BlockingQueue<E> {
    class Transferer {
    public:
        bool Request(E &e, bool checkOnce = false) {
            hasValSem.acquire();
            e = tmp;
            doneSem.release();
            return true;
        }
        bool Post(const E &e, bool checkOnce = false) {
            if (checkOnce) {
                if (!postSem.try_acquire()) {
                    return false;
                }
            } else {
                postSem.acquire();
            }
            tmp = e; // 暂存数据
            hasValSem.release();
            doneSem.acquire();
            postSem.release();
            return true;
        }
    private:
        E tmp;
        std::binary_semaphore postSem{1};
        std::binary_semaphore hasValSem{0};
        std::binary_semaphore doneSem{0};
    };
public:
    SynchronousQueue() = default;
    ~SynchronousQueue() override = default;
    bool Offer(const E &e, long long timeout, TimeUnit unit) override {
        return false;
    }
    void Put(const E &e) override {
        transferer.Post(e, false);
    }
    bool Poll(E &e, long long timeout, TimeUnit unit) override {
        return false;
    }
    E Take() override {
        E e;
        transferer.Request(e, false);
        return e;
    }
    int RemainingCapacity() override {
        return 0;
    }
private:
    Transferer transferer;
};

# 测试用例

void ut_BlockingQueue(Concurrency::BlockingQueue<int> &queue, const std::string &tag) {
    auto start = std::chrono::high_resolution_clock::now();
    auto result = std::make_shared<Concurrency::LinkedBlockingQueue<int> >();
    std::atomic cnt = 0;
    int range = 100000;
    int threadNum = 10;
    auto producer = [&](int n) {
        for (int i = 0; i < n; i++) {
            queue.Put(i);
        }
    };
    std::mutex m;
    std::condition_variable cv;
    auto consumer = [&]() {
        while (true) {
            result->Put(queue.Take());
            ++cnt;
            if (cnt.load() >= range * threadNum) {
                cv.notify_one();
                break;
            }
        }
    };
    std::vector<std::shared_ptr<std::thread> > ts;
    for (int i = 0; i < threadNum; i++) {
        ts.push_back(std::make_shared<std::thread>(producer, range));
    }
    for (int i = 0; i < 5; i++) {
        ts.push_back(std::make_shared<std::thread>(consumer));
    }
    for (const auto &t: ts) {
        t->detach();
    }
    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, [&] {
        return cnt.load() == range * threadNum;
    });
    std::cout << cnt << std::endl;
    auto stop = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(stop - start).count();
    std::cout << tag << " Elapsed time: " << duration << " ms\n";
    std::cout << "cnt: " << cnt << std::endl;
}
int main() {
    auto q1 = Concurrency::ArrayBlockingQueue<int>(1000000);
    ut_BlockingQueue(q1, "ArrayBlockingQueue");
    auto q2 = Concurrency::LinkedBlockingQueue<int>();
    ut_BlockingQueue(q2, "LinkedBlockingQueue");
    auto q3 = Concurrency::SynchronousQueue<int>();
    ut_BlockingQueue(q3, "SynchronousQueue");
    return 0;
}

测试用例输出结果

1000000
ArrayBlockingQueue Elapsed time: 825 ms
cnt: 1000000
1000000
LinkedBlockingQueue Elapsed time: 938 ms
cnt: 1000000
1000000
SynchronousQueue Elapsed time: 919 ms
cnt: 1000000