本章搭建 java ThreadPoolExecutor 线程池的 C++ 简易实现框架
# 简介
近期想加固多线程场景编程能力,想使用 C++ 实现一套类似 Java ThreadPoolExecutor
的线程池。本章介绍各组件的基础功能框架。
# ThreadPoolExecutor 实现
# Java 线程池的参数
public ThreadPoolExecutor(int corePoolSize, | |
int maximumPoolSize, | |
long keepAliveTime, | |
TimeUnit unit, | |
BlockingQueue<Runnable> workQueue, | |
ThreadFactory threadFactory, | |
RejectedExecutionHandler handler) { | |
if (corePoolSize < 0 || | |
maximumPoolSize <= 0 || | |
maximumPoolSize < corePoolSize || | |
keepAliveTime < 0) | |
throw new IllegalArgumentException(); | |
if (workQueue == null || threadFactory == null || handler == null) | |
throw new NullPointerException(); | |
this.corePoolSize = corePoolSize; | |
this.maximumPoolSize = maximumPoolSize; | |
this.workQueue = workQueue; | |
this.keepAliveTime = unit.toNanos(keepAliveTime); | |
this.threadFactory = threadFactory; | |
this.handler = handler; | |
} |
- corePoolSize(核心工作线程数):当向线程池提交一个任务时,若线程池已创建的线程数小于 corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于 corePoolSize 时。
- maximumPoolSize(最大线程数):线程池所允许的最大线程个数。当队列满了,且已创建的线程数小于 maximumPoolSize,则线程池会创建新的线程来执行任务。另外,对于无界队列,可忽略该参数。
- keepAliveTime(多余线程存活时间):当线程池中线程数大于核心线程数时,线程的空闲时间如果超过线程存活时间,那么这个线程就会被销毁,直到线程池中的线程数小于等于核心线程数。
- workQueue(队列):用于传输和保存等待执行任务的阻塞队列。
- threadFactory(线程创建工厂):用于创建新线程。threadFactory 创建的线程也是采用 new Thread () 方式,threadFactory 创建的线程名都具有统一的风格:pool-m-thread-n(m 为线程池的编号,n 为线程池内的线程编号)。
- handler(拒绝策略):当线程池和队列都满了,再加入线程会执行此策略
# 任务添加流程图
# 需要实现的框架
# TimeUnit
#ifndef TIME_UNIT_H | |
#define TIME_UNIT_H | |
namespace Concurrency { | |
constexpr long long NANO_SCALE = 1L; | |
constexpr long long MICRO_SCALE = 1000L * NANO_SCALE; | |
constexpr long long MILLI_SCALE = 1000L * MICRO_SCALE; | |
constexpr long long SECOND_SCALE = 1000L * MILLI_SCALE; | |
constexpr long long MINUTE_SCALE = 60L * SECOND_SCALE; | |
constexpr long long HOUR_SCALE = 60L * MINUTE_SCALE; | |
constexpr long long DAY_SCALE = 24L * HOUR_SCALE; | |
enum class TimeUnit: long long { | |
NANOSECONDS = NANO_SCALE, | |
MICROSECONDS = MICRO_SCALE, | |
MILLISECONDS = MILLI_SCALE, | |
SECONDS = SECOND_SCALE, | |
MINUTES = MINUTE_SCALE, | |
HOURS = HOUR_SCALE, | |
DAYS = DAY_SCALE | |
}; | |
} | |
#endif // TIME_UNIT_H |
# BlockingQueue
BlockingQueue
只提取了部分队列需要的方法,分别是阻塞 push
、 pop
和单次尝试 push
、 pop
。
目前只定义了接口,后续会有章节仔细说明多种队列的实现。
#ifndef BLOCKING_QUEUE_H | |
#define BLOCKING_QUEUE_H | |
namespace Concurrency { | |
template<typename E> | |
class BlockingQueue { | |
public: | |
virtual ~BlockingQueue() = default; | |
virtual bool Offer(const E &e, long long timeout, TimeUnit unit) = 0; | |
virtual void Put(const E &e) = 0; | |
virtual bool Poll(E &e, long long timeout, TimeUnit unit) = 0; | |
virtual E Take() = 0; | |
virtual int RemainingCapacity() = 0; | |
}; | |
} | |
#endif // BLOCKING_QUEUE_H |
# ThreadFactory
#ifndef THREAD_FACTORY_H | |
#define THREAD_FACTORY_H | |
#include "Thread.h" | |
namespace Concurrency { | |
class ThreadFactory { | |
public: | |
virtual ~ThreadFactory() = default; | |
virtual std::shared_ptr<Thread> NewThread(ExecutableTask task) = 0; | |
}; | |
} | |
#endif // THREAD_FACTORY_H |
# RejectedExecutionHandler
#ifndef REJECTED_EXECUTION_HANDLER_H | |
#define REJECTED_EXECUTION_HANDLER_H | |
#include <memory> | |
#include "ExecutableTask.h" | |
namespace Concurrency { | |
class ThreadPoolExecutor; | |
class RejectedExecutionHandler { | |
public: | |
virtual ~RejectedExecutionHandler() = default; | |
virtual void RejectedExecution(ExecutableTask task, std::shared_ptr<ThreadPoolExecutor> executor) = 0; | |
}; | |
} | |
#endif // REJECTED_EXECUTION_HANDLER_H |
# ThreadPoolExecutor
#ifndef THREAD_POOL_EXECUTOR_H | |
#define THREAD_POOL_EXECUTOR_H | |
#include <functional> | |
#include <memory> | |
#include "BlockingQueue.h" | |
#include "ExecutableTask.h" | |
#include "RejectedExecutionHandler.h" | |
#include "ThreadFactory.h" | |
#include "TimeUnit.h" | |
namespace Concurrency { | |
constexpr int DEFAULT_CORE_POOL_SIZE = 0; | |
constexpr int DEFAULT_MAXIMUM_POOL_SIZE = 1; | |
constexpr long long DEFAULT_KEEP_ALIVE_TIME = 0; | |
class ThreadPoolExecutor { | |
public: | |
ThreadPoolExecutor(int corePoolSize, | |
int maximumPoolSize, | |
long long keepAliveTime = 0, | |
TimeUnit unit = TimeUnit::MILLISECONDS, | |
std::shared_ptr<BlockingQueue<ExecutableTask> > workQueue = nullptr, | |
std::shared_ptr<ThreadFactory> threadFactory = nullptr, | |
std::shared_ptr<RejectedExecutionHandler> rejectedHandler = nullptr) { | |
if (corePoolSize < 0) { | |
corePoolSize = DEFAULT_CORE_POOL_SIZE; | |
} | |
if (maximumPoolSize <= 0) { | |
maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE; | |
} | |
if (maximumPoolSize < corePoolSize) { | |
maximumPoolSize = corePoolSize; | |
} | |
if (keepAliveTime < 0) { | |
keepAliveTime = DEFAULT_KEEP_ALIVE_TIME; | |
} | |
if (workQueue == nullptr) { | |
workQueue = CreateDefaultWorkQueue(); | |
} | |
if (threadFactory == nullptr) { | |
threadFactory = CreateDefaultThreadFactory(); | |
} | |
if (rejectedHandler == nullptr) { | |
rejectedHandler = CreateDefaultRejectedHandler(); | |
} | |
this->corePoolSize = corePoolSize; | |
this->maximumPoolSize = maximumPoolSize; | |
this->keepAliveTime = keepAliveTime; | |
this->unit = unit; | |
this->workQueue = workQueue; | |
this->threadFactory = threadFactory; | |
this->rejectedHandler = rejectedHandler; | |
} | |
private: | |
static std::shared_ptr<BlockingQueue<ExecutableTask> > CreateDefaultWorkQueue() { | |
// TODO | |
return nullptr; | |
} | |
static std::shared_ptr<ThreadFactory> CreateDefaultThreadFactory() { | |
// TODO | |
return nullptr; | |
} | |
static std::shared_ptr<RejectedExecutionHandler> CreateDefaultRejectedHandler() { | |
// TODO | |
return nullptr; | |
} | |
private: | |
int corePoolSize; | |
int maximumPoolSize; | |
long long keepAliveTime; | |
TimeUnit unit; | |
std::shared_ptr<BlockingQueue<ExecutableTask> > workQueue; | |
std::shared_ptr<ThreadFactory> threadFactory; | |
std::shared_ptr<RejectedExecutionHandler> rejectedHandler; | |
}; | |
} | |
#endif // THREAD_POOL_EXECUTOR_H |
# 后续章节
在后续章节,我们会依次对 BlockingQueue
、 ThreadFactory
、 RejectedExecutionHandler
等进行实现。