Qore DataProvider Module Reference 2.7.5
Loading...
Searching...
No Matches
DataProviderPipeline.qc.dox.h
1// -*- mode: c++; indent-tabs-mode: nil -*-
3
25// assume local scope for variables, do not use "$" signs
26// require type definitions everywhere
28// enable all warnings
29// allow weak references
30
32namespace DataProvider {
37const PS_ABORTED = "ABORTED";
38
40const PS_RUNNING = "RUNNING";
41
43const PS_IDLE = "IDLE";
45
47public hashdecl PipelineInfo {
49 string name;
50
53
55 *date stop_time;
56
58
60 string status;
61
64
67
69
71 bool bulk;
72
75
78
81};
82
84public hashdecl PipelineOptionInfo {
86
88 *code debug_log;
89
91
93 *code error_log;
94
96
98 *code info_log;
99
101 /* it is called in the new pipeline queue thread with no arguments
102 */
104
106 *string name;
107};
108
111
112public:
114 int id;
115
117 Mutex lck;
118
120 Counter cnt;
121
123 Condition cond();
124
126 Condition flush_cond();
127
130
133
136
138 list<auto> queue;
139
141 int size;
142
144 int tid;
145
147
151 list<auto> elems();
152
155
158
161
163 constructor(DataProviderPipeline parent, Mutex lck, Counter cnt, int id, int size);
164
165
167 int getId();
168
169
171
175 submit(auto qdata);
176
177
179 run(Counter run_cnt);
180
181
183
186
187
190protected:
192public:
193
194
195protected:
196 flushIntern();
197public:
198
199};
200
202
208
209public:
210
211
212protected:
214 string name;
215
217 hash<string, PipelineQueue> pmap;
218
220
222 bool do_bulk = True;
223
225 bool locked = False;
226
228 Sequence seq(1);
229
231 list<hash<ExceptionInfo>> error_list;
232
235
238
241
243 *code info_log;
244
247
250
252 /* it is called in the new pipeline queue thread with no arguments
253 */
255
257 Mutex lck();
258
261
264
266 Counter cnt();
267
268public:
269
271
275 constructor(*hash<PipelineOptionInfo> opts);
276
277
279
282
283
285
292
293
295 string getName();
296
297
299 bool hasQueue(int id);
300
301
303 bool stopping();
304
305
307 bool aborting();
308
309
312
313
315
321
322
324
334 append(int id, AbstractDataProcessor processor);
335
336
338
348 int appendQueue(int id);
349
350
353
354
356 submit(auto _data);
357
358
360 submitData(AbstractIterator i);
361
362
365
366
369
370
372
376
377
379
383
384
386
394 abort(*bool ignore_exceptions);
395
396
398
402 hash<PipelineInfo> getInfo();
403
404
406 reportError(PipelineQueue queue, hash<ExceptionInfo> ex);
407
408
410 logInfo(string fmt);
411
412
414 logError(string fmt);
415
416
418 logDebug(string fmt);
419
420
422private:
424public:
425
426
428
430private:
431 submitIntern(auto _data);
432public:
433
434
436
438private:
439 submitDataIntern(auto _data);
440public:
441
442
444
448private:
450public:
451
452
454
456private:
458public:
459
460
462
464private:
466public:
467
468
470
472private:
474public:
475
476
478
480private:
482public:
483
484
486private:
488public:
489
490
492private:
494public:
495
496
498private:
500public:
501
502};
503};
504
505*list get_stack();
506
507
508
Defines an abstract class for accepting data and outputting optionally transformed or filtered data.
Definition: AbstractDataProcessor.qc.dox.h:33
Defines the abstract class for data provider iterators; the destructor releases the iterator.
Definition: AbstractDataProviderBulkRecordInterface.qc.dox.h:33
Defines the abstract class for data provider iterators; the destructor releases the iterator.
Definition: AbstractDataProviderRecordIterator.qc.dox.h:339
Defines a class for passing data through record processors.
Definition: DataProviderPipeline.qc.dox.h:207
string getName()
Returns the pipeline name.
*code error_log
Error log closure; takes a single format string and then arguments for format placeholders.
Definition: DataProviderPipeline.qc.dox.h:246
Counter cnt()
Thread counter.
*code thread_callback
a closure or call reference for setting thread-local data in new pipeline queue threads
Definition: DataProviderPipeline.qc.dox.h:254
int record_count
Record count.
Definition: DataProviderPipeline.qc.dox.h:240
bool hasQueue(int id)
Returns True if the given queue exists, False if not.
bool stop_flag
Stop flag.
Definition: DataProviderPipeline.qc.dox.h:260
hash< string, PipelineQueue > pmap
Hash of queues keyed by queue ID.
Definition: DataProviderPipeline.qc.dox.h:217
abort(*bool ignore_exceptions)
Aborts execution of a pipeline in progress.
Sequence seq(1)
Pipeline ID sequence generator.
submitBulkIntern(AbstractDataProviderBulkRecordInterface i)
Submits bulk data for processing.
logError(string fmt)
Logs to the error log, if set.
copy()
Copy constructor; creates an empty pipeline with the same configuration as the original.
bool stopping()
Returns True if the object is stopping.
registerThread(PipelineQueue queue)
Registers a new thread.
throwPipelineException()
Throws an exception if errors occured in background pipeline processing.
resetIntern()
Resets the pipeline.
submitIntern(auto _data)
Submits data for processing.
list< hash< ExceptionInfo > > error_list
list of exceptions in pipelines
Definition: DataProviderPipeline.qc.dox.h:231
bool aborting()
Returns True if the object is aborting.
bool isProcessing()
Returns True if the pipeline is processing data.
PipelineQueue copyPipeline(PipelineQueue old_queue)
Called by the copy constructor to copy the queues.
int appendQueue(int id)
Appends a new queue to an existing pipeline and returns the new queue ID.
*code debug_log
Debug log closure; takes a single format string and then arguments for format placeholders.
Definition: DataProviderPipeline.qc.dox.h:249
reportError(PipelineQueue queue, hash< ExceptionInfo > ex)
Called from a pipeline queue object to report a fatal error durring processing.
*code info_log
Info log closure; takes a single format string and then arguments for format placeholders.
Definition: DataProviderPipeline.qc.dox.h:243
string name
A descriptive name for logging purposes.
Definition: DataProviderPipeline.qc.dox.h:214
append(int id, AbstractDataProcessor processor)
Appends a data processor to a queue.
date stop_time
run stop time (set in waitDone())
Definition: DataProviderPipeline.qc.dox.h:237
submit(auto _data)
Submits data for processing.
logInfo(string fmt)
Logs to the info log, if set.
submit(AbstractDataProviderBulkRecordInterface i)
Submits data for processing.
hash< PipelineInfo > getInfo()
Returns pipeline info.
waitDone()
Waits for all queues to have processed remaining data.
date start_time
run start time
Definition: DataProviderPipeline.qc.dox.h:234
bool locked
Locked flag.
Definition: DataProviderPipeline.qc.dox.h:225
logDebug(string fmt)
Logs to the debug log, if set.
submit(AbstractDataProviderRecordIterator i)
Submits data for processing.
destructor()
Destroys the object.
submitData(AbstractIterator i)
Submits data for processing.
bool abort_flag
Abort flag.
Definition: DataProviderPipeline.qc.dox.h:263
checkUpdatePipelineIntern(int id)
Check if the given queue exists.
stopInternUnlocked()
Stops all background pipeline queues; lock must be held.
checkSubmitIntern()
Throws an exception if the pipeline cannot be used; locks the pipeline for changes otherwise.
bool do_bulk
Bulk flag.
Definition: DataProviderPipeline.qc.dox.h:222
checkLockedIntern()
Throws an exception if the pipeline is locked.
append(AbstractDataProcessor processor)
Appends a data processor to the default queue.
submitDataIntern(auto _data)
Submits a single record for processing.
constructor(*hash< PipelineOptionInfo > opts)
Creates the object with the given options.
stopIntern()
Stops all background pipeline queues.
Pipeline element.
Definition: DataProviderPipeline.qc.dox.h:110
bool do_flush
Flush pipeline flag.
Definition: DataProviderPipeline.qc.dox.h:154
Mutex lck
Parent lock.
Definition: DataProviderPipeline.qc.dox.h:117
list< auto > queue
Data queue.
Definition: DataProviderPipeline.qc.dox.h:138
int getId()
Returns the pipeline ID.
waitDone()
Wait for the queue to be empty, then wait for all terminating pipelines to be empty.
int data_waiting
Number of threads waiting on data.
Definition: DataProviderPipeline.qc.dox.h:132
int id
Queue ID.
Definition: DataProviderPipeline.qc.dox.h:114
Condition cond()
Queue condition variable.
submit(auto qdata)
Submits data for processing.
Condition flush_cond()
Flush condition variable.
int tid
TID of the background thread.
Definition: DataProviderPipeline.qc.dox.h:144
constructor(DataProviderPipeline parent, Mutex lck, Counter cnt, int id, int size)
Creates the object.
bool data_flushed
Data flushed confirmation.
Definition: DataProviderPipeline.qc.dox.h:157
list< auto > elems()
Pipeline elements.
int size
Maximum queue size.
Definition: DataProviderPipeline.qc.dox.h:141
int queue_waiting
Number of threads waiting data to be removed from the queue.
Definition: DataProviderPipeline.qc.dox.h:129
run(Counter run_cnt)
Processing thread.
Counter cnt
Parent counter.
Definition: DataProviderPipeline.qc.dox.h:120
int flush_waiting
Number of threads waiting on the flush cond.
Definition: DataProviderPipeline.qc.dox.h:135
DataProviderPipeline parent
Parent object.
Definition: DataProviderPipeline.qc.dox.h:160
const PS_IDLE
Pipeline status: IDLE.
Definition: DataProviderPipeline.qc.dox.h:43
const PS_RUNNING
Pipeline status: RUNNING.
Definition: DataProviderPipeline.qc.dox.h:40
const PS_ABORTED
Definition: DataProviderPipeline.qc.dox.h:37
Qore AbstractDataField class definition.
Definition: AbstractDataField.qc.dox.h:27
Pipeline info.
Definition: DataProviderPipeline.qc.dox.h:47
date duration
Total time processing end to end.
Definition: DataProviderPipeline.qc.dox.h:74
*date stop_time
Stop time for processing.
Definition: DataProviderPipeline.qc.dox.h:55
float recs_per_sec
Records processed per second end to end.
Definition: DataProviderPipeline.qc.dox.h:80
*date start_time
Start of processing.
Definition: DataProviderPipeline.qc.dox.h:52
bool bulk
Flag that indicates if the pipeline is capable of bulk record processing.
Definition: DataProviderPipeline.qc.dox.h:71
int num_queues
Number of pipeline queues.
Definition: DataProviderPipeline.qc.dox.h:66
int record_count
Number of input records submitted.
Definition: DataProviderPipeline.qc.dox.h:63
string status
Pipeline status.
Definition: DataProviderPipeline.qc.dox.h:60
float duration_secs
Total time processing end to end as a floating-point value in durationSecondsFloat.
Definition: DataProviderPipeline.qc.dox.h:77
string name
The name of the pipeline.
Definition: DataProviderPipeline.qc.dox.h:49
Pipeline option info.
Definition: DataProviderPipeline.qc.dox.h:84
*string name
the name of the pipeline for logging purposes; if this key is not included, a generic name will be ge...
Definition: DataProviderPipeline.qc.dox.h:106
*code info_log
a closure or call reference for info logging
Definition: DataProviderPipeline.qc.dox.h:98
*code error_log
a closure or call reference for error logging
Definition: DataProviderPipeline.qc.dox.h:93
*code thread_callback
a closure or call reference for setting thread-local data in new pipeline queue threads
Definition: DataProviderPipeline.qc.dox.h:103
*code debug_log
a closure or call reference for debug logging
Definition: DataProviderPipeline.qc.dox.h:88