Qore Programming Language 1.19.1
Loading...
Searching...
No Matches
ThreadPool.h
1/* -*- mode: c++; indent-tabs-mode: nil -*- */
2/*
3 Qore Programming Language
4
5 Copyright (C) 2003 - 2023 Qore Technologies, s.r.o.
6
7 Permission is hereby granted, free of charge, to any person obtaining a
8 copy of this software and associated documentation files (the "Software"),
9 to deal in the Software without restriction, including without limitation
10 the rights to use, copy, modify, merge, publish, distribute, sublicense,
11 and/or sell copies of the Software, and to permit persons to whom the
12 Software is furnished to do so, subject to the following conditions:
13
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
16
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 DEALINGS IN THE SOFTWARE.
24
25 Note that the Qore library is released under a choice of three open-source
26 licenses: MIT (as above), LGPL 2+, or GPL 2+; see README-LICENSE for more
27 information.
28*/
29
30#ifndef _QORE_THREADPOOL_H
31#define _QORE_THREADPOOL_H
32
33#define QTP_DEFAULT_RELEASE_MS 5000
34
35#include <deque>
36#include <qore/qlist>
37
38class ThreadTask;
39class ThreadPoolThread;
40
41typedef std::deque<ThreadTask*> taskq_t;
42typedef qlist<ThreadPoolThread*> tplist_t;
43
44class ThreadTask {
45public:
46 DLLLOCAL ThreadTask(ResolvedCallReferenceNode* c, ResolvedCallReferenceNode* cc) : code(c), cancelCode(cc) {
47 }
48
49 DLLLOCAL ~ThreadTask() {
50 assert(!code);
51 assert(!cancelCode);
52 }
53
54 DLLLOCAL void del(ExceptionSink* xsink) {
55 code->deref(xsink);
56 if (cancelCode)
57 cancelCode->deref(xsink);
58#ifdef DEBUG
59 code = nullptr;
60 cancelCode = nullptr;
61#endif
62 delete this;
63 }
64
65 DLLLOCAL QoreValue run(ExceptionSink* xsink) {
66 return code->execValue(0, xsink);
67 }
68
69 DLLLOCAL void cancel(ExceptionSink* xsink) {
70 if (cancelCode)
71 cancelCode->execValue(0, xsink).discard(xsink);
72 }
73
74protected:
76 ResolvedCallReferenceNode* cancelCode;
77};
78
79class ThreadTaskHolder {
80protected:
81 ThreadTask* task;
82 ExceptionSink* xsink;
83
84public:
85 DLLLOCAL ThreadTaskHolder(ThreadTask* t, ExceptionSink* xs) : task(t), xsink(xs) {
86 }
87
88 DLLLOCAL ~ThreadTaskHolder() {
89 if (task) {
90 task->del(xsink);
91 }
92 }
93
94 DLLLOCAL ThreadTask* release() {
95 ThreadTask* rv = task;
96 task = 0;
97 return rv;
98 }
99};
100
101class ThreadPool;
102
103class ThreadPoolThread {
104protected:
105 int id;
106 ThreadPool& tp;
107 ThreadTask* task = nullptr;
109 *stopCond = nullptr;
111 tplist_t::iterator pos;
112 bool stopflag = false,
113 stopped = false;
114
115 DLLLOCAL void finalize(ExceptionSink* xsink);
116
117public:
118 DLLLOCAL ThreadPoolThread(ThreadPool& n_tp, ExceptionSink* xsink);
119
120 DLLLOCAL ~ThreadPoolThread() {
121 delete stopCond;
122 assert(!task);
123 }
124
125 DLLLOCAL void setPos(tplist_t::iterator p) {
126 pos = p;
127 }
128
129 DLLLOCAL bool valid() const {
130 return id != -1;
131 }
132
133 DLLLOCAL void worker(ExceptionSink* xsink);
134
135 DLLLOCAL void stop() {
136 AutoLocker al(m);
137 assert(!stopflag);
138 stopflag = true;
139 c.signal();
140 //printd(5, "ThreadPoolThread::stop() signaling stop for id %d\n", id);
141 }
142
143 DLLLOCAL void stopWait() {
144 assert(!stopCond);
145 stopCond = new QoreCondition;
146
147 //printd(5, "ThreadPoolThread::stopWait() stopping id %d\n", id);
148 AutoLocker al(m);
149
150 assert(!stopflag);
151 stopflag = true;
152 c.signal();
153 }
154
155 DLLLOCAL void stopConfirm(ExceptionSink* xsink) {
156 {
157 AutoLocker al(m);
158 assert(stopflag);
159 assert(stopCond);
160 while (!stopped)
161 stopCond->wait(m);
162 }
163
164 //printd(5, "ThreadPoolThread::stopConfirm() stopped id %d\n", id);
165 finalize(xsink);
166 }
167
168 DLLLOCAL void submit(ThreadTask* t) {
169 AutoLocker al(m);
170 assert(!stopflag);
171 assert(!task);
172 task = t;
173 c.signal();
174 }
175
176 DLLLOCAL int getId() const {
177 return id;
178 }
179
180 DLLLOCAL tplist_t::iterator getPos() const {
181 return pos;
182 }
183};
184
185class ThreadPool : public AbstractPrivateData {
186protected:
187 int max, // maximum number of threads in pool (if <= 0 then unlimited)
188 minidle, // minimum number of idle threads
189 maxidle, // maximum number of idle threads
190 release_ms; // number of milliseconds before idle threads are released when > minidle
191
192 // mutex for atomicity
194
195 // worker thread condition variable
196 QoreCondition cond;
197
198 // stop condition variable
199 QoreCondition stopCond;
200
201 tplist_t ah, // allocated thread list
202 fh; // free thread list
203
204 // quit flag
205 bool quit = false;
206
207 // master task queue
208 taskq_t q;
209
210 // task waiting flag
211 bool waiting = false;
212
213 bool shutdown = false, // shutdown flag
214 stopflag = false, // stop flag
215 stopped = false, // stopped flag
216 detach = false; // detach flag
217
218 DLLLOCAL int checkStopUnlocked(const char* m, ExceptionSink* xsink) {
219 if (shutdown) {
220 xsink->raiseException("THREADPOOL-ERROR", "ThreadPool::%s() cannot be executed because the ThreadPool " \
221 "is being destroyed", m);
222 return -1;
223 }
224 return 0;
225 }
226
227 DLLLOCAL int addIdleWorker(ExceptionSink* xsink) {
228 assert(xsink);
229 std::unique_ptr<ThreadPoolThread> tpth(new ThreadPoolThread(*this, xsink));
230 if (!tpth->valid()) {
231 assert(*xsink);
232 return -1;
233 }
234
235 ThreadPoolThread* tpt = tpth.release();
236 fh.push_back(tpt);
237#ifdef DEBUG
238 // set to an invalid iterator
239 tpt->setPos(fh.end());
240#endif
241 return 0;
242 }
243
244 DLLLOCAL ThreadPoolThread* getThreadUnlocked(ExceptionSink* xsink) {
245 assert(xsink);
246 while (!stopflag && fh.empty() && max && (int)ah.size() == max) {
247 waiting = true;
248 cond.wait(m);
249 waiting = false;
250 }
251
252 if (stopflag)
253 return nullptr;
254
255 ThreadPoolThread* tpt;
256
257 if (!fh.empty()) {
258 tpt = fh.front();
259 fh.pop_front();
260 } else {
261 std::unique_ptr<ThreadPoolThread> tpt_pt(new ThreadPoolThread(*this, xsink));
262 if (!tpt_pt->valid()) {
263 assert(*xsink);
264 return nullptr;
265 }
266 tpt = tpt_pt.release();
267 }
268
269 ah.push_back(tpt);
270 tplist_t::iterator i = ah.end();
271 --i;
272 tpt->setPos(i);
273 return tpt;
274 }
275
276public:
277 DLLLOCAL ThreadPool(ExceptionSink* xsink, int n_max = 0, int n_minidle = 0, int m_maxidle = 0, int n_release_ms = QTP_DEFAULT_RELEASE_MS);
278
279 DLLLOCAL ~ThreadPool() {
280 assert(q.empty());
281 assert(ah.empty());
282 assert(fh.empty());
283 assert(stopped);
284 }
285
286 DLLLOCAL void toString(QoreString& str) {
287 AutoLocker al(m);
288
289 str.sprintf("ThreadPool %p total: %lu max: %d minidle: %d maxidle: %d release_ms: %d running: [", this,
290 ah.size() + fh.size(), max, minidle, maxidle, release_ms);
291 for (tplist_t::iterator i = ah.begin(), e = ah.end(); i != e; ++i) {
292 if (i != ah.begin())
293 str.concat(", ");
294 str.sprintf("%d", (*i)->getId());
295 }
296
297 str.concat("] idle: [");
298
299 for (tplist_t::iterator i = fh.begin(), e = fh.end(); i != e; ++i) {
300 if (i != fh.begin())
301 str.concat(", ");
302 str.sprintf("%d", (*i)->getId());
303 }
304
305 str.concat(']');
306 }
307
308 // does not return until the thread pool has been stopped
309 DLLLOCAL void stop() {
310 AutoLocker al(m);
311 if (!stopflag) {
312 detach = true;
313 shutdown = true;
314 stopflag = true;
315 cond.signal();
316 }
317
318 while (!stopped) {
319 stopCond.wait(m);
320 }
321 }
322
323 DLLLOCAL int stopWait(ExceptionSink* xsink) {
324 AutoLocker al(m);
325 if (detach && !stopped) {
326 xsink->raiseException("THREADPOOL-ERROR", "cannot call ThreadPool::stopWait() after ThreadPool::stop() " \
327 "has been called since worker threads have been detached and can no longer be traced");
328 return -1;
329 }
330
331 if (!shutdown) {
332 shutdown = true;
333 cond.signal();
334 }
335
336 while (!stopped) {
337 stopCond.wait(m);
338 }
339
340 return 0;
341 }
342
343 DLLLOCAL int submit(ResolvedCallReferenceNode* c, ResolvedCallReferenceNode* cc, ExceptionSink* xsink) {
344 // optimistically create the task object outside the lock
345 ThreadTaskHolder task(new ThreadTask(c, cc), xsink);
346
347 AutoLocker al(m);
348 if (checkStopUnlocked("submit", xsink))
349 return -1;
350
351 if (q.empty())
352 cond.signal();
353 q.push_back(task.release());
354
355 return 0;
356 }
357
358 DLLLOCAL void threadCounts(int& idle, int& running) {
359 AutoLocker al(m);
360 idle = fh.size();
361 running = ah.size();
362 }
363
364 DLLLOCAL int done(ThreadPoolThread* tpt) {
365 {
366 AutoLocker al(m);
367 // allow the thread to be removed from the active list by ThreadPool::worker() to avoid race conditions
368 if (stopflag) {
369 return 0;
370 }
371
372 tplist_t::iterator i = tpt->getPos();
373 ah.erase(i);
374
375 // requeue thread if possible
376 if ((!maxidle && release_ms) || ((int)fh.size() < maxidle) || q.size() > fh.size()) {
377 fh.push_back(tpt);
378 if (waiting || (release_ms && (int)fh.size() > minidle))
379 cond.signal();
380 return 0;
381 }
382 }
383
384 return -1;
385 }
386
387 DLLLOCAL void worker(ExceptionSink* xsink);
388};
389
390#endif
the base class for all data to be used as private data of Qore objects
Definition: AbstractPrivateData.h:44
provides a safe and exception-safe way to hold locks in Qore, only to be used on the stack,...
Definition: QoreThreadLock.h:136
container for holding Qore-language exception information and also for registering a "thread_exit" ca...
Definition: ExceptionSink.h:50
DLLEXPORT AbstractQoreNode * raiseException(const char *err, const char *fmt,...)
appends a Qore-language exception to the list
a thread condition class implementing a wrapper for pthread_cond_t
Definition: QoreCondition.h:45
DLLEXPORT int wait(pthread_mutex_t *m)
blocks a thread on a mutex until the condition is signaled
DLLEXPORT int signal()
signals a single waiting thread to wake up
Qore's string type supported by the QoreEncoding class.
Definition: QoreString.h:93
DLLEXPORT void concat(const QoreString *str, ExceptionSink *xsink)
concatenates a string and converts encodings if necessary
DLLEXPORT int sprintf(const char *fmt,...)
this will concatenate a formatted string to the existing string according to the format string and t...
provides a mutually-exclusive thread lock
Definition: QoreThreadLock.h:49
base class for resolved call references
Definition: CallReferenceNode.h:109
The main value class in Qore, designed to be passed by value.
Definition: QoreValue.h:276