本章搭建 java ThreadPoolExecutor 线程池的 C++ 简易实现框架

# 简介

近期想加固多线程场景编程能力,想使用 C++ 实现一套类似 Java ThreadPoolExecutor 的线程池。本章介绍各组件的基础功能框架。

# ThreadPoolExecutor 实现

# Java 线程池的参数

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize(核心工作线程数):当向线程池提交一个任务时,若线程池已创建的线程数小于 corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于 corePoolSize 时。
  • maximumPoolSize(最大线程数):线程池所允许的最大线程个数。当队列满了,且已创建的线程数小于 maximumPoolSize,则线程池会创建新的线程来执行任务。另外,对于无界队列,可忽略该参数。
  • keepAliveTime(多余线程存活时间):当线程池中线程数大于核心线程数时,线程的空闲时间如果超过线程存活时间,那么这个线程就会被销毁,直到线程池中的线程数小于等于核心线程数。
  • workQueue(队列):用于传输和保存等待执行任务的阻塞队列。
  • threadFactory(线程创建工厂):用于创建新线程。threadFactory 创建的线程也是采用 new Thread () 方式,threadFactory 创建的线程名都具有统一的风格:pool-m-thread-n(m 为线程池的编号,n 为线程池内的线程编号)。
  • handler(拒绝策略):当线程池和队列都满了,再加入线程会执行此策略

# 任务添加流程图

图片

# 需要实现的框架

# TimeUnit

#ifndef TIME_UNIT_H
#define TIME_UNIT_H
namespace Concurrency {
constexpr long long NANO_SCALE = 1L;
constexpr long long MICRO_SCALE = 1000L * NANO_SCALE;
constexpr long long MILLI_SCALE = 1000L * MICRO_SCALE;
constexpr long long SECOND_SCALE = 1000L * MILLI_SCALE;
constexpr long long MINUTE_SCALE = 60L * SECOND_SCALE;
constexpr long long HOUR_SCALE = 60L * MINUTE_SCALE;
constexpr long long DAY_SCALE = 24L * HOUR_SCALE;
enum class TimeUnit: long long {
    NANOSECONDS = NANO_SCALE,
    MICROSECONDS = MICRO_SCALE,
    MILLISECONDS = MILLI_SCALE,
    SECONDS = SECOND_SCALE,
    MINUTES = MINUTE_SCALE,
    HOURS = HOUR_SCALE,
    DAYS = DAY_SCALE
};
}
#endif // TIME_UNIT_H

# BlockingQueue

BlockingQueue 只提取了部分队列需要的方法,分别是阻塞 pushpop 和单次尝试 pushpop
目前只定义了接口,后续会有章节仔细说明多种队列的实现。

#ifndef BLOCKING_QUEUE_H
#define BLOCKING_QUEUE_H
namespace Concurrency {
template<typename E>
class BlockingQueue {
public:
    virtual ~BlockingQueue() = default;
    virtual bool Offer(const E &e, long long timeout, TimeUnit unit) = 0;
    virtual void Put(const E &e) = 0;
    virtual bool Poll(E &e, long long timeout, TimeUnit unit) = 0;
    virtual E Take() = 0;
    virtual int RemainingCapacity() = 0;
};
}
#endif // BLOCKING_QUEUE_H

# ThreadFactory

#ifndef THREAD_FACTORY_H
#define THREAD_FACTORY_H
#include "Thread.h"
namespace Concurrency {
class ThreadFactory {
public:
    virtual ~ThreadFactory() = default;
    virtual std::shared_ptr<Thread> NewThread(ExecutableTask task) = 0;
};
}
#endif // THREAD_FACTORY_H

# RejectedExecutionHandler

#ifndef REJECTED_EXECUTION_HANDLER_H
#define REJECTED_EXECUTION_HANDLER_H
#include <memory>
#include "ExecutableTask.h"
namespace Concurrency {
class ThreadPoolExecutor;
class RejectedExecutionHandler {
public:
    virtual ~RejectedExecutionHandler() = default;
    virtual void RejectedExecution(ExecutableTask task, std::shared_ptr<ThreadPoolExecutor> executor) = 0;
};
}
#endif // REJECTED_EXECUTION_HANDLER_H

# ThreadPoolExecutor

#ifndef THREAD_POOL_EXECUTOR_H
#define THREAD_POOL_EXECUTOR_H
#include <functional>
#include <memory>
#include "BlockingQueue.h"
#include "ExecutableTask.h"
#include "RejectedExecutionHandler.h"
#include "ThreadFactory.h"
#include "TimeUnit.h"
namespace Concurrency {
constexpr int DEFAULT_CORE_POOL_SIZE = 0;
constexpr int DEFAULT_MAXIMUM_POOL_SIZE = 1;
constexpr long long DEFAULT_KEEP_ALIVE_TIME = 0;
class ThreadPoolExecutor {
public:
    ThreadPoolExecutor(int corePoolSize,
                       int maximumPoolSize,
                       long long keepAliveTime = 0,
                       TimeUnit unit = TimeUnit::MILLISECONDS,
                       std::shared_ptr<BlockingQueue<ExecutableTask> > workQueue = nullptr,
                       std::shared_ptr<ThreadFactory> threadFactory = nullptr,
                       std::shared_ptr<RejectedExecutionHandler> rejectedHandler = nullptr) {
        if (corePoolSize < 0) {
            corePoolSize = DEFAULT_CORE_POOL_SIZE;
        }
        if (maximumPoolSize <= 0) {
            maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
        }
        if (maximumPoolSize < corePoolSize) {
            maximumPoolSize = corePoolSize;
        }
        if (keepAliveTime < 0) {
            keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
        }
        if (workQueue == nullptr) {
            workQueue = CreateDefaultWorkQueue();
        }
        if (threadFactory == nullptr) {
            threadFactory = CreateDefaultThreadFactory();
        }
        if (rejectedHandler == nullptr) {
            rejectedHandler = CreateDefaultRejectedHandler();
        }
        this->corePoolSize = corePoolSize;
        this->maximumPoolSize = maximumPoolSize;
        this->keepAliveTime = keepAliveTime;
        this->unit = unit;
        this->workQueue = workQueue;
        this->threadFactory = threadFactory;
        this->rejectedHandler = rejectedHandler;
    }
private:
    static std::shared_ptr<BlockingQueue<ExecutableTask> > CreateDefaultWorkQueue() {
        // TODO
        return nullptr;
    }
    static std::shared_ptr<ThreadFactory> CreateDefaultThreadFactory() {
        // TODO
        return nullptr;
    }
    static std::shared_ptr<RejectedExecutionHandler> CreateDefaultRejectedHandler() {
        // TODO
        return nullptr;
    }
private:
    int corePoolSize;
    int maximumPoolSize;
    long long keepAliveTime;
    TimeUnit unit;
    std::shared_ptr<BlockingQueue<ExecutableTask> > workQueue;
    std::shared_ptr<ThreadFactory> threadFactory;
    std::shared_ptr<RejectedExecutionHandler> rejectedHandler;
};
}
#endif // THREAD_POOL_EXECUTOR_H

# 后续章节

在后续章节,我们会依次对 BlockingQueueThreadFactoryRejectedExecutionHandler 等进行实现。