30#ifndef _QORE_THREADPOOL_H
31#define _QORE_THREADPOOL_H
33#define QTP_DEFAULT_RELEASE_MS 5000
39class ThreadPoolThread;
41typedef std::deque<ThreadTask*> taskq_t;
42typedef qlist<ThreadPoolThread*> tplist_t;
49 DLLLOCAL ~ThreadTask() {
57 cancelCode->deref(xsink);
66 return code->execValue(0, xsink);
71 cancelCode->execValue(0, xsink).discard(xsink);
79class ThreadTaskHolder {
85 DLLLOCAL ThreadTaskHolder(ThreadTask* t,
ExceptionSink* xs) : task(t), xsink(xs) {
88 DLLLOCAL ~ThreadTaskHolder() {
94 DLLLOCAL ThreadTask* release() {
95 ThreadTask* rv = task;
103class ThreadPoolThread {
107 ThreadTask* task =
nullptr;
111 tplist_t::iterator pos;
112 bool stopflag =
false,
118 DLLLOCAL ThreadPoolThread(ThreadPool& n_tp,
ExceptionSink* xsink);
120 DLLLOCAL ~ThreadPoolThread() {
125 DLLLOCAL
void setPos(tplist_t::iterator p) {
129 DLLLOCAL
bool valid()
const {
135 DLLLOCAL
void stop() {
143 DLLLOCAL
void stopWait() {
168 DLLLOCAL
void submit(ThreadTask* t) {
176 DLLLOCAL
int getId()
const {
180 DLLLOCAL tplist_t::iterator getPos()
const {
211 bool waiting =
false;
213 bool shutdown =
false,
218 DLLLOCAL
int checkStopUnlocked(
const char* m,
ExceptionSink* xsink) {
220 xsink->
raiseException(
"THREADPOOL-ERROR",
"ThreadPool::%s() cannot be executed because the ThreadPool " \
221 "is being destroyed", m);
229 std::unique_ptr<ThreadPoolThread> tpth(
new ThreadPoolThread(*
this, xsink));
230 if (!tpth->valid()) {
235 ThreadPoolThread* tpt = tpth.release();
239 tpt->setPos(fh.end());
244 DLLLOCAL ThreadPoolThread* getThreadUnlocked(
ExceptionSink* xsink) {
246 while (!stopflag && fh.empty() && max && (
int)ah.size() == max) {
255 ThreadPoolThread* tpt;
261 std::unique_ptr<ThreadPoolThread> tpt_pt(
new ThreadPoolThread(*
this, xsink));
262 if (!tpt_pt->valid()) {
266 tpt = tpt_pt.release();
270 tplist_t::iterator i = ah.end();
277 DLLLOCAL ThreadPool(
ExceptionSink* xsink,
int n_max = 0,
int n_minidle = 0,
int m_maxidle = 0,
int n_release_ms = QTP_DEFAULT_RELEASE_MS);
279 DLLLOCAL ~ThreadPool() {
289 str.
sprintf(
"ThreadPool %p total: %lu max: %d minidle: %d maxidle: %d release_ms: %d running: [",
this,
290 ah.size() + fh.size(), max, minidle, maxidle, release_ms);
291 for (tplist_t::iterator i = ah.begin(), e = ah.end(); i != e; ++i) {
294 str.
sprintf(
"%d", (*i)->getId());
299 for (tplist_t::iterator i = fh.begin(), e = fh.end(); i != e; ++i) {
302 str.
sprintf(
"%d", (*i)->getId());
309 DLLLOCAL
void stop() {
325 if (detach && !stopped) {
326 xsink->
raiseException(
"THREADPOOL-ERROR",
"cannot call ThreadPool::stopWait() after ThreadPool::stop() " \
327 "has been called since worker threads have been detached and can no longer be traced");
345 ThreadTaskHolder task(
new ThreadTask(c, cc), xsink);
348 if (checkStopUnlocked(
"submit", xsink))
353 q.push_back(task.release());
358 DLLLOCAL
void threadCounts(
int& idle,
int& running) {
364 DLLLOCAL
int done(ThreadPoolThread* tpt) {
372 tplist_t::iterator i = tpt->getPos();
376 if ((!maxidle && release_ms) || ((
int)fh.size() < maxidle) || q.size() > fh.size()) {
378 if (waiting || (release_ms && (
int)fh.size() > minidle))
the base class for all data to be used as private data of Qore objects
Definition: AbstractPrivateData.h:44
provides a safe and exception-safe way to hold locks in Qore, only to be used on the stack,...
Definition: QoreThreadLock.h:136
container for holding Qore-language exception information and also for registering a "thread_exit" ca...
Definition: ExceptionSink.h:50
DLLEXPORT AbstractQoreNode * raiseException(const char *err, const char *fmt,...)
appends a Qore-language exception to the list
a thread condition class implementing a wrapper for pthread_cond_t
Definition: QoreCondition.h:45
DLLEXPORT int wait(pthread_mutex_t *m)
blocks a thread on a mutex until the condition is signaled
DLLEXPORT int signal()
signals a single waiting thread to wake up
Qore's string type supported by the QoreEncoding class.
Definition: QoreString.h:93
DLLEXPORT void concat(const QoreString *str, ExceptionSink *xsink)
concatenates a string and converts encodings if necessary
DLLEXPORT int sprintf(const char *fmt,...)
this will concatenate a formatted string to the existing string according to the format string and t...
provides a mutually-exclusive thread lock
Definition: QoreThreadLock.h:49
base class for resolved call references
Definition: CallReferenceNode.h:109
The main value class in Qore, designed to be passed by value.
Definition: QoreValue.h:276