30 #ifndef _QORE_THREADPOOL_H
31 #define _QORE_THREADPOOL_H
33 #define QTP_DEFAULT_RELEASE_MS 5000
39 class ThreadPoolThread;
41 typedef std::deque<ThreadTask*> taskq_t;
42 typedef qlist<ThreadPoolThread*> tplist_t;
49 DLLLOCAL ~ThreadTask() {
57 cancelCode->deref(xsink);
66 return code->execValue(0, xsink);
71 cancelCode->execValue(0, xsink).discard(xsink);
79 class ThreadTaskHolder {
85 DLLLOCAL ThreadTaskHolder(ThreadTask* t,
ExceptionSink* xs) : task(t), xsink(xs) {
88 DLLLOCAL ~ThreadTaskHolder() {
94 DLLLOCAL ThreadTask* release() {
95 ThreadTask* rv = task;
103 class ThreadPoolThread {
107 ThreadTask* task =
nullptr;
111 tplist_t::iterator pos;
112 bool stopflag =
false,
118 DLLLOCAL ThreadPoolThread(ThreadPool& n_tp,
ExceptionSink* xsink);
120 DLLLOCAL ~ThreadPoolThread() {
125 DLLLOCAL
void setPos(tplist_t::iterator p) {
129 DLLLOCAL
bool valid()
const {
135 DLLLOCAL
void stop() {
143 DLLLOCAL
void stopWait() {
168 DLLLOCAL
void submit(ThreadTask* t) {
176 DLLLOCAL
int getId()
const {
180 DLLLOCAL tplist_t::iterator getPos()
const {
211 bool waiting =
false;
213 bool shutdown =
false,
218 DLLLOCAL
int checkStopUnlocked(
const char* m,
ExceptionSink* xsink) {
220 xsink->
raiseException(
"THREADPOOL-ERROR",
"ThreadPool::%s() cannot be executed because the ThreadPool " \
221 "is being destroyed", m);
229 std::unique_ptr<ThreadPoolThread> tpth(
new ThreadPoolThread(*
this, xsink));
230 if (!tpth->valid()) {
235 ThreadPoolThread* tpt = tpth.release();
239 tpt->setPos(fh.end());
244 DLLLOCAL ThreadPoolThread* getThreadUnlocked(
ExceptionSink* xsink) {
246 while (!stopflag && fh.empty() && max && (
int)ah.size() == max) {
255 ThreadPoolThread* tpt;
261 std::unique_ptr<ThreadPoolThread> tpt_pt(
new ThreadPoolThread(*
this, xsink));
262 if (!tpt_pt->valid()) {
266 tpt = tpt_pt.release();
270 tplist_t::iterator i = ah.end();
277 DLLLOCAL ThreadPool(
ExceptionSink* xsink,
int n_max = 0,
int n_minidle = 0,
int m_maxidle = 0,
int n_release_ms = QTP_DEFAULT_RELEASE_MS);
279 DLLLOCAL ~ThreadPool() {
289 str.
sprintf(
"ThreadPool %p total: %lu max: %d minidle: %d maxidle: %d release_ms: %d running: [",
this,
290 ah.size() + fh.size(), max, minidle, maxidle, release_ms);
291 for (tplist_t::iterator i = ah.begin(), e = ah.end(); i != e; ++i) {
294 str.
sprintf(
"%d", (*i)->getId());
299 for (tplist_t::iterator i = fh.begin(), e = fh.end(); i != e; ++i) {
302 str.
sprintf(
"%d", (*i)->getId());
309 DLLLOCAL
void stop() {
325 if (detach && !stopped) {
326 xsink->
raiseException(
"THREADPOOL-ERROR",
"cannot call ThreadPool::stopWait() after ThreadPool::stop() " \
327 "has been called since worker threads have been detached and can no longer be traced");
345 ThreadTaskHolder task(
new ThreadTask(c, cc), xsink);
348 if (checkStopUnlocked(
"submit", xsink))
353 q.push_back(task.release());
358 DLLLOCAL
void threadCounts(
int& idle,
int& running) {
364 DLLLOCAL
int done(ThreadPoolThread* tpt) {
372 tplist_t::iterator i = tpt->getPos();
376 if ((!maxidle && release_ms) || ((
int)fh.size() < maxidle) || q.size() > fh.size()) {
378 if (waiting || (release_ms && (
int)fh.size() > minidle))