Qore Programming Language Reference Manual  0.9.16
Qore::StreamPipe Class Reference

This class provides a pair of streams connected through a buffer.

Public Member Methods

constructor (bool syncClose=True, timeout timeout_ms=-1, int bufferSize=4096)
 Creates the StreamPipe.

PipeInputStream getInputStream ()
 Returns the input stream connected to the pipe.

PipeOutputStream getOutputStream ()
 Returns the output stream connected to the pipe.


Detailed Description

This class provides a pair of streams connected through a buffer.

The input stream provides the bytes that are written to the output stream. Data should be written to and read from the streams by different threads. The pipe contains a buffer whose size can be specified in the constructor: read operations on the input stream block while the buffer is empty, and write operations on the output stream block while the buffer is full. The input stream reports the end of the stream once the output stream has been closed with OutputStream::close() and all remaining data has been read from the buffer. Conversely, when syncClose is True (the default), OutputStream::close() waits until PipeInputStream::finishClose() is called on the input stream, which can be used to delay the main thread until the background thread has read all data from the pipe.

Broken pipe
A broken pipe occurs when one of the streams ceases to exist (goes out of scope). In that case, any operation on the other stream throws a BROKEN-PIPE-ERROR exception. This is particularly useful for unblocking the background producer (or consumer) when the other end stops consuming data before the end of the stream is reached (or stops producing data without closing the stream). For this to work as intended, it is important not to hold on to the StreamPipe instance, because it keeps references to both streams. The examples below are templates of correct usage; note that in each of them the StreamPipe instance goes out of scope as soon as possible.
Example: pulling data from a background producer
    InputStream sub example() {
        StreamPipe pipe(False); # False indicates that the output stream's close() will not block
        PipeOutputStream os = pipe.getOutputStream();
        background sub() {
            try {
                os.write(<01>); # produce data and write bytes to the pipe using os.write()
                os.close(); # causes the input stream's read() method to report the end of data
            } catch (hash ex) {
                os.reportError(ex); # causes the input stream's read() method to throw the exception
            }
        }();
        return pipe.getInputStream();
    }

    InputStream is = example();
    *binary b;
    while (b = is.read(4096)) {
        # process the data
    }
Example: pushing data to a background consumer
    OutputStream sub example() {
        StreamPipe pipe();
        PipeInputStream is = pipe.getInputStream();
        background sub() {
            try {
                *binary b; # "*binary" because read() returns no value at the end of the stream
                while (b = is.read(4096)) {
                    # process the data
                }
                # finish processing / cleanup
                is.finishClose(); # wakes up PipeOutputStream::close()
            } catch (hash ex) {
                is.reportError(ex); # causes the output stream's write() or close() methods to throw the exception
            }
        }();
        return pipe.getOutputStream();
    }

    OutputStream os = example();
    os.write(<01>); # produce data
    os.close(); # waits until the consumer is done or reports an error
Since
Qore 0.8.13

Member Function Documentation

◆ constructor()

Qore::StreamPipe::constructor (bool syncClose = True, timeout timeout_ms = -1, int bufferSize = 4096)

Creates the StreamPipe.

Parameters
syncClose    if True, then the output stream's close() method blocks until the input stream's finishClose() method is called
timeout_ms   a timeout period with a resolution of milliseconds (a relative date/time value; integer arguments will be assumed to be milliseconds); if not given or negative the operations will never time out
bufferSize   the size of the internal buffer
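
The parameter descriptions above can be illustrated with a minimal sketch of a constructor call that overrides all three defaults. The specific values (a 5-second timeout and an 8192 buffer size) are hypothetical and chosen only for illustration; the variable name pipe follows the examples in the detailed description.

```qore
%new-style

# Illustrative values only (assumptions, not recommendations):
#   syncClose  = False : the output stream's close() does not wait for finishClose()
#   timeout_ms = 5s    : a relative date/time value; the integer 5000 would be
#                        interpreted as milliseconds
#   bufferSize = 8192  : size of the internal buffer
StreamPipe pipe(False, 5s, 8192);
```

As in the examples in the detailed description, the StreamPipe object would normally be created inside a function that returns one of the streams, so that it goes out of scope as soon as both streams have been obtained.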

◆ getInputStream()

PipeInputStream Qore::StreamPipe::getInputStream ( )

Returns the input stream connected to the pipe.

Returns
the input stream connected to the pipe

◆ getOutputStream()

PipeOutputStream Qore::StreamPipe::getOutputStream ( )

Returns the output stream connected to the pipe.

Returns
the output stream connected to the pipe