Qore DataProvider Module Reference  2.2
DataProvider::DataProviderPipeline Class Reference

Defines a class for passing data through record processors. More...

Public Member Methods

 abort (*bool ignore_exceptions)
 Aborts execution of a pipeline in progress. More...
 
bool aborting ()
 Returns True if the object is aborting.
 
 append (AbstractDataProcessor processor)
 Appends a data processor to the default queue. More...
 
 append (int id, AbstractDataProcessor processor)
 Appends a data processor to a queue. More...
 
int appendQueue (int id)
 Appends a new queue to an existing pipeline and returns the new queue ID. More...
 
 constructor (*hash< PipelineOptionInfo > opts)
 Creates the object with the given options. More...
 
 copy ()
 Copy constructor; creates an empty pipeline with the same configuration as the original.
 
 destructor ()
 Destroys the object. More...
 
hash< PipelineInfogetInfo ()
 Returns pipeline info. More...
 
string getName ()
 Returns the pipeline name.
 
bool hasQueue (int id)
 Returns True if the given queue exists, False if not.
 
bool isProcessing ()
 Returns True if the pipeline is processing data.
 
 logDebug (string fmt)
 Logs to the debug log, if set.
 
 logError (string fmt)
 Logs to the error log, if set.
 
 logInfo (string fmt)
 Logs to the info log, if set.
 
 registerThread (PipelineQueue queue)
 Registers a new thread.
 
 reportError (PipelineQueue queue, hash< ExceptionInfo > ex)
 Called from a pipeline queue object to report a fatal error durring processing.
 
 reset ()
 Resets the pipeline. More...
 
bool stopping ()
 Returns True if the object is stopping.
 
 submit (AbstractDataProviderBulkRecordInterface i)
 Submits data for processing.
 
 submit (AbstractDataProviderRecordIterator i)
 Submits data for processing.
 
 submit (auto _data)
 Submits data for processing.
 
 submitData (AbstractIterator i)
 Submits data for processing.
 
 waitDone ()
 Waits for all queues to have processed remaining data. More...
 

Private Member Methods

Counter cnt ()
 Thread counter.
 
Mutex lck ()
 Atomic lock.
 
Sequence seq (1)
 Pipeline ID sequence generator.
 

Private Attributes

bool abort_flag
 Abort flag.
 
*code debug_log
 Debug log closure; takes a single format string and then arguments for format placeholders.
 
bool do_bulk = True
 Bulk flag. More...
 
list< hash< ExceptionInfo > > error_list
 list of exceptions in pipelines
 
*code error_log
 Error log closure; takes a single format string and then arguments for format placeholders.
 
*code info_log
 Info log closure; takes a single format string and then arguments for format placeholders.
 
bool locked = False
 Locked flag.
 
string name
 A descriptive name for logging purposes.
 
hash< string, PipelineQueuepmap
 Hash of queues keyed by queue ID.
 
int record_count = 0
 Record count.
 
date start_time
 run start time
 
bool stop_flag
 Stop flag.
 
date stop_time
 run stop time (set in waitDone())
 
*code thread_callback
 a closure or call reference for setting thread-local data in new pipeline queue threads
 

Private:Internal Member Methods

 checkLockedIntern ()
 Throws an exception if the pipeline is locked. More...
 
 checkSubmitIntern ()
 Throws an exception if the pipeline cannot be used; locks the pipeline for changes otherwise. More...
 
 checkUpdatePipelineIntern (int id)
 Check if the given queue exists.
 
PipelineQueue copyPipeline (PipelineQueue old_queue)
 Called by the copy constructor to copy the queues.
 
 resetIntern ()
 Resets the pipeline. More...
 
 stopIntern ()
 Stops all background pipeline queues.
 
 stopInternUnlocked ()
 Stops all background pipeline queues; lock must be held.
 
 submitBulkIntern (AbstractDataProviderBulkRecordInterface i)
 Submits bulk data for processing. More...
 
 submitDataIntern (auto _data)
 Submits a single record for processing. More...
 
 submitIntern (auto _data)
 Submits data for processing. More...
 
 throwPipelineException ()
 Throws an exception if errors occured in background pipeline processing. More...
 

Detailed Description

Defines a class for passing data through record processors.

Record processing pipelines run in background threads. Each queue has an integer queue ID; a queue with ID 0 is created by default as the initial queue.

Note
Pipeline data can be of any type

Member Function Documentation

◆ abort()

DataProvider::DataProviderPipeline::abort ( *bool  ignore_exceptions)

Aborts execution of a pipeline in progress.

Parameters
ignore_exceptionsif True then any processing exceptions are ignored
Exceptions
PIPELINE-FAILEDthrown if ignore_exceptions is not True and there are any errors in pipeline processing, in which case the exception argument will have a list of exception arguments thrown by pipeline threads
Note
The pipeline must be reset once aborted to be used again

◆ append() [1/2]

DataProvider::DataProviderPipeline::append ( AbstractDataProcessor  processor)

Appends a data processor to the default queue.

Note
The initial queue is queue 0
See also
appendQueue()

◆ append() [2/2]

DataProvider::DataProviderPipeline::append ( int  id,
AbstractDataProcessor  processor 
)

Appends a data processor to a queue.

Parameters
idthe queue ID as returned from appendQueue()
processorthe data processor to append to the pipeline queue
Exceptions
PIPELINE-ERRORinvalid queue ID or the queue already terminates in additional queues; the pipeline is locked
Note
The initial queue is queue 0
See also
appendQueue()

◆ appendQueue()

int DataProvider::DataProviderPipeline::appendQueue ( int  id)

Appends a new queue to an existing pipeline and returns the new queue ID.

Parameters
idthe queue to which the new queue will be appended
Returns
the new queue ID
Exceptions
PIPELINE-ERRORthe pipeline is locked, or the given queue does not exist
Note
The initial queue is queue 0
See also
append(int, AbstractDataProcessor)

◆ checkLockedIntern()

DataProvider::DataProviderPipeline::checkLockedIntern ( )
private:internal

Throws an exception if the pipeline is locked.

Must be called with the lock held

◆ checkSubmitIntern()

DataProvider::DataProviderPipeline::checkSubmitIntern ( )
private:internal

Throws an exception if the pipeline cannot be used; locks the pipeline for changes otherwise.

Must be called with the lock held

◆ constructor()

DataProvider::DataProviderPipeline::constructor ( *hash< PipelineOptionInfo opts)

Creates the object with the given options.

Parameters
optsany options for the pipeline; see PipelineOptionInfo for more information
Note
The object is created with an initial queue with ID 0

◆ destructor()

DataProvider::DataProviderPipeline::destructor ( )

Destroys the object.

To ensure that the destructor does not throw a PIPELINE-FAILED exception, call run() or runAsync() and waitDone()

Exceptions
PIPELINE-FAILEDthrown if there are any errors in pipeline processing, in which case the exception argument will have a list of exception arguments thrown by pipeline queue threads

◆ getInfo()

hash<PipelineInfo> DataProvider::DataProviderPipeline::getInfo ( )

Returns pipeline info.

Returns
pipeline info; see PipelineInfo for more information
Note
record count and performance intormation is only valid after the pipeline has completed processing

◆ reset()

DataProvider::DataProviderPipeline::reset ( )

Resets the pipeline.

Exceptions
PIPELINE-ERRORthis method cannot be called while the pipeline is processing data; call abort() or waitDone() before resetting if the pipeline is processing data

◆ resetIntern()

DataProvider::DataProviderPipeline::resetIntern ( )
private:internal

Resets the pipeline.

Must be called with the lock held

◆ submitBulkIntern()

DataProvider::DataProviderPipeline::submitBulkIntern ( AbstractDataProviderBulkRecordInterface  i)
private:internal

Submits bulk data for processing.

Must be called with the lock held

See also
dataprovider_pipeline_bulk_processing

◆ submitDataIntern()

DataProvider::DataProviderPipeline::submitDataIntern ( auto  _data)
private:internal

Submits a single record for processing.

Must be called with the lock held

◆ submitIntern()

DataProvider::DataProviderPipeline::submitIntern ( auto  _data)
private:internal

Submits data for processing.

Must be called with the lock held

◆ throwPipelineException()

DataProvider::DataProviderPipeline::throwPipelineException ( )
private:internal

Throws an exception if errors occured in background pipeline processing.

Must be called with the lock held

◆ waitDone()

DataProvider::DataProviderPipeline::waitDone ( )

Waits for all queues to have processed remaining data.

Exceptions
PIPELINE-FAILEDthrown if there are any errors in pipeline processing, in which case the exception argument will have a list of exception arguments thrown by pipeline queue threads

Member Data Documentation

◆ do_bulk

bool DataProvider::DataProviderPipeline::do_bulk = True
private

Bulk flag.

See also
dataprovider_pipeline_bulk_processing