In large-scale distributed systems events are a commonly used communication mechanism for passing data and triggering behaviors. Events are typically generated asynchronously rather than at the request of the processing system, and once received an event usually undergoes some level of transformation before being stored, acted upon, or forwarded to another consumer.
Pipelines and streams are a popular and effective model for consuming and processing events, with numerous APIs providing this sort of model. One of the most well-known processing pipeline APIs is the Java 8 Streams API, which provides a functional pipeline for operating on Collections. The Streams API is inherently pull based” as it relies on iterators and spliterators to pull the next entry from the stream. This is the primary difference between synchronous and asynchronous models. In an asynchronous world events are pushed into the pipeline as they are received.
This specification defines a PushStream API which can be used on devices which support the Java 8 compact1 profile. The PushStream API defined by this specification depends on OSGi Promises but is independent of all other OSGi specifications, including the OSGi Framework, and thus can be easily used outside of the OSGi environment.
A PushStream object encapsulates a pipeline of a potentially asynchronous tasks which will be performed when an event arrives. The result of the processing pipeline is represented using a Promise object which will resolve when the result has been calculated.
PushStream capture the effects of errors, finite streams and back pressure by making these explicit in the API signatures. Errors and End of Stream conditions are represented by specific events which are pushed into the stream. Back pressure is represented by a delay value returned from the event pipeline stages.
-
Common concepts - The API is inspired by the Streams API in Java 8 and uses the same basic concepts. See [1] Java 8 Stream API.
-
Independent - The design is independent of all other OSGi specifications (except for OSGi Promises) and can be used outside of an OSGi environment.
-
Asynchronous - The design is built to handle asynchronously produced events.
-
Back Pressure - The design provides a means for event pipelines to communicate back-pressure to the Event Source.
-
Complete - The design provides a very complete set of operations for PushStreams which are primitives that can be used to address most use cases.
-
Generified - Generics are used to promote type safety.
-
Push Event Source - A PushEventSource object represents a source of asynchronous events, and can be used to create a PushStream.
-
Push Event Consumer - A Push Event Consumer object represents a sink for asynchronous events, and can be attached to a PushEventSource or a PushStream.
-
Push Stream - A PushStream object represents a pipeline for processing asynchronous events.
-
Terminal Operation - The final operation of a PushStream pipeline results in a Promise which represents the completion state of the pipeline. The operation also begins the processing of events.
The Push Stream API is built upon the principals of Asynchronous Event streams, and therefore requires three basic primitives:
-
An event object
-
A source of event objects
-
A consumer of event objects
The PushEvent is an object representing an event. Every Push Event has an event type, which has one of three values:
An event stream consists of zero or more data events followed by a terminal event. A terminal event is either an error or a close, and it indicates that there will be no more events in this stream. Depending on the reason for the terminal event it may be possible to re-attach to the event source and consume more events.
A Push Event Source object represents a source of asynchronous Push Events. The event source defines a single method open(PushEventConsumer) which can be used to connect to the source and begin receiving a stream of events.
The open
method of the Push Event Source returns an
AutoCloseable
which can be used to close the event stream.
If the close
method is called on this object then the
stream is terminated by sending a close event. If additional calls are
made to the close method then they return without further action.
A Push Event Consumer object represents a sink for asynchronous Push Events. The event consumer defines a single method accept(PushEvent) which can be used to receive a stream of events.
The accept
method of the Push Event Consumer returns
a long
representing back pressure.
Back pressure is described in detail in Back pressure. If the returned long is
negative then the event stream should be closed by the event
source.
There are three ways in which a stream of events can complete normally.
-
The Push Event Source may close the stream at any time by sending a terminal event to the consumer. Upon receiving a terminal event the consumer should clean up any resources and not expect to receive further messages. Note that in a multi-threaded system the consumer may receive events out of order, and in this case data events may be received after a terminal event. Event processors should be careful to ignore data events that occur after terminal events, and to ensure that any downstream consumers receive any pending data events before forwarding the terminal event.
-
The
open
method of the Push Event Source returns anAutoCloseable
which can be used to close the event stream. If theclose
method is called on this object then the stream is terminated by sending a close event. If additional calls are made to the close method then they return without action. If the close method is called after a terminal event has been sent for any other reason then it must return without action. -
The
accept
method of the Push Event Consumer returns a long indicating back pressure. If the long is negative then the event source must close the stream by sending a close event.
Simple event passing can be achieved by connecting a Push Event Consumer directly to a Push Event Source, however this model forces a large amount of flow-control and resource management into a single location. Furthermore it is difficult to reuse business logic across different event streams.
The PushStream provides a powerful, flexible pipeline for event processing. The Push Stream API shares many concepts with the Java 8 Streams API, in particular Push Streams are lazy, they may not consume the entire event stream, and they can be composed from functional steps.
A Push Stream can be created from a Push Event Source by using a PushStreamProvider. A Push Stream represents a stage in an event processing pipeline. The overall pipeline is constructed from zero or more intermediate operations, and completed with a single terminal operation.
Each intermediate operation returns a new Push Stream object
chained to the previous pipeline step. Once a Push Stream object has had
an intermediate operation invoked on it then it may not have any other
operations chained to it. Terminal operations are either void, or return
a Promise
representing the future result of the pipeline.
These API patterns allow Push Streams to be built using a fluent
API.
Push Stream instances are lazy, and so the Push Stream will not be
connected to the Push Event Source until a terminal
operation
is invoked on the Push Stream. This means that a push
stream object can be safely built without events being received when the
pipeline is partially initialized.
The simplest intermediate operations on a Push Stream are mapping and filtering. These operations use stateless, non-interfering functions to alter the data received by the next stage in the pipeline.
Mapping is the act of transforming an event from one type into another. This may involve taking a field from the object, or performing some simple processing on it. When mapping there is an one to one relationship between input and output events, that is, each input event is mapped to exactly one output event.
PushStream<String> streamOfStrings = getStreamOfStrings();
PushStream<Integer> streamOfLengths =
streamOfStrings.map(String::length);
If the mapping function throws an Exception then an Error Event is propagated down the stream to the next pipeline step. The failure in the error event is set to the Exception thrown by the mapping function. The current pipeline step is also closed, and the close operation is propagated back upstream to the event source by closing previous pipeline stages. Any subsequently received events must not be propagated and must return negative back pressure.
Flat Mapping is the act of transforming an event from one type into multiple events of another type. This may involve taking fields from an object, or performing some simple processing on it. When flat mapping there is a one to many relationship between input and output events, that is, each input event is mapped to zero or more output events.
A flat mapping function should asynchronously consume the event data and return a Push Stream containing the flow of subsequent events.
PushStream<String> streamOfStrings = getStreamOfStrings();
PushStream<Character> streamOfCharacters =
streamOfStrings.flatMap(s -> {
SimplePushEventSource<Character> spes =
getSimplePushEventSource();
spes.connectPromise()
.onResolve(() ->
executor.execute(() -> {
for(int i = 0; i < s.length; i++) {
spes.publish(s.charAt(i));
}
});
return pushStreamProvider.createStream(spes);
});
If the flat mapping function throws an Exception then an Error Event is propagated down the stream to the next pipeline step. The failure in the error event is set to the Exception thrown by the mapping function. The current pipeline step is also closed, and the close operation is propagated back upstream to the event source by closing previous pipeline stages. Any subsequently received events must not be propagated and must return negative back pressure.
Filtering is the act of removing events from the stream based on some characteristic of the event data. This may involve inspecting the fields of the data object, or performing some simple processing on it. If the filter function returns true for an event then it will be passed to the next stage of the pipeline. If the filter function returns false then it will be discarded, and not passed to the next pipeline stage.
PushStream<String> streamOfStrings = getStreamOfStrings();
PushStream<String> filteredStrings =
streamOfStrings.filter(s -> s.length() == 42);
If the filtering function throws an Exception then an Error Event is propagated down the stream to the next pipeline step. The failure in the error event is set to the Exception thrown by the filter function. The current pipeline step is also closed, and the close operation is propagated back upstream to the event source by closing previous pipeline stages. Any subsequently received events must not be propagated and must return negative back pressure.
Mapping operations may sometimes take time to calculate their results. PushStream operations should, in general be fast and non-blocking and so long-running mapping operations should be run on a separate thread. The asyncMap(int,int,Function) operation allows the mapping function to return a Promise representing the ongoing calculation of the mapped value. When this promise resolves then its value will be passed to the next pipeline stage.
As asynchronous mapping operations are long-running they require back pressure to be generated as the number of running operations increases. The amount of back pressure returned is equal to the number of pending promises (aside from the mapping operation that has just started) plus the number of waiting threads if the maximum number of concurrent promises has been reached. The returned back pressure when only a single promise is running is therefore always zero.
Intermediate operations are either stateless or stateful. Stateless operations are ones where the pipeline stage does not need to remember the previous data from the stream. Mapping, Flat Mapping and Filtering are all stateless operations. The following table lists the stateless operations on the Push Stream.
Table 706.1 Stateless Intermediate Operations on the Push Stream
Intermediate Operation | Description |
---|---|
Register a transformation function to adjust the back pressure returned by the previous entry in the stream. The result of this function will be returned as back pressure. |
|
Register a mapping function which will asynchronously calculate the value to be passed to the next stage of the stream. The returned back pressure is equal to one less than the number of outstanding promises, plus the number of queued threads, multiplied by the delay value. |
|
Register a selection function to be called with
each data event in the stream. If the function returns
|
|
Register a transformation function to be called with each data event in the stream. Each incoming data element is converted into a stream of elements. The transformed data is then propagated to the next stage of the stream. |
|
Pushes event processing onto one or more threads
in the supplied |
|
Register a transformation function to be called with each data event in the stream. The transformed data is propagated to the next stage of the stream. |
|
Merges |
|
Forces data events to be delivered sequentially to the next stage of the stream. Events may be delivered on multiple threads, but will not arrive concurrently at the next stage of the pipeline. |
|
Register a set of filter functions to select elements that should be forwarded downstream. The returned streams correspond to the supplied filter functions. |
Stateful operations differ from stateless operations in that
they must remember items from the stream. Sometimes stateful
operations must remember large numbers of events, or even the entire
stream. For example the distinct
operation remembers the
identity of each entry in the stream, and filters out duplicate
events.
Care should be taken when using Stateful operations with large
or infinite streams. For example the sorted
operation
must process the entire stream until it receives
a close event. At this point the events can be sorted and delivered in
order. It is usually a good idea to use the limit
operation to restrict the length of the stream before performing a
stateful operation which must remember many elements.
The following table lists all of the stateful operations of the PushStream.
Table 706.2 Stateful Intermediate Operations on the Push Stream
Intermediate Operation | Description |
---|---|
Introduces a buffer before the next stage of the stream. The buffer can be used to provide a circuit breaker, or to allow a switch of consumer thread(s). |
|
Introduces a configurable buffer before the next stage of the stream. The buffer can be used to provide a circuit breaker, or to allow a switch of consumer thread(s). |
|
Register a coalescing function which aggregates one or more data events into a single data event which will be passed to the next stage of the stream. The number of events to be accumulated is either provided as a fixed number, or as the result of a function |
|
A variation of filter(Predicate) which drops data from the stream that
has already been seen. Specifically if a data element
|
|
Limits the length of the stream to the defined number of elements. Once that number of elements are received then a close event is propagated to the next stage of the stream. |
|
Limits the time that the stream will remain open
to the supplied |
|
Drops the supplied number of data events from the stream and then forwards any further data events. |
|
Remembers all items in the stream until the stream ends. At this point the data in the stream will be propagated to the next stage of the stream, either in the Natural Ordering of the elements, or in the order defined by the supplied Comparator. |
|
Tracks the time since the last event was
received. If no event is received within the supplied Duration
then an error event is propagated to the next stage of the
stream. The exception in the event will be an
|
|
window(Duration,Executor,Function) |
Collects events over the specified time-limit, passing them to the registered handler function. If no events occur during the time limit then a Collection containing no events is passed to the handler function. |
Terminal operations mark the end of a processing pipeline. Invoking a terminal operation causes the PushStream to connect to its underlying event source and begin processing.
The simplest terminal operation is the count() operation. This method returns a promise that will resolve when the stream finishes. If the stream finishes with a close event then the promise will resolve with a Long representing the number of events that reached the end of the pipeline. If the stream finishes with an error then the promise will fail with that error.
Terminal operations such as forEachEvent(PushEventConsumer) are passed a handler function which will be called for each piece of data that reaches the end of the stream. If the handler function throws an Exception then the Promise returned by the terminal operation must fail with the Exception thrown by the handler function.
Some terminal operations, like count
require the
full stream to be processed, others are able to finish before the end
of the stream. These are known as short
circuiting operations. An example of a short-circuiting
operation is findFirst(). This operation resolves the promise with the
first event that is received by the end of the pipeline. Once a
short-circuiting operation has completed it propagates negative
back-pressure through the pipeline to close the source of events. Any
subsequently received events must not affect the result and must
return negative back pressure. If an asynchronous pipeline step is
encountered, such as a buffer, the close operation is propagated back
upstream to the event source by closing previous pipeline
stages.
Table 706.3 Non Short Circuiting Terminal Operations on the Push Stream
Terminal Operation | Description |
---|---|
Uses the Java Collector API to collect the data from events into a single Collection, Map, or other type. |
|
Counts the number of events that reach the end of the stream pipeline. |
|
Register a function to be called back with the data from each event in the stream |
|
Register a |
|
Uses a Comparator to find the largest data element in the stream of data. The promise is resolved with the final result when the stream finishes. |
|
Uses a Comparator to find the smallest data element in the stream of data. The promise is resolved with the final result when the stream finishes. |
|
Uses a Binary Operator function to combine event data into a single object. The promise is resolved with the final result when the stream finishes. |
|
Collects together all of the event data in a single array which is used to resolve the returned promise. |
Table 706.4 Short Circuiting Terminal Operations on the Push Stream
Terminal Operation | Description |
---|---|
Resolves with |
|
Resolves with |
|
Resolves with an Optional representing the data from the first event that reaches the end of the pipeline. If the stream ends without any data reaching the end of the pipeline then the promise resolves with an empty Optional. |
|
Resolves with an Optional representing the data from the first event that reaches the end of the pipeline. If the stream ends without any data reaching the end of the pipeline then the promise resolves with an empty Optional. |
|
Resolves with |
Buffering and Back Pressure are an important part of asynchronous stream processing. Back pressure and buffering are therefore an important part of the push stream API.
In a synchronous model the producer's thread is held by the consumer until the consumer has finished processing the data. This is not true for asynchronous systems, and so a producer can easily overwhelm a consumer with data. Back pressure is therefore used in asynchronous systems to allow consumers to control the speed at which producers provide data.
Back pressure in the asynchronous event processing model is
provided by the PushEventConsumer. The value returned by the
accept
method of the PushEventConsumer is an indication
of the requested back pressure. A return of zero indicates that event
delivery may continue immediately. A positive return value indicates
that the source should delay sending any further events for the
requested number of milliseconds. A negative return value indicates
that no further events should be sent and that the stream can be
closed.
Back pressure in a Push Stream can also be applied mid-way through the processing pipeline through the use of the adjustBackPressure(LongUnaryOperator) or adjustBackPressure(ToLongBiFunction) methods. These methods can be used to increase or decrease the back pressure requested by later stages of the pipeline.
In asynchronous systems events may be produced and consumed at different rates. If the consumer is faster than the producer then there is no issue, however if the producer is faster than the consumer then events must be held somewhere. Back pressure provides some assistance here, however some sources do not have control over when events are produced. In these cases the data must be buffered until it can be processed.
As well as providing a queue for pending work, introducing buffers allows event processing to be moved onto a different thread, and for the number of processing threads to be changed part way through the pipeline. Buffering can therefore protect an PushEventSource from having its event generation thread “stolen” by a consumer which executes a long running operation. As a result the PushEventSource can be written more simply, without a thread switch, if a buffer is used.
Buffering also provides a “fire break” for back-pressure. Back-pressure return values propagate back along a PushStream until they reach a part of the stream that is able to respond. For some PushEventSource implementations it is not possible to slow or delay event generation, however a buffer can always respond to back pressure by not releasing events from the buffer. Buffers can therefore be used to “smooth out” sources that produce bursts of events more quickly than they can be immediately processed. This simplifies the creation of PushEventConsumer instances, which can rely on their back-pressure requests being honored.
Buffering is provided by the Push Stream using default
configuration values, either when creating the Push Stream from the
Push Stream Provider, or using the buffer
method. These
defaults are described in Building a Buffer or Push Stream.
The default configuration values can be overridden by using a BufferBuilder to explicitly provide the buffering parameters. If no Executor is provided then the PushStream will create its own internal Executor with the same number of threads as the defined parallelism. An internally created Executor will be shut down when the PushStream is closed.
Buffering policies govern the behavior of a buffer as it becomes full.
The QueuePolicy of the buffer determines what happens when the queue becomes full. Different policies may discard incoming data, evict data from the buffer, block, or throw an exception.
The QueuePolicyOption provides basic implementations of the queue policies, but custom polices can be implemented to provide more complex behaviors.
The PushbackPolicy of the buffer determines how much back pressure is requested by the buffer. Different policies may return a constant value, slowly increase the back pressure as the buffer fills, or return an exponentially increasing value when the buffer is full.
The PushbackPolicyOption provides basic implementations of the push back policies, but custom polices can be implemented to provide more complex behaviors.
The PushStreamBuilder can be obtained from a Push Stream Provider and used to customize the buffer at the start of the PushStream, or it can be used to create an unbuffered PushStream. An unbuffered PushStream uses the incoming event delivery thread to process the events, and therefore users must be careful not to block the thread, or perform long-running tasks. The default configuration building a Push Stream is as follows:
A Push Stream also requires a timer and an executor. For a new
Push Stream the Push Stream Provider must create a new fixed pool of
worker threads with the same size as the parallelism. The Push Stream
Provider may create a new ScheduledExecutorService
for
each new Push Stream, or reuse a common Scheduler. When adding a
buffer to an existing Push Stream the existing executor and timer used
by the Push Stream are reused by default. The builder of the
Buffer/Push Stream may provide their own executor and timer using the
withExecutor(Executor) and withScheduler(ScheduledExecutorService) methods
Buffering is a powerful tool in event processing pipelines, however it cannot help in the situation where the average event production rate is higher than the average processing rate. Rather than having an infinitely growing buffer a circuit breaker is used. A circuit breaker is a buffer which fails the stream when the buffer is full. This halts event processing and prevents the consuming system from being overwhelmed.
The default policy for push stream buffers is the FAIL policy, which means that push stream buffers are all circuit breakers by default.
Sometimes the processing that needs to be performed on an event is long-running. An important part of the asynchronous eventing model is that callbacks are short and non-blocking, which means that these callbacks should not run using the primary event thread. One solution to this is to buffer the stream, allowing a thread handoff at the buffer and limiting the impact of the long-running task. Buffering, however, has other consequences, and so it may be the case that a simple thread hand-off is preferable.
Forking allows users to specify a maximum number of concurrent downstream operations. Incoming events will block if this limit has been reached. If there are blocked threads then the returned back pressure for an event will be equal to the number of queued threads multiplied by the supplied timeout value. If there are no blocked threads then the back pressure will be zero.
Coalescing and windowing are both processes by which multiple incoming data events are collapsed into a single outgoing event.
There are two main ways to coalesce a stream.
The first mechanism delegates all responsibility to the
coalescing function, which returns an Optional
. The
coalescing function is called for every data event, and returns an
optional which either has a value, or is empty. If the optional has a
value then this value is passed to the next stage of the processing
pipeline. If the optional is empty then no data event is passed to the
next stage.
The second mechanism allows the stream to be configured with a (potentially variable) buffer size. The stream then stores values into this buffer. When the buffer is full then the stream passes the buffer to the handler function, which returns data to be passed to the next stage. If the stream finishes when a buffer is partially filled then the partially filled buffer will be passed to the handler function.
When coalescing events there is no opportunity for feedback from the event handler while the events are being buffered. As a result back pressure from the handler is zero except when the event triggers a call to the next stage. When the next stage is triggered the back pressure from that stage is returned.
Windowing is similar to coalescing, the primary difference between coalescing and windowing is the way in which the next stage of processing is triggered. A coalescing stage collects events until it has the correct number and then passes them to the handler function, regardless of how long this takes. A windowing stage collects events for a given amount of time, and then passes the collected events to the handler function, regardless of how many events are collected.
To avoid the need for a potentially infinite buffer a windowing stage may also place a limit on the number of events to be buffered. If this limit is reached then the window finishes early and the buffer is passed to the client, just like a coalescing stage. In this mode of operation the handler function is also passed the length of time for which the window lasted.
As windowing requires the collected events to be delivered asynchronously there is no opportunity for back-pressure from the previous stage to be applied upstream. Windowing therefore returns zero back-pressure in all cases except when a buffer size limit has been declared and is reached. If a window size limit is reached then the windowing stage returns the remaining window time as back pressure. Applying back pressure in this way means that the event source will tend not to repeatedly over saturate the window.
Merging and Splitting are actions that can be used to combine push streams, or to convert one stream into many streams.
A client may need to consume data from more than one Event Sources. In this case the PushStream may be used to merge two event streams. The returned stream will receive events from both parent streams, but will only close when both parent streams have delivered terminal events.
Sometimes it is desirable to split a stream into multiple parallel pipelines. These pipelines are independent from the point at which they are split, but share the same source and upstream pipeline.
Splitting a stream is possible using the
split(Predicate<? super T >... predicates)
method.
For each predicate a PushStream will be returned that receives the
events accepted by the predicate.
The lifecycle of a split stream differs from that of a normal stream in two key ways:
-
The stream will begin event delivery when any of the downstream handlers encounters a terminal operation
-
The stream will only close when all of the downstream handlers are closed
An important difference between Push Streams and Java 8 Streams is that events occur over time, there are therefore some operations that do not apply to Java 8 Streams which are relevant to Push Streams.
The limit()
operation on a Stream can be used to
limit the number of elements that are processed, however on a Push
Stream that number of events may never be reached, even though the
stream has not closed. Push Streams therefore also have a
limit
method which takes a Duration
. This
duration limits the time for which the stream is open, closing it after
the duration has elapsed.
The timeout
operation of a Push Stream can be used to
end a stream if no events are received for the given amount of time. If
an event is received then this resets the timeout counter. The timeout
operation is therefore a useful mechanism for identifying pipelines
which have stalled in their processing. If the timeout expires then it
propagates an error event to the next stage of the pipeline. The
Exception in the error event is an
org.osgi.util.promise.TimeoutException
.
A PushStream represents a stage in the processing pipeline
and is AutoCloseable
. When the close() method is invoked it will not, in general,
coincide with the processing of an event. The closing of a stream in
this way must therefore do the following things:
-
Send a close event downstream to close the stream
-
Discard events subsequently received by this pipeline stage, and return negative backpressure for any that do arrive at this pipeline stage.
-
Propagate the close operation upstream until the
AutoCloseable
returned by the open(PushEventConsumer) method is closed.
The result of this set of operations must be that all stages of the pipeline, including the connection to the PushEventSource, are eagerly closed. This may be as a result of receiving a close event, negative back pressure, or the close call being propagated back up the pipeline, but it must not wait for the next event. For example, if an event is produced every ten minutes and the stream is closed one minute after an event is created then it must not take a further nine minutes to close the connection to the Push Event Source.
The PushStreamProvider
can be used to assist with a
variety of asynchronous event handling use cases. A Push Stream Provider
can create Push Stream instances from a Push Event Source, it can buffer
an Push Event Consumer, or it can turn a Push Stream into a reusable Push
Event Source.
The Push Stream Provider allows several types of buffered objects
to be created. By default all Push Streams are created with a buffer,
but other objects can also be wrapped in a buffer. For example a Push
Event Consumer can be wrapped in a buffer to isolate it from a Push
Event Source. The SimplePushEventSource
also has a buffer,
which is used to isolate the event producing thread from event
consumers.
In all cases buffers are configured using a
BufferBuilder
with the following defaults:
-
A parallelism of one
-
A
FAIL
QueuePolicy -
A
LINEAR
PushbackPolicy with a maximum pushback of one second -
A Buffer with a capacity of 32 elements
A Buffer requires a timer and an executor. If no Executor is
provided when creating a buffer then the buffer will have its own
internal Executor with the same number of threads as the defined
parallelism. The Push Stream Provider may create a new
ScheduledExecutorService
for each buffer, or reuse a common
Scheduler. The builder of the Buffer may provide their own executor and
timer using the withExecutor(Executor) and withScheduler(ScheduledExecutorService) methods
Any internally created Executor will be shut down after the buffer has processed a terminal event.
There are a number of scenarios where an application developer may wish to convert between a Java 8 Stream and a PushStream. In particular, the flatMap(Function) operation of a Push Stream takes a single event and converts it into many events in a Push Stream. Common operations, such as splitting the event into child events will result in a Java Collection, or a Java 8 Stream. These need to be converted into a Push Stream before they can be returned from the flatMap operation.
To assist this model the PushStreamProvider provides two
streamOf
methods. These convert a Java 8 Stream into a Push
Stream, changing the pull-based model of Java 8 Streams into the
asynchronous model of the Push Stream.
The first streamOf(Stream) method takes a Java 8 Stream. The PushStream created by this method is not fully asynchronous, it uses the connecting thread to consume the Java 8 Stream. As a result the streams created using this method will block terminal operations. This method should therefore not normally be used for infinite event streams, but instead for short, finite streams of data that can be processed rapidly, for example as the result of a flatmapping operation. In this scenario reusing the incoming thread improves performance. In the following example an incoming list of URLs is registered for download.
PushStreamProvider psp = new PushStreamProvider();
PushStream<List<URL>> urls = getURLStream();
urls.flatMap(l -> psp.streamOf(l.stream()))
.forEach(url -> registerDownload(url));
For larger Streams of data, or when truly asynchronous operation
is required, there is a second streamOf(Executor,ScheduledExecutorService,Stream) method which allows for asynchronous consumption
of the stream. The Executor is used to consume elements from the Java 8
Stream using a single task. This mode of operation is suitable for use
with infinite data streams, or for streams which require a truly
asynchronous mode of operation, and does not require the stream to be
parallel. If null
is passed for the Executor
then the PushStreamProvider will create a fixed thread pool of size 2.
This allows for work to continue in the Push Stream even if the
passed-in Stream blocks the consuming thread. If null
is
passed for the ScheduledExecutor
then the Push Stream
Provider may create a new scheduler or use a shared default.
The PushEventSource and PushEventConsumer are both functional interfaces, however it is noticeably harder to implement a PushEventSource than a PushEventConsumer. A PushEventSource must be able to support multiple independently closeable consumer registrations, all of which are providing potentially different amounts of back pressure.
To simplify the case where a user wishes to write a basic event source the PushStreamProvider is able to create a SimplePushEventSource. The SimplePushEventSource handles the details of implementing PushEventSource, providing a simplified API for the event producing code to use.
Events can be sent via the Simple Push Event Source publish(T) method at any time until it is closed. These events may be silently ignored if no consumer is connected, but if one or more consumers are connected then the event will be asynchronously delivered to them.
Close or error events can be sent equally easily using the endOfStream() and error(Throwable) methods. These will send disconnection events to all of the currently connected consumers and remove them from the Simple Push Event Source. Note that sending these events does not close the Simple Push Event Source. Subsequent connection attempts will succeed, and events can still be published.
In addition to the publication methods the Simple Push Event
Source provides isConnected()
and
connectPromise()
methods. The isConnected method gives a
point-in-time snapshot of whether there are any connections to the
Simple Push Event Source. If this method returns false then the event
producer may wish to avoid creating the event, particularly if it is
computationally expensive to do so. The connectPromise method returns a
Promise representing the current connection state. This Promise resolves
when there is a client connected (which means it may be resolved
immediately as it is created). If the Simple Push Event Source is closed
before the Promise resolves then the Promise is failed with an
IllegalStateException. The connect Promise can be used to trigger the
initialization of an event thread, allowing lazier startup.
PushStreamProvider psp = new PushStreamProvider();
SimplePushEventSource<Long> ses = psp.createSimpleEventSource(Long.class))
Success<Void,Void> onConnect = p -> {
new Thread(() -> {
long counter = 0;
// Keep going as long as someone is listening
while (ses.isConnected()) {
ses.publish(++counter);
Thread.sleep(100);
System.out.println("Published: " + counter);
}
// Restart delivery when a new listener connects
ses.connectPromise().then(onConnect);
}).start();
return null;
};
// Begin delivery when someone is listening
ses.connectPromise().then(onConnect);
// Create a listener which prints out even numbers
psp.createStream(ses).
filter(l -> l % 2L == 0).
limit(5000L).
forEach(f -> System.out.println("Consumed event: " + f));
The Push Stream API does not define any OSGi services nor does the API perform any privileged actions. Therefore, it has no security considerations.
Push Stream Package Version 1.0.
Bundles wishing to use this package must list the package in the Import-Package header of the bundle's manifest.
Example import for consumers using the API in this package:
Import-Package: org.osgi.util.pushstream; version="[1.0,2.0)"
Example import for providers implementing the API in this package:
Import-Package: org.osgi.util.pushstream; version="[1.0,1.1)"
-
BufferBuilder
- Create a buffered section of a Push-based stream -
PushbackPolicy
- A PushbackPolicy is used to calculate how much back pressure to apply based on the current buffer. -
PushbackPolicyOption
- PushbackPolicyOption provides a standard set of simple PushbackPolicy implementations. -
PushEvent
- A PushEvent is an immutable object that is transferred through a communication channel to push information to a downstream consumer. -
PushEvent.EventType
- The type of a PushEvent. -
PushEventConsumer
- An Async Event Consumer asynchronously receives Data events until it receives either a Close or Error event. -
PushEventSource
- An event source. -
PushStream
- A Push Stream fulfills the same role as the Java 8 stream but it reverses the control direction. -
PushStreamBuilder
- A Builder for a PushStream. -
PushStreamProvider
- A factory for PushStream instances, and utility methods for handling PushEventSources and PushEventConsumers -
QueuePolicy
- A QueuePolicy is used to control how events should be queued in the current buffer. -
QueuePolicyOption
- QueuePolicyOption provides a standard set of simple QueuePolicy implementations. -
SimplePushEventSource
- A SimplePushEventSource is a helper that makes it simpler to write a PushEventSource.
The type of object being built
The type of objects in the PushEvent
The type of the Queue used in the user specified buffer
Create a buffered section of a Push-based stream
Consumers of this API must not implement this type
The BlockingQueue implementation to use as a buffer
this builder
Set the Executor that should be used to deliver events from this buffer
this builder
Set the maximum permitted number of concurrent event deliveries allowed from this buffer
this builder
The type of the data
The type of the queue
A PushbackPolicy is used to calculate how much back pressure to apply based on the current buffer. The PushbackPolicy will be called after an event has been queued, and the returned value will be used as back pressure.
PushbackPolicyOption provides a standard set of simple PushbackPolicy implementations.
Returns zero back pressure until the buffer is full, then it returns a fixed value
Returns zero back pressure until the buffer is full, then it returns an exponentially increasing amount, starting with the supplied value and doubling it each time. Once the buffer is no longer full the back pressure returns to zero.
Returns zero back pressure when the buffer is empty, then it returns a linearly increasing amount of back pressure based on how full the buffer is. The maximum value will be returned when the buffer is full.
<T, U extends BlockingQueue<PushEvent<? extends T>>>
Create a PushbackPolicy instance configured with a base back pressure time in nanoseconds The actual backpressure returned will vary based on the selected implementation, the base value, and the state of the buffer.
A PushbackPolicy to use
The payload type of the event.
A PushEvent is an immutable object that is transferred through a communication channel to push information to a downstream consumer. The event has three different types:
-
EventType.DATA – Provides access to a typed data element in the stream.
-
EventType.CLOSE – The stream is closed. After receiving this event, no more events will follow.
-
EventType.ERROR – The stream ran into an unrecoverable problem and is sending the reason downstream. The stream is closed and no more events will follow after this event.
Immutable
Consumers of this API must not implement this type
<T>
The payload type.
Create a new close event.
A new close event.
<T>
The payload type.
The payload.
Create a new data event.
A new data event wrapping the specified payload.
<T>
The payload type.
The error.
Create a new error event.
A new error event with the specified error.
Return the data for this event.
The data payload.
IllegalStateException
– if this event is not a
EventType.DATA event.
Return the error that terminated the stream.
The error that terminated the stream.
IllegalStateException
– if this event is not an
EventType.ERROR event.
Get the type of this event.
The type of this event.
Answer if no more events will follow after this event.
false
if this is a data event, otherwise true
.
<X>
The new payload type.
Convenience to cast a close/error event to another payload type. Since the payload type is not needed for these events this is harmless. This therefore allows you to forward the close/error event downstream without creating anew event.
The current error or close event mapped to a new payload type.
IllegalStateException
– if the event is a EventType.DATA
event.
The type of a PushEvent.
An error event that indicates streaming has failed and that no more events will arrive
The type for the event payload
An Async Event Consumer asynchronously receives Data events until it receives either a Close or Error event.
If ABORT is used as return value, the sender should close the channel all the way to the upstream source. The ABORT will not guarantee that no more events are delivered since this is impossible in a concurrent environment. The consumer should accept subsequent events and close/clean up when the Close or Error event is received. Though ABORT has the value -1, any value less than 0 will act as an abort.
A 0 indicates that the consumer is willing to receive subsequent events at full speeds. Any value more than 0 will indicate that the consumer is becoming overloaded and wants a delay of the given milliseconds before the next event is sent. This allows the consumer to pushback the event delivery speed.
The event
Accept an event from a source. Events can be delivered on multiple threads simultaneously. However, Close and Error events are the last events received, no more events must be sent after them.
less than 0 means abort, 0 means continue, more than 0 means delay ms
Exception
– to indicate that an error has occurred and that no
further events should be delivered to this
PushEventConsumer
The payload type
An event source. An event source can open a channel between a source and a consumer. Once the channel is opened (even before it returns) the source can send events to the consumer. A source should stop sending and automatically close the channel when sending an event returns a negative value, see PushEventConsumer.ABORT. Values that are larger than 0 should be treated as a request to delay the next events with those number of milliseconds.
the consumer (not null)
Open the asynchronous channel between the source and the consumer. The call returns an AutoCloseable. This can be closed, and should close the channel, including sending a Close event if the channel was not already closed. The returned object must be able to be closed multiple times without sending more than one Close events.
a AutoCloseable that can be used to close the stream
Exception
–
The Payload type
A Push Stream fulfills the same role as the Java 8 stream but it reverses the control direction. The Java 8 stream is pull based and this is push based. A Push Stream makes it possible to build a pipeline of transformations using a builder kind of model. Just like streams, it provides a number of terminating methods that will actually open the channel and perform the processing until the channel is closed (The source sends a Close event). The results of the processing will be send to a Promise, just like any error events. A stream can be used multiple times. The Push Stream represents a pipeline. Upstream is in the direction of the source, downstream is in the direction of the terminating method. Events are sent downstream asynchronously with no guarantee for ordering or concurrency. Methods are available to provide serialization of the events and splitting in background threads.
Consumers of this API must not implement this type
Changes the back-pressure propagated by this pipeline stage.
The supplied function receives the back pressure returned by the next pipeline stage and returns the back pressure that should be returned by this stage. This function will not be called if the previous pipeline stage returns negative back pressure.
Builder style (can be a new or the same object)
Changes the back-pressure propagated by this pipeline stage.
The supplied function receives the data object passed to the next pipeline stage and the back pressure that was returned by that stage when accepting it. The function returns the back pressure that should be returned by this stage. This function will not be called if the previous pipeline stage returns negative back pressure.
Builder style (can be a new or the same object)
Closes the channel and resolve the promise with false when the predicate does not matches a pay load. If the channel is closed before, the promise is resolved with true.
This is a short circuiting terminal operation
A Promise that will resolve when an event fails to match the predicate, or the end of the stream is reached
Close the channel and resolve the promise with true when the predicate matches a payload. If the channel is closed before the predicate matches, the promise is resolved with false.
This is a short circuiting terminal operation
A Promise that will resolve when an event matches the predicate, or the end of the stream is reached
<R>
number of simultaneous promises to use
Nr of ms/promise that is queued back pressure
The mapping function
Asynchronously map the payload values. The mapping function returns a Promise representing the asynchronous mapping operation.
The PushStream limits the number of concurrently running mapping operations, and returns back pressure based on the number of existing queued operations.
Builder style (can be a new or the same object)
IllegalArgumentException
– if the number of threads is < 1 or
the delay is < 0
NullPointerException
– if the mapper is null
Buffer the events in a queue using default values for the queue size and other behaviors. Buffered work will be processed asynchronously in the rest of the chain. Buffering also blocks the transmission of back pressure to previous elements in the chain, although back pressure is honored by the buffer.
Buffers are useful for "bursty" event sources which produce a number of events close together, then none for some time. These bursts can sometimes overwhelm downstream event consumers. Buffering will not, however, protect downstream components from a source which produces events faster than they can be consumed. For fast sources filter(Predicate) and coalesce(int, Function) fork(int, int, Executor) are better choices.
Builder style (can be a new or the same object)
<U extends BlockingQueue<PushEvent<? extends T>>>
Build a buffer to enqueue events in a queue using custom values for the queue size and other behaviors. Buffered work will be processed asynchronously in the rest of the chain. Buffering also blocks the transmission of back pressure to previous elements in the chain, although back pressure is honored by the buffer.
Buffers are useful for "bursty" event sources which produce a number of events close together, then none for some time. These bursts can sometimes overwhelm downstream event consumers. Buffering will not, however, protect downstream components from a source which produces events faster than they can be consumed. For fast sources filter(Predicate) and coalesce(int, Function) fork(int, int, Executor) are better choices.
Buffers are also useful as "circuit breakers" in the pipeline. If a QueuePolicyOption.FAIL is used then a full buffer will trigger the stream to close, preventing an event storm from reaching the client.
A builder which can be used to configure the buffer for this pipeline stage.
Close this PushStream by sending an event of type PushEvent.EventType.CLOSE downstream. Closing a PushStream is a safe operation that will not throw an Exception.
Calling close()
on a closed PushStream has no effect.
<R>
Coalesces a number of events into a new type of event. The input events are forwarded to a accumulator function. This function returns an Optional. If the optional is present, it's value is send downstream, otherwise it is ignored.
Builder style (can be a new or the same object)
<R>
Coalesces a number of events into a new type of event. A fixed number of input events are forwarded to a accumulator function. This function returns new event data to be forwarded on.
Builder style (can be a new or the same object)
<R>
Coalesces a number of events into a new type of event. A variable number of input events are forwarded to a accumulator function. The number of events to be forwarded is determined by calling the count function. The accumulator function then returns new event data to be forwarded on.
Builder style (can be a new or the same object)
<R, A>
See Stream. Will resolve once the channel closes.
This is a terminal operation
A Promise representing the collected results
See Stream. Will resolve onces the channel closes.
This is a terminal operation
A Promise representing the number of values in the stream
Remove any duplicates. Notice that this can be expensive in a large stream since it must track previous payloads.
Builder style (can be a new or the same object)
The predicate that is tested (not null)
Only pass events downstream when the predicate tests true.
Builder style (can be a new or the same object)
Close the channel and resolve the promise with the first element. If the channel is closed before, the Optional will have no value.
This is a terminal operation
a promise
Close the channel and resolve the promise with the first element. If the channel is closed before, the Optional will have no value.
a promise
<R>
The flat map function
Flat map the payload value (turn one event into 0..n events of potentially another type).
Builder style (can be a new or the same object)
The action to perform
Execute the action for each event received until the channel is closed. This is a terminating method, the returned promise is resolved when the channel closes.
This is a terminal operation
A promise that is resolved when the channel closes.
Pass on each event to another consumer until the stream is closed.
This is a terminal operation
a promise
number of simultaneous background threads to use
Nr of ms/thread that is queued back pressure
an executor to use for the background threads.
Execute the downstream events in up to n background threads. If more requests are outstanding apply delay * nr of delayed threads back pressure. A downstream channel that is closed or throws an exception will cause all execution to cease and the stream to close
Builder style (can be a new or the same object)
IllegalArgumentException
– if the number of threads is < 1 or
the delay is < 0
NullPointerException
– if the Executor is null
Maximum number of elements has been received
Automatically close the channel after the maxSize number of elements is received.
Builder style (can be a new or the same object)
The maximum time that the stream should remain open
Automatically close the channel after the given amount of time has elapsed.
Builder style (can be a new or the same object)
<R>
The map function
Map a payload value.
Builder style (can be a new or the same object)
See Stream. Will resolve onces the channel closes.
This is a terminal operation
A Promise representing the maximum value, or null if no values are seen before the end of the stream
The source to merge in.
Merge in the events from another source. The resulting channel is not closed until this channel and the channel from the source are closed.
Builder style (can be a new or the same object)
The source to merge in.
Merge in the events from another PushStream. The resulting channel is not closed until this channel and the channel from the source are closed.
Builder style (can be a new or the same object)
See Stream. Will resolve onces the channel closes.
This is a terminal operation
A Promise representing the minimum value, or null if no values are seen before the end of the stream
Closes the channel and resolve the promise with false when the predicate matches any pay load. If the channel is closed before, the promise is resolved with true.
This is a short circuiting terminal operation
A Promise that will resolve when an event matches the predicate, or the end of the stream is reached
Will be called on close
Must be run after the channel is closed. This handler will run after the downstream methods have processed the close event and before the upstream methods have closed.
This stream
Will be called on close
Must be run after the channel is closed. This handler will run after the downstream methods have processed the close event and before the upstream methods have closed.
This stream
The identity/begin value
The accumulator
Standard reduce, see Stream. The returned promise will be resolved when the channel closes.
This is a terminal operation
A
The accumulator
Standard reduce without identity, so the return is an Optional. The returned promise will be resolved when the channel closes.
This is a terminal operation
an Optional
<U>
combines two U's into one U (for example, combine two lists)
Standard reduce with identity, accumulator and combiner. The returned promise will be resolved when the channel closes.
This is a terminal operation
The promise
Ensure that any events are delivered sequentially. That is, no overlapping calls downstream. This can be used to turn a forked stream (where for example a heavy conversion is done in multiple threads) back into a sequential stream so a reduce is simple to do.
Builder style (can be a new or the same object)
number of elements to skip
Skip a number of events in the channel.
Builder style (can be a new or the same object)
IllegalArgumentException
– if the number of events to skip is
negative
Sorted the elements, assuming that T extends Comparable. This is of course expensive for large or infinite streams since it requires buffering the stream until close.
Builder style (can be a new or the same object)
Sorted the elements with the given comparator. This is of course expensive for large or infinite streams since it requires buffering the stream until close.
Builder style (can be a new or the same object)
the predicates to test
Split the events to different streams based on a predicate. If the predicate is true, the event is dispatched to that channel on the same position. All predicates are tested for every event.
This method differs from other methods of PushStream in three significant ways:
-
The return value contains multiple streams.
-
This stream will only close when all of these child streams have closed.
-
Event delivery is made to all open children that accept the event.
streams that map to the predicates
The length of time that the stream should remain open when no events are being received.
Automatically fail the channel if no events are received for the indicated length of time. If the timeout is reached then a failure event containing a TimeoutException will be sent.
Builder style (can be a new or the same object)
Collect the payloads in an Object array after the channel is closed. This is a terminating method, the returned promise is resolved when the channel is closed.
This is a terminal operation
A promise that is resolved with all the payloads received over the channel
<A extends T>
Collect the payloads in an Object array after the channel is closed. This is a terminating method, the returned promise is resolved when the channel is closed. The type of the array is handled by the caller using a generator function that gets the length of the desired array.
This is a terminal operation
A promise that is resolved with all the payloads received over the channel
<R>
Buffers a number of events over a fixed time interval and then forwards the events to an accumulator function. This function returns new event data to be forwarded on. Note that:
-
The collection forwarded to the accumulator function will be empty if no events arrived during the time interval.
-
The accumulator function will be run and the forwarded event delivered as a different task, (and therefore potentially on a different thread) from the one that delivered the event to this PushStream.
-
Due to the buffering and asynchronous delivery required, this method prevents the propagation of back-pressure to earlier stages
Builder style (can be a new or the same object)
<R>
Buffers a number of events over a fixed time interval and then forwards the events to an accumulator function. This function returns new event data to be forwarded on. Note that:
-
The collection forwarded to the accumulator function will be empty if no events arrived during the time interval.
-
The accumulator function will be run and the forwarded event delivered by a task given to the supplied executor.
-
Due to the buffering and asynchronous delivery required, this method prevents the propagation of back-pressure to earlier stages
Builder style (can be a new or the same object)
<R>
Buffers a number of events over a variable time interval and then forwards the events to an accumulator function. The length of time over which events are buffered is determined by the time function. A maximum number of events can also be requested, if this number of events is reached then the accumulator will be called early. The accumulator function returns new event data to be forwarded on. It is also given the length of time for which the buffer accumulated data. This may be less than the requested interval if the buffer reached the maximum number of requested events early. Note that:
-
The collection forwarded to the accumulator function will be empty if no events arrived during the time interval.
-
The accumulator function will be run and the forwarded event delivered as a different task, (and therefore potentially on a different thread) from the one that delivered the event to this PushStream.
-
Due to the buffering and asynchronous delivery required, this method prevents the propagation of back-pressure to earlier stages
-
If the window finishes by hitting the maximum number of events then the remaining time in the window will be applied as back-pressure to the previous stage, attempting to slow the producer to the expected windowing threshold.
Builder style (can be a new or the same object)
<R>
Buffers a number of events over a variable time interval and then forwards the events to an accumulator function. The length of time over which events are buffered is determined by the time function. A maximum number of events can also be requested, if this number of events is reached then the accumulator will be called early. The accumulator function returns new event data to be forwarded on. It is also given the length of time for which the buffer accumulated data. This may be less than the requested interval if the buffer reached the maximum number of requested events early. Note that:
-
The collection forwarded to the accumulator function will be empty if no events arrived during the time interval.
-
The accumulator function will be run and the forwarded event delivered as a different task, (and therefore potentially on a different thread) from the one that delivered the event to this PushStream.
-
If the window finishes by hitting the maximum number of events then the remaining time in the window will be applied as back-pressure to the previous stage, attempting to slow the producer to the expected windowing threshold.
Builder style (can be a new or the same object)
The type of objects in the PushEvent
The type of the Queue used in the user specified buffer
A Builder for a PushStream. This Builder extends the support of a standard BufferBuilder by allowing the PushStream to be unbuffered.
Consumers of this API must not implement this type
Tells this PushStreamBuilder to create an unbuffered stream which delivers events directly to its consumer using the incoming delivery thread. Setting the PushStreamBuilder to be unbuffered means that any buffer, queue policy or push back policy will be ignored. Note that calling one of:
after this method will reset this builder to require a buffer.
the builder
The BlockingQueue implementation to use as a buffer
this builder
Set the Executor that should be used to deliver events from this buffer
this builder
Set the maximum permitted number of concurrent event deliveries allowed from this buffer
this builder
A factory for PushStream instances, and utility methods for handling PushEventSources and PushEventConsumers
<T, U extends BlockingQueue<PushEvent<? extends T>>>
Build a buffered PushEventConsumer with custom configuration.
The returned consumer will be buffered from the event source, and will honor back pressure requests from its delegate even if the event source does not.
Buffered consumers are useful for "bursty" event sources which produce a number of events close together, then none for some time. These bursts can sometimes overwhelm the consumer. Buffering will not, however, protect downstream components from a source which produces events faster than they can be consumed.
Buffers are also useful as "circuit breakers". If a QueuePolicyOption.FAIL is used then a full buffer will request that the stream close, preventing an event storm from reaching the client.
Note that this buffered consumer will close when it receives a terminal event, or if the delegate returns negative backpressure. No further events will be propagated after this time.
a PushEventConsumer with a buffer directly before it
<T, U extends BlockingQueue<PushEvent<? extends T>>>
Convert an PushStream into an PushEventSource. The first call to PushEventSource.open(PushEventConsumer) will begin event processing.
The PushEventSource will remain active until the backing stream is closed, and permits multiple consumers to PushEventSource.open(PushEventConsumer) it. Note that this means the caller of this method is responsible for closing the supplied stream if it is not finite in length.
Late joining consumers will not receive historical events, but will immediately receive the terminal event which closed the stream if the stream is already closed.
a PushEventSource backed by the PushStream
<T, U extends BlockingQueue<PushEvent<? extends T>>>
Build a SimplePushEventSource with the supplied type and custom buffering behaviors. The SimplePushEventSource will respond to back pressure requests from the consumers connected to it.
<T, U extends BlockingQueue<PushEvent<? extends T>>>
The source of the events
Builds a push stream with custom configuration.
The resulting PushStream may be buffered or unbuffered depending on how it is configured.
A PushStreamBuilder for the stream
<T>
Create a buffered PushEventConsumer with the default configured buffer, executor size, queue, queue policy and pushback policy. This is equivalent to calling
buildBufferedConsumer(delegate).create();
The returned consumer will be buffered from the event source, and will honor back pressure requests from its delegate even if the event source does not.
Buffered consumers are useful for "bursty" event sources which produce a number of events close together, then none for some time. These bursts can sometimes overwhelm the consumer. Buffering will not, however, protect downstream components from a source which produces events faster than they can be consumed.
a PushEventConsumer with a buffer directly before it
<T>
Convert an PushStream into an PushEventSource. The first call to PushEventSource.open(PushEventConsumer) will begin event processing. The PushEventSource will remain active until the backing stream is closed, and permits multiple consumers to PushEventSource.open(PushEventConsumer) it. This is equivalent to:
buildEventSourceFromStream(stream).create();
a PushEventSource backed by the PushStream
<T>
Create a SimplePushEventSource with the supplied type and default buffering behaviors. The SimplePushEventSource will respond to back pressure requests from the consumers connected to it. This is equivalent to:
buildSimpleEventSource(type).create();
<T>
Create a stream with the default configured buffer, executor size, queue, queue policy and pushback policy. This is equivalent to calling
buildStream(source).create();
This stream will be buffered from the event producer, and will honor back pressure even if the source does not.
Buffered streams are useful for "bursty" event sources which produce a number of events close together, then none for some time. These bursts can sometimes overwhelm downstream processors. Buffering will not, however, protect downstream components from a source which produces events faster (on average) than they can be consumed.
Event delivery will not begin until a terminal operation is reached on the chain of PushStreams. Once a terminal operation is reached the stream will be connected to the event source.
A PushStream with a default initial buffer
<T>
The items to push into the PushStream
Create an Unbuffered PushStream from a Java Stream The data from the stream will be pushed into the PushStream synchronously as it is opened. This may make terminal operations blocking unless a buffer has been added to the PushStream. Care should be taken with infinite Streams to avoid blocking indefinitely.
A PushStream containing the items from the Java Stream
<T>
The worker to use to push items from the Stream into the PushStream
The scheduler to use to trigger timed events in the PushStream
The items to push into the PushStream
Create an Unbuffered PushStream from a Java Stream The data from the stream will be pushed into the PushStream asynchronously using the supplied Executor.
A PushStream containing the items from the Java Stream
The type of the data
The type of the queue
A QueuePolicy is used to control how events should be queued in the current buffer. The QueuePolicy will be called when an event has arrived.
Enqueue the event and return the remaining capacity available for events
Exception
– If an error occurred adding the event to the queue.
This exception will cause the connection between the
PushEventSource and the PushEventConsumer to
be closed with an EventType.ERROR
QueuePolicyOption provides a standard set of simple QueuePolicy implementations.
Attempt to add the supplied event to the queue. If the queue is unable to immediately accept the value then discard the value at the head of the queue and try again. Repeat this process until the event is enqueued.
Attempt to add the supplied event to the queue, blocking until the enqueue is successful.
Attempt to add the supplied event to the queue, throwing an exception if the queue is full.
<T, U extends BlockingQueue<PushEvent<? extends T>>>
a QueuePolicy implementation
The type of the events produced by this source
A SimplePushEventSource is a helper that makes it simpler to write a PushEventSource. Users do not need to manage multiple registrations to the stream, nor do they have to be concerned with back pressure.
Consumers of this API must not implement this type
Close this source. Calling this method indicates that there will never be any more events published by it. Calling this method sends a close event to all connected consumers. After calling this method any PushEventConsumer that tries to open(PushEventConsumer) this source will immediately receive a close event, and will not see any remaining buffered events.
This method can be used to delay event generation until an event source has connected. The returned promise will resolve as soon as one or more PushEventConsumer instances have opened the SimplePushEventSource.
The returned promise may already be resolved if this SimplePushEventSource already has connected consumers. If the SimplePushEventSource is closed before the returned Promise resolves then it will be failed with an IllegalStateException.
Note that the connected consumers are able to asynchronously close their connections to this SimplePushEventSource, and therefore it is possible that once the promise resolves this SimplePushEventSource may no longer be connected to any consumers.
A promise representing the connection state of this EventSource
Close this source for now, but potentially reopen it later. Calling this method asynchronously sends a close event to all connected consumers and then disconnects them. Any events previously queued by the publish(Object) method will be delivered before this close event.
After calling this method any PushEventConsumer that wishes may open(PushEventConsumer) this source, and will receive subsequent events.
the error
Close this source for now, but potentially reopen it later. Calling this method asynchronously sends an error event to all connected consumers and then disconnects them. Any events previously queued by the publish(Object) method will be delivered before this error event.
After calling this method any PushEventConsumer that wishes may open(PushEventConsumer) this source, and will receive subsequent events.
Determine whether there are any PushEventConsumers for this PushEventSource. This can be used to skip expensive event creation logic when there are no listeners.
true if any consumers are currently connected
Asynchronously publish an event to this stream and all connected PushEventConsumer instances. When this method returns there is no guarantee that all consumers have been notified. Events published by a single thread will maintain their relative ordering, however they may be interleaved with events from other threads.
IllegalStateException
– if the source is closed