Qore Programming Language  1.7.0
ThreadPool.h
1 /* -*- mode: c++; indent-tabs-mode: nil -*- */
2 /*
3  Qore Programming Language
4 
5  Copyright (C) 2003 - 2022 Qore Technologies, s.r.o.
6 
7  Permission is hereby granted, free of charge, to any person obtaining a
8  copy of this software and associated documentation files (the "Software"),
9  to deal in the Software without restriction, including without limitation
10  the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  and/or sell copies of the Software, and to permit persons to whom the
12  Software is furnished to do so, subject to the following conditions:
13 
14  The above copyright notice and this permission notice shall be included in
15  all copies or substantial portions of the Software.
16 
17  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  DEALINGS IN THE SOFTWARE.
24 
25  Note that the Qore library is released under a choice of three open-source
26  licenses: MIT (as above), LGPL 2+, or GPL 2+; see README-LICENSE for more
27  information.
28 */
29 
30 #ifndef _QORE_THREADPOOL_H
31 #define _QORE_THREADPOOL_H
32 
33 #define QTP_DEFAULT_RELEASE_MS 5000
34 
35 #include <deque>
36 #include <qore/qlist>
37 
38 class ThreadTask;
39 class ThreadPoolThread;
40 
41 typedef std::deque<ThreadTask*> taskq_t;
42 typedef qlist<ThreadPoolThread*> tplist_t;
43 
44 class ThreadTask {
45 public:
46  DLLLOCAL ThreadTask(ResolvedCallReferenceNode* c, ResolvedCallReferenceNode* cc) : code(c), cancelCode(cc) {
47  }
48 
49  DLLLOCAL ~ThreadTask() {
50  assert(!code);
51  assert(!cancelCode);
52  }
53 
54  DLLLOCAL void del(ExceptionSink* xsink) {
55  code->deref(xsink);
56  if (cancelCode)
57  cancelCode->deref(xsink);
58 #ifdef DEBUG
59  code = nullptr;
60  cancelCode = nullptr;
61 #endif
62  delete this;
63  }
64 
65  DLLLOCAL QoreValue run(ExceptionSink* xsink) {
66  return code->execValue(0, xsink);
67  }
68 
69  DLLLOCAL void cancel(ExceptionSink* xsink) {
70  if (cancelCode)
71  cancelCode->execValue(0, xsink).discard(xsink);
72  }
73 
74 protected:
76  ResolvedCallReferenceNode* cancelCode;
77 };
78 
79 class ThreadTaskHolder {
80 protected:
81  ThreadTask* task;
82  ExceptionSink* xsink;
83 
84 public:
85  DLLLOCAL ThreadTaskHolder(ThreadTask* t, ExceptionSink* xs) : task(t), xsink(xs) {
86  }
87 
88  DLLLOCAL ~ThreadTaskHolder() {
89  if (task) {
90  task->del(xsink);
91  }
92  }
93 
94  DLLLOCAL ThreadTask* release() {
95  ThreadTask* rv = task;
96  task = 0;
97  return rv;
98  }
99 };
100 
101 class ThreadPool;
102 
103 class ThreadPoolThread {
104 protected:
105  int id;
106  ThreadPool& tp;
107  ThreadTask* task = nullptr;
108  QoreCondition c,
109  *stopCond = nullptr;
110  QoreThreadLock m;
111  tplist_t::iterator pos;
112  bool stopflag = false,
113  stopped = false;
114 
115  DLLLOCAL void finalize(ExceptionSink* xsink);
116 
117 public:
118  DLLLOCAL ThreadPoolThread(ThreadPool& n_tp, ExceptionSink* xsink);
119 
120  DLLLOCAL ~ThreadPoolThread() {
121  delete stopCond;
122  assert(!task);
123  }
124 
125  DLLLOCAL void setPos(tplist_t::iterator p) {
126  pos = p;
127  }
128 
129  DLLLOCAL bool valid() const {
130  return id != -1;
131  }
132 
133  DLLLOCAL void worker(ExceptionSink* xsink);
134 
135  DLLLOCAL void stop() {
136  AutoLocker al(m);
137  assert(!stopflag);
138  stopflag = true;
139  c.signal();
140  //printd(5, "ThreadPoolThread::stop() signaling stop for id %d\n", id);
141  }
142 
143  DLLLOCAL void stopWait() {
144  assert(!stopCond);
145  stopCond = new QoreCondition;
146 
147  //printd(5, "ThreadPoolThread::stopWait() stopping id %d\n", id);
148  AutoLocker al(m);
149 
150  assert(!stopflag);
151  stopflag = true;
152  c.signal();
153  }
154 
155  DLLLOCAL void stopConfirm(ExceptionSink* xsink) {
156  {
157  AutoLocker al(m);
158  assert(stopflag);
159  assert(stopCond);
160  while (!stopped)
161  stopCond->wait(m);
162  }
163 
164  //printd(5, "ThreadPoolThread::stopConfirm() stopped id %d\n", id);
165  finalize(xsink);
166  }
167 
168  DLLLOCAL void submit(ThreadTask* t) {
169  AutoLocker al(m);
170  assert(!stopflag);
171  assert(!task);
172  task = t;
173  c.signal();
174  }
175 
176  DLLLOCAL int getId() const {
177  return id;
178  }
179 
180  DLLLOCAL tplist_t::iterator getPos() const {
181  return pos;
182  }
183 };
184 
185 class ThreadPool : public AbstractPrivateData {
186 protected:
187  int max, // maximum number of threads in pool (if <= 0 then unlimited)
188  minidle, // minimum number of idle threads
189  maxidle, // maximum number of idle threads
190  release_ms; // number of milliseconds before idle threads are released when > minidle
191 
192  // mutex for atomicity
193  QoreThreadLock m;
194 
195  // worker thread condition variable
196  QoreCondition cond;
197 
198  // stop condition variable
199  QoreCondition stopCond;
200 
201  tplist_t ah, // allocated thread list
202  fh; // free thread list
203 
204  // quit flag
205  bool quit = false;
206 
207  // master task queue
208  taskq_t q;
209 
210  // task waiting flag
211  bool waiting = false;
212 
213  bool shutdown = false, // shutdown flag
214  stopflag = false, // stop flag
215  stopped = false, // stopped flag
216  detach = false; // detach flag
217 
218  DLLLOCAL int checkStopUnlocked(const char* m, ExceptionSink* xsink) {
219  if (shutdown) {
220  xsink->raiseException("THREADPOOL-ERROR", "ThreadPool::%s() cannot be executed because the ThreadPool " \
221  "is being destroyed", m);
222  return -1;
223  }
224  return 0;
225  }
226 
227  DLLLOCAL int addIdleWorker(ExceptionSink* xsink) {
228  assert(xsink);
229  std::unique_ptr<ThreadPoolThread> tpth(new ThreadPoolThread(*this, xsink));
230  if (!tpth->valid()) {
231  assert(*xsink);
232  return -1;
233  }
234 
235  ThreadPoolThread* tpt = tpth.release();
236  fh.push_back(tpt);
237 #ifdef DEBUG
238  // set to an invalid iterator
239  tpt->setPos(fh.end());
240 #endif
241  return 0;
242  }
243 
244  DLLLOCAL ThreadPoolThread* getThreadUnlocked(ExceptionSink* xsink) {
245  assert(xsink);
246  while (!stopflag && fh.empty() && max && (int)ah.size() == max) {
247  waiting = true;
248  cond.wait(m);
249  waiting = false;
250  }
251 
252  if (stopflag)
253  return nullptr;
254 
255  ThreadPoolThread* tpt;
256 
257  if (!fh.empty()) {
258  tpt = fh.front();
259  fh.pop_front();
260  } else {
261  std::unique_ptr<ThreadPoolThread> tpt_pt(new ThreadPoolThread(*this, xsink));
262  if (!tpt_pt->valid()) {
263  assert(*xsink);
264  return nullptr;
265  }
266  tpt = tpt_pt.release();
267  }
268 
269  ah.push_back(tpt);
270  tplist_t::iterator i = ah.end();
271  --i;
272  tpt->setPos(i);
273  return tpt;
274  }
275 
276 public:
277  DLLLOCAL ThreadPool(ExceptionSink* xsink, int n_max = 0, int n_minidle = 0, int m_maxidle = 0, int n_release_ms = QTP_DEFAULT_RELEASE_MS);
278 
279  DLLLOCAL ~ThreadPool() {
280  assert(q.empty());
281  assert(ah.empty());
282  assert(fh.empty());
283  assert(stopped);
284  }
285 
286  DLLLOCAL void toString(QoreString& str) {
287  AutoLocker al(m);
288 
289  str.sprintf("ThreadPool %p total: %lu max: %d minidle: %d maxidle: %d release_ms: %d running: [", this,
290  ah.size() + fh.size(), max, minidle, maxidle, release_ms);
291  for (tplist_t::iterator i = ah.begin(), e = ah.end(); i != e; ++i) {
292  if (i != ah.begin())
293  str.concat(", ");
294  str.sprintf("%d", (*i)->getId());
295  }
296 
297  str.concat("] idle: [");
298 
299  for (tplist_t::iterator i = fh.begin(), e = fh.end(); i != e; ++i) {
300  if (i != fh.begin())
301  str.concat(", ");
302  str.sprintf("%d", (*i)->getId());
303  }
304 
305  str.concat(']');
306  }
307 
308  // does not return until the thread pool has been stopped
309  DLLLOCAL void stop() {
310  AutoLocker al(m);
311  if (!stopflag) {
312  detach = true;
313  shutdown = true;
314  stopflag = true;
315  cond.signal();
316  }
317 
318  while (!stopped) {
319  stopCond.wait(m);
320  }
321  }
322 
323  DLLLOCAL int stopWait(ExceptionSink* xsink) {
324  AutoLocker al(m);
325  if (detach && !stopped) {
326  xsink->raiseException("THREADPOOL-ERROR", "cannot call ThreadPool::stopWait() after ThreadPool::stop() " \
327  "has been called since worker threads have been detached and can no longer be traced");
328  return -1;
329  }
330 
331  if (!shutdown) {
332  shutdown = true;
333  cond.signal();
334  }
335 
336  while (!stopped) {
337  stopCond.wait(m);
338  }
339 
340  return 0;
341  }
342 
343  DLLLOCAL int submit(ResolvedCallReferenceNode* c, ResolvedCallReferenceNode* cc, ExceptionSink* xsink) {
344  // optimistically create the task object outside the lock
345  ThreadTaskHolder task(new ThreadTask(c, cc), xsink);
346 
347  AutoLocker al(m);
348  if (checkStopUnlocked("submit", xsink))
349  return -1;
350 
351  if (q.empty())
352  cond.signal();
353  q.push_back(task.release());
354 
355  return 0;
356  }
357 
358  DLLLOCAL void threadCounts(int& idle, int& running) {
359  AutoLocker al(m);
360  idle = fh.size();
361  running = ah.size();
362  }
363 
364  DLLLOCAL int done(ThreadPoolThread* tpt) {
365  {
366  AutoLocker al(m);
367  // allow the thread to be removed from the active list by ThreadPool::worker() to avoid race conditions
368  if (stopflag) {
369  return 0;
370  }
371 
372  tplist_t::iterator i = tpt->getPos();
373  ah.erase(i);
374 
375  // requeue thread if possible
376  if ((!maxidle && release_ms) || ((int)fh.size() < maxidle) || q.size() > fh.size()) {
377  fh.push_back(tpt);
378  if (waiting || (release_ms && (int)fh.size() > minidle))
379  cond.signal();
380  return 0;
381  }
382  }
383 
384  return -1;
385  }
386 
387  DLLLOCAL void worker(ExceptionSink* xsink);
388 };
389 
390 #endif
the base class for all data to be used as private data of Qore objects
Definition: AbstractPrivateData.h:44
provides a safe and exception-safe way to hold locks in Qore, only to be used on the stack,...
Definition: QoreThreadLock.h:136
container for holding Qore-language exception information and also for registering a "thread_exit" ca...
Definition: ExceptionSink.h:48
DLLEXPORT AbstractQoreNode * raiseException(const char *err, const char *fmt,...)
appends a Qore-language exception to the list
a thread condition class implementing a wrapper for pthread_cond_t
Definition: QoreCondition.h:45
DLLEXPORT int wait(pthread_mutex_t *m)
blocks a thread on a mutex until the condition is signaled
DLLEXPORT int signal()
signals a single waiting thread to wake up
Qore's string type supported by the QoreEncoding class.
Definition: QoreString.h:93
DLLEXPORT void concat(const QoreString *str, ExceptionSink *xsink)
concatenates a string and converts encodings if necessary
DLLEXPORT int sprintf(const char *fmt,...)
this will concatentate a formatted string to the existing string according to the format string and t...
provides a mutually-exclusive thread lock
Definition: QoreThreadLock.h:49
base class for resolved call references
Definition: CallReferenceNode.h:109
The main value class in Qore, designed to be passed by value.
Definition: QoreValue.h:275