706 Push Stream Specification

706.1 Introduction

In large-scale distributed systems events are a commonly used communication mechanism for passing data and triggering behaviors. Events are typically generated asynchronously rather than at the request of the processing system, and once received an event usually undergoes some level of transformation before being stored, acted upon, or forwarded to another consumer.

Pipelines and streams are a popular and effective model for consuming and processing events, with numerous APIs providing this sort of model. One of the most well-known processing pipeline APIs is the Java 8 Streams API, which provides a functional pipeline for operating on Collections. The Streams API is inherently pull based” as it relies on iterators and spliterators to pull the next entry from the stream. This is the primary difference between synchronous and asynchronous models. In an asynchronous world events are pushed into the pipeline as they are received.

This specification defines a PushStream API which can be used on devices which support the Java 8 compact1 profile. The PushStream API defined by this specification depends on OSGi Promises but is independent of all other OSGi specifications, including the OSGi Framework, and thus can be easily used outside of the OSGi environment.

A PushStream object encapsulates a pipeline of a potentially asynchronous tasks which will be performed when an event arrives. The result of the processing pipeline is represented using a Promise object which will resolve when the result has been calculated.

PushStream capture the effects of errors, finite streams and back pressure by making these explicit in the API signatures. Errors and End of Stream conditions are represented by specific events which are pushed into the stream. Back pressure is represented by a delay value returned from the event pipeline stages.

706.1.1 Essentials

  • Common concepts - The API is inspired by the Streams API in Java 8 and uses the same basic concepts. See [1] Java 8 Stream API.

  • Independent - The design is independent of all other OSGi specifications (except for OSGi Promises) and can be used outside of an OSGi environment.

  • Asynchronous - The design is built to handle asynchronously produced events.

  • Back Pressure - The design provides a means for event pipelines to communicate back-pressure to the Event Source.

  • Complete - The design provides a very complete set of operations for PushStreams which are primitives that can be used to address most use cases.

  • Generified - Generics are used to promote type safety.

706.1.2 Entities

  • Push Event Source - A PushEventSource object represents a source of asynchronous events, and can be used to create a PushStream.

  • Push Event Consumer - A Push Event Consumer object represents a sink for asynchronous events, and can be attached to a PushEventSource or a PushStream.

  • Push Stream - A PushStream object represents a pipeline for processing asynchronous events.

  • Terminal Operation - The final operation of a PushStream pipeline results in a Promise which represents the completion state of the pipeline. The operation also begins the processing of events.

706.2 Asynchronous Event Streams

The Push Stream API is built upon the principals of Asynchronous Event streams, and therefore requires three basic primitives:

  • An event object

  • A source of event objects

  • A consumer of event objects

706.2.1 The Push Event

The PushEvent is an object representing an event. Every Push Event has an event type, which has one of three values:

  • DATA - A data event encapsulates a typed object

  • ERROR - An error event encapsulates an exception and indicates a failure in the event stream.

  • CLOSE - A close event represents the end of the stream of events.

An event stream consists of zero or more data events followed by a terminal event. A terminal event is either an error or a close, and it indicates that there will be no more events in this stream. Depending on the reason for the terminal event it may be possible to re-attach to the event source and consume more events.

706.2.2 The Push Event Source

A Push Event Source object represents a source of asynchronous Push Events. The event source defines a single method open(PushEventConsumer) which can be used to connect to the source and begin receiving a stream of events.

The open method of the Push Event Source returns an AutoCloseable which can be used to close the event stream. If the close method is called on this object then the stream is terminated by sending a close event. If additional calls are made to the close method then they return without further action.

706.2.3 The Push Event Consumer

A Push Event Consumer object represents a sink for asynchronous Push Events. The event consumer defines a single method accept(PushEvent) which can be used to receive a stream of events.

The accept method of the Push Event Consumer returns a long representing back pressure. Back pressure is described in detail in Back pressure. If the returned long is negative then the event stream should be closed by the event source.

706.2.4 Closing the Event Stream

There are three ways in which a stream of events can complete normally.

  • The Push Event Source may close the stream at any time by sending a terminal event to the consumer. Upon receiving a terminal event the consumer should clean up any resources and not expect to receive further messages. Note that in a multi-threaded system the consumer may receive events out of order, and in this case data events may be received after a terminal event. Event processors should be careful to ignore data events that occur after terminal events, and to ensure that any downstream consumers receive any pending data events before forwarding the terminal event.

  • The open method of the Push Event Source returns an AutoCloseable which can be used to close the event stream. If the close method is called on this object then the stream is terminated by sending a close event. If additional calls are made to the close method then they return without action. If the close method is called after a terminal event has been sent for any other reason then it must return without action.

  • The accept method of the Push Event Consumer returns a long indicating back pressure. If the long is negative then the event source must close the stream by sending a close event.

706.3 The Push Stream

Simple event passing can be achieved by connecting a Push Event Consumer directly to a Push Event Source, however this model forces a large amount of flow-control and resource management into a single location. Furthermore it is difficult to reuse business logic across different event streams.

The PushStream provides a powerful, flexible pipeline for event processing. The Push Stream API shares many concepts with the Java 8 Streams API, in particular Push Streams are lazy, they may not consume the entire event stream, and they can be composed from functional steps.

706.3.1 Simple Pipelines

A Push Stream can be created from a Push Event Source by using a PushStreamProvider. A Push Stream represents a stage in an event processing pipeline. The overall pipeline is constructed from zero or more intermediate operations, and completed with a single terminal operation.

Each intermediate operation returns a new Push Stream object chained to the previous pipeline step. Once a Push Stream object has had an intermediate operation invoked on it then it may not have any other operations chained to it. Terminal operations are either void, or return a Promise representing the future result of the pipeline. These API patterns allow Push Streams to be built using a fluent API.

Push Stream instances are lazy, and so the Push Stream will not be connected to the Push Event Source until a terminal operation is invoked on the Push Stream. This means that a push stream object can be safely built without events being received when the pipeline is partially initialized.

706.3.1.1 Mapping, Flat Mapping and Filtering

The simplest intermediate operations on a Push Stream are mapping and filtering. These operations use stateless, non-interfering functions to alter the data received by the next stage in the pipeline.

706.3.1.1.1 Mapping

Mapping is the act of transforming an event from one type into another. This may involve taking a field from the object, or performing some simple processing on it. When mapping there is an one to one relationship between input and output events, that is, each input event is mapped to exactly one output event.

    PushStream<String> streamOfStrings = getStreamOfStrings();
    
    PushStream<Integer> streamOfLengths = 
            streamOfStrings.map(String::length);

If the mapping function throws an Exception then an Error Event is propagated down the stream to the next pipeline step. The failure in the error event is set to the Exception thrown by the mapping function. The current pipeline step is also closed, and the close operation is propagated back upstream to the event source by closing previous pipeline stages. Any subsequently received events must not be propagated and must return negative back pressure.

706.3.1.1.2 Flat Mapping

Flat Mapping is the act of transforming an event from one type into multiple events of another type. This may involve taking fields from an object, or performing some simple processing on it. When flat mapping there is a one to many relationship between input and output events, that is, each input event is mapped to zero or more output events.

A flat mapping function should asynchronously consume the event data and return a Push Stream containing the flow of subsequent events.

    PushStream<String> streamOfStrings = getStreamOfStrings();
    
    PushStream<Character> streamOfCharacters = 
            streamOfStrings.flatMap(s -> {
                    SimplePushEventSource<Character> spes = 
                            getSimplePushEventSource();
                    
                    spes.connectPromise()
                        .onResolve(() ->
                            executor.execute(() -> {
                                    for(int i = 0; i < s.length; i++) {
                                        spes.publish(s.charAt(i));
                                    }
                                });
                    return pushStreamProvider.createStream(spes);
                });

If the flat mapping function throws an Exception then an Error Event is propagated down the stream to the next pipeline step. The failure in the error event is set to the Exception thrown by the mapping function. The current pipeline step is also closed, and the close operation is propagated back upstream to the event source by closing previous pipeline stages. Any subsequently received events must not be propagated and must return negative back pressure.

706.3.1.1.3 Filtering

Filtering is the act of removing events from the stream based on some characteristic of the event data. This may involve inspecting the fields of the data object, or performing some simple processing on it. If the filter function returns true for an event then it will be passed to the next stage of the pipeline. If the filter function returns false then it will be discarded, and not passed to the next pipeline stage.

    PushStream<String> streamOfStrings = getStreamOfStrings();
    
    PushStream<String> filteredStrings = 
            streamOfStrings.filter(s -> s.length() == 42);

If the filtering function throws an Exception then an Error Event is propagated down the stream to the next pipeline step. The failure in the error event is set to the Exception thrown by the filter function. The current pipeline step is also closed, and the close operation is propagated back upstream to the event source by closing previous pipeline stages. Any subsequently received events must not be propagated and must return negative back pressure.

706.3.1.1.4 Asynchronous Mapping

Mapping operations may sometimes take time to calculate their results. PushStream operations should, in general be fast and non-blocking and so long-running mapping operations should be run on a separate thread. The asyncMap(int,int,Function) operation allows the mapping function to return a Promise representing the ongoing calculation of the mapped value. When this promise resolves then its value will be passed to the next pipeline stage.

As asynchronous mapping operations are long-running they require back pressure to be generated as the number of running operations increases. The amount of back pressure returned is equal to the number of pending promises (aside from the mapping operation that has just started) plus the number of waiting threads if the maximum number of concurrent promises has been reached. The returned back pressure when only a single promise is running is therefore always zero.

706.3.1.2 Stateless and Stateful Intermediate Operations

Intermediate operations are either stateless or stateful. Stateless operations are ones where the pipeline stage does not need to remember the previous data from the stream. Mapping, Flat Mapping and Filtering are all stateless operations. The following table lists the stateless operations on the Push Stream.

Table 706.1 Stateless Intermediate Operations on the Push Stream

Intermediate Operation Description

adjustBackPressure(LongUnaryOperator)

adjustBackPressure(ToLongBiFunction)

Register a transformation function to adjust the back pressure returned by the previous entry in the stream. The result of this function will be returned as back pressure.

asyncMap(int,int,Function)

Register a mapping function which will asynchronously calculate the value to be passed to the next stage of the stream. The returned back pressure is equal to one less than the number of outstanding promises, plus the number of queued threads, multiplied by the delay value.

filter(Predicate)

Register a selection function to be called with each data event in the stream. If the function returns true then the event will propagated, if false then the event will dropped from the stream.

flatMap(Function)

Register a transformation function to be called with each data event in the stream. Each incoming data element is converted into a stream of elements. The transformed data is then propagated to the next stage of the stream.

fork(int,int,Executor)

Pushes event processing onto one or more threads in the supplied Executor returning a fixed back pressure

map(Function)

Register a transformation function to be called with each data event in the stream. The transformed data is propagated to the next stage of the stream.

merge(PushStream)

Merges this stream and another stream into a single stream. The returned stream will not close until both parent streams are closed.

sequential()

Forces data events to be delivered sequentially to the next stage of the stream. Events may be delivered on multiple threads, but will not arrive concurrently at the next stage of the pipeline.

split(Predicate...)

Register a set of filter functions to select elements that should be forwarded downstream. The returned streams correspond to the supplied filter functions.


Stateful operations differ from stateless operations in that they must remember items from the stream. Sometimes stateful operations must remember large numbers of events, or even the entire stream. For example the distinct operation remembers the identity of each entry in the stream, and filters out duplicate events.

Care should be taken when using Stateful operations with large or infinite streams. For example the sorted operation must process the entire stream until it receives a close event. At this point the events can be sorted and delivered in order. It is usually a good idea to use the limit operation to restrict the length of the stream before performing a stateful operation which must remember many elements.

The following table lists all of the stateful operations of the PushStream.

Table 706.2 Stateful Intermediate Operations on the Push Stream

Intermediate Operation Description

buffer()

Introduces a buffer before the next stage of the stream. The buffer can be used to provide a circuit breaker, or to allow a switch of consumer thread(s).

buildBuffer()

Introduces a configurable buffer before the next stage of the stream. The buffer can be used to provide a circuit breaker, or to allow a switch of consumer thread(s).

coalesce(Function)

coalesce(int,Function)

coalesce(IntSupplier,Function)

Register a coalescing function which aggregates one or more data events into a single data event which will be passed to the next stage of the stream.

The number of events to be accumulated is either provided as a fixed number, or as the result of a function

distinct()

A variation of filter(Predicate) which drops data from the stream that has already been seen. Specifically if a data element equals an element which has previously been seen then it will be dropped. This stateful operation must remember all data that has been seen.

limit(long)

Limits the length of the stream to the defined number of elements. Once that number of elements are received then a close event is propagated to the next stage of the stream.

limit(Duration)

Limits the time that the stream will remain open to the supplied Duration. Once that time has elapsed then a close event is propagated to the next stage of the stream.

skip(long)

Drops the supplied number of data events from the stream and then forwards any further data events.

sorted()

sorted(Comparator)

Remembers all items in the stream until the stream ends. At this point the data in the stream will be propagated to the next stage of the stream, either in the Natural Ordering of the elements, or in the order defined by the supplied Comparator.

timeout(Duration)

Tracks the time since the last event was received. If no event is received within the supplied Duration then an error event is propagated to the next stage of the stream. The exception in the event will be an org.osgi.util.promise.TimeoutException.

window(Duration,Function)

window(Duration,Executor,Function)

window(Supplier,IntSupplier,BiFunction)

window(Supplier,IntSupplier,Executor,BiFunction)

Collects events over the specified time-limit, passing them to the registered handler function. If no events occur during the time limit then a Collection containing no events is passed to the handler function.


706.3.1.3 Terminal Operations

Terminal operations mark the end of a processing pipeline. Invoking a terminal operation causes the PushStream to connect to its underlying event source and begin processing.

The simplest terminal operation is the count() operation. This method returns a promise that will resolve when the stream finishes. If the stream finishes with a close event then the promise will resolve with a Long representing the number of events that reached the end of the pipeline. If the stream finishes with an error then the promise will fail with that error.

Terminal operations such as forEachEvent(PushEventConsumer) are passed a handler function which will be called for each piece of data that reaches the end of the stream. If the handler function throws an Exception then the Promise returned by the terminal operation must fail with the Exception thrown by the handler function.

Some terminal operations, like count require the full stream to be processed, others are able to finish before the end of the stream. These are known as short circuiting operations. An example of a short-circuiting operation is findFirst(). This operation resolves the promise with the first event that is received by the end of the pipeline. Once a short-circuiting operation has completed it propagates negative back-pressure through the pipeline to close the source of events. Any subsequently received events must not affect the result and must return negative back pressure. If an asynchronous pipeline step is encountered, such as a buffer, the close operation is propagated back upstream to the event source by closing previous pipeline stages.

Table 706.3 Non Short Circuiting Terminal Operations on the Push Stream

Terminal Operation Description

collect(Collector)

Uses the Java Collector API to collect the data from events into a single Collection, Map, or other type.

count()

Counts the number of events that reach the end of the stream pipeline.

forEach(Consumer)

Register a function to be called back with the data from each event in the stream

forEachEvent(PushEventConsumer)

Register a PushEventConsumer to be called back with each event in the stream. If negative back-pressure is returned then the stream will be closed.

max(Comparator)

Uses a Comparator to find the largest data element in the stream of data. The promise is resolved with the final result when the stream finishes.

min(Comparator)

Uses a Comparator to find the smallest data element in the stream of data. The promise is resolved with the final result when the stream finishes.

reduce(BinaryOperator)

reduce(T,BinaryOperator)

reduce(U,BiFunction,BinaryOperator)

Uses a Binary Operator function to combine event data into a single object. The promise is resolved with the final result when the stream finishes.

toArray()

toArray(IntFunction)

Collects together all of the event data in a single array which is used to resolve the returned promise.


Table 706.4 Short Circuiting Terminal Operations on the Push Stream

Terminal Operation Description

allMatch(Predicate)

Resolves with false if any event reaches the end of the stream pipeline that does not match the predicate. If the stream ends without any data matching the predicate then the promise resolves with true

anyMatch(Predicate)

Resolves with true if any data event reaches the end of the stream pipeline and matches the supplied predicate. If the stream ends without any data matching the predicate then the promise resolves with false

findAny()

Resolves with an Optional representing the data from the first event that reaches the end of the pipeline. If the stream ends without any data reaching the end of the pipeline then the promise resolves with an empty Optional.

findFirst()

Resolves with an Optional representing the data from the first event that reaches the end of the pipeline. If the stream ends without any data reaching the end of the pipeline then the promise resolves with an empty Optional.

noneMatch(Predicate)

Resolves with false if any data event reaches the end of the stream pipeline and matches the supplied predicate. If the stream ends without any data matching the predicate then the promise resolves with true


706.3.2 Buffering, Back pressure and Circuit Breakers

Buffering and Back Pressure are an important part of asynchronous stream processing. Back pressure and buffering are therefore an important part of the push stream API.

706.3.2.1 Back pressure

In a synchronous model the producer's thread is held by the consumer until the consumer has finished processing the data. This is not true for asynchronous systems, and so a producer can easily overwhelm a consumer with data. Back pressure is therefore used in asynchronous systems to allow consumers to control the speed at which producers provide data.

Back pressure in the asynchronous event processing model is provided by the PushEventConsumer. The value returned by the accept method of the PushEventConsumer is an indication of the requested back pressure. A return of zero indicates that event delivery may continue immediately. A positive return value indicates that the source should delay sending any further events for the requested number of milliseconds. A negative return value indicates that no further events should be sent and that the stream can be closed.

Back pressure in a Push Stream can also be applied mid-way through the processing pipeline through the use of the adjustBackPressure(LongUnaryOperator) or adjustBackPressure(ToLongBiFunction) methods. These methods can be used to increase or decrease the back pressure requested by later stages of the pipeline.

706.3.2.2 Buffering

In asynchronous systems events may be produced and consumed at different rates. If the consumer is faster than the producer then there is no issue, however if the producer is faster than the consumer then events must be held somewhere. Back pressure provides some assistance here, however some sources do not have control over when events are produced. In these cases the data must be buffered until it can be processed.

As well as providing a queue for pending work, introducing buffers allows event processing to be moved onto a different thread, and for the number of processing threads to be changed part way through the pipeline. Buffering can therefore protect an PushEventSource from having its event generation thread “stolen” by a consumer which executes a long running operation. As a result the PushEventSource can be written more simply, without a thread switch, if a buffer is used.

Buffering also provides a “fire break” for back-pressure. Back-pressure return values propagate back along a PushStream until they reach a part of the stream that is able to respond. For some PushEventSource implementations it is not possible to slow or delay event generation, however a buffer can always respond to back pressure by not releasing events from the buffer. Buffers can therefore be used to “smooth out” sources that produce bursts of events more quickly than they can be immediately processed. This simplifies the creation of PushEventConsumer instances, which can rely on their back-pressure requests being honored.

Buffering is provided by the Push Stream using default configuration values, either when creating the Push Stream from the Push Stream Provider, or using the buffer method. These defaults are described in Building a Buffer or Push Stream.

The default configuration values can be overridden by using a BufferBuilder to explicitly provide the buffering parameters. If no Executor is provided then the PushStream will create its own internal Executor with the same number of threads as the defined parallelism. An internally created Executor will be shut down when the PushStream is closed.

706.3.2.3 Buffering policies

Buffering policies govern the behavior of a buffer as it becomes full.

The QueuePolicy of the buffer determines what happens when the queue becomes full. Different policies may discard incoming data, evict data from the buffer, block, or throw an exception.

The QueuePolicyOption provides basic implementations of the queue policies, but custom polices can be implemented to provide more complex behaviors.

The PushbackPolicy of the buffer determines how much back pressure is requested by the buffer. Different policies may return a constant value, slowly increase the back pressure as the buffer fills, or return an exponentially increasing value when the buffer is full.

The PushbackPolicyOption provides basic implementations of the push back policies, but custom polices can be implemented to provide more complex behaviors.

The ThresholdPushbackPolicy provides an implementation of a push back policy covering a common use case. In this use case back pressure needs to be applied only after the number of items in the buffer passes a certain threshold. Once this threshold is exceeded then the back pressure may be fixed, or may increase linearly based on the number of buffered items.

706.3.2.4 Building a Buffer or Push Stream

The PushStreamBuilder can be obtained from a Push Stream Provider and used to customize the buffer at the start of the PushStream, or it can be used to create an unbuffered PushStream. An unbuffered PushStream uses the incoming event delivery thread to process the events, and therefore users must be careful not to block the thread, or perform long-running tasks. The default configuration building a Push Stream is as follows:

  • A parallelism of one

  • A FAIL queue policy

  • A LINEAR push back policy with a maximum push back of one second

  • A Buffer with a capacity of 32 elements

A Push Stream also requires a timer and an executor. For a new Push Stream the Push Stream Provider must create a new fixed pool of worker threads with the same size as the parallelism. The Push Stream Provider may create a new ScheduledExecutorService for each new Push Stream, or reuse a common Scheduler. When adding a buffer to an existing Push Stream the existing executor and timer used by the Push Stream are reused by default. The builder of the Buffer/Push Stream may provide their own executor and timer using the withExecutor(Executor) and withScheduler(ScheduledExecutorService) methods

706.3.2.5 Circuit Breakers

Buffering is a powerful tool in event processing pipelines, however it cannot help in the situation where the average event production rate is higher than the average processing rate. Rather than having an infinitely growing buffer a circuit breaker is used. A circuit breaker is a buffer which fails the stream when the buffer is full. This halts event processing and prevents the consuming system from being overwhelmed.

The default policy for push stream buffers is the FAIL policy, which means that push stream buffers are all circuit breakers by default.

706.3.3 Forking

Sometimes the processing that needs to be performed on an event is long-running. An important part of the asynchronous eventing model is that callbacks are short and non-blocking, which means that these callbacks should not run using the primary event thread. One solution to this is to buffer the stream, allowing a thread handoff at the buffer and limiting the impact of the long-running task. Buffering, however, has other consequences, and so it may be the case that a simple thread hand-off is preferable.

Forking allows users to specify a maximum number of concurrent downstream operations. Incoming events will block if this limit has been reached. If there are blocked threads then the returned back pressure for an event will be equal to the number of queued threads multiplied by the supplied timeout value. If there are no blocked threads then the back pressure will be zero.

706.3.4 Coalescing and Windowing

Coalescing and windowing are both processes by which multiple incoming data events are collapsed into a single outgoing event.

706.3.4.1 Coalescing

There are two main ways to coalesce a stream.

The first mechanism delegates all responsibility to the coalescing function, which returns an Optional. The coalescing function is called for every data event, and returns an optional which either has a value, or is empty. If the optional has a value then this value is passed to the next stage of the processing pipeline. If the optional is empty then no data event is passed to the next stage.

The second mechanism allows the stream to be configured with a (potentially variable) buffer size. The stream then stores values into this buffer. When the buffer is full then the stream passes the buffer to the handler function, which returns data to be passed to the next stage. If the stream finishes when a buffer is partially filled then the partially filled buffer will be passed to the handler function.

When coalescing events there is no opportunity for feedback from the event handler while the events are being buffered. As a result back pressure from the handler is zero except when the event triggers a call to the next stage. When the next stage is triggered the back pressure from that stage is returned.

706.3.4.2 Windowing

Windowing is similar to coalescing, the primary difference between coalescing and windowing is the way in which the next stage of processing is triggered. A coalescing stage collects events until it has the correct number and then passes them to the handler function, regardless of how long this takes. A windowing stage collects events for a given amount of time, and then passes the collected events to the handler function, regardless of how many events are collected.

To avoid the need for a potentially infinite buffer a windowing stage may also place a limit on the number of events to be buffered. If this limit is reached then the window finishes early and the buffer is passed to the client, just like a coalescing stage. In this mode of operation the handler function is also passed the length of time for which the window lasted.

As windowing requires the collected events to be delivered asynchronously there is no opportunity for back-pressure from the previous stage to be applied upstream. Windowing therefore returns zero back-pressure in all cases except when a buffer size limit has been declared and is reached. If a window size limit is reached then the windowing stage returns the remaining window time as back pressure. Applying back pressure in this way means that the event source will tend not to repeatedly over saturate the window.

706.3.5 Merging and Splitting

Merging and Splitting are actions that can be used to combine push streams, or to convert one stream into many streams.

706.3.5.1 Merging

A client may need to consume data from more than one Event Sources. In this case the PushStream may be used to merge two event streams. The returned stream will receive events from both parent streams, but will only close when both parent streams have delivered terminal events.

706.3.5.2 Splitting

Sometimes it is desirable to split a stream into multiple parallel pipelines. These pipelines are independent from the point at which they are split, but share the same source and upstream pipeline.

Splitting a stream is possible using the split(Predicate<? super T >... predicates) method. For each predicate a PushStream will be returned that receives the events accepted by the predicate.

The lifecycle of a split stream differs from that of a normal stream in two key ways:

  • The stream will begin event delivery when any of the downstream handlers encounters a terminal operation

  • The stream will only close when all of the downstream handlers are closed

706.3.6 Time Limited Streams

An important difference between Push Streams and Java 8 Streams is that events occur over time, there are therefore some operations that do not apply to Java 8 Streams which are relevant to Push Streams.

The limit() operation on a Stream can be used to limit the number of elements that are processed, however on a Push Stream that number of events may never be reached, even though the stream has not closed. Push Streams therefore also have a limit method which takes a Duration. This duration limits the time for which the stream is open, closing it after the duration has elapsed.

The timeout operation of a Push Stream can be used to end a stream if no events are received for the given amount of time. If an event is received then this resets the timeout counter. The timeout operation is therefore a useful mechanism for identifying pipelines which have stalled in their processing. If the timeout expires then it propagates an error event to the next stage of the pipeline. The Exception in the error event is an org.osgi.util.promise.TimeoutException.

706.3.7 Closing Streams

A PushStream represents a stage in the processing pipeline and is AutoCloseable. When the close() method is invoked it will not, in general, coincide with the processing of an event. The closing of a stream in this way must therefore do the following things:

  • Send a close event downstream to close the stream

  • Discard events subsequently received by this pipeline stage, and return negative backpressure for any that do arrive at this pipeline stage.

  • Propagate the close operation upstream until the AutoCloseable returned by the open(PushEventConsumer) method is closed.

The result of this set of operations must be that all stages of the pipeline, including the connection to the PushEventSource, are eagerly closed. This may be as a result of receiving a close event, negative back pressure, or the close call being propagated back up the pipeline, but it must not wait for the next event. For example, if an event is produced every ten minutes and the stream is closed one minute after an event is created then it must not take a further nine minutes to close the connection to the Push Event Source.

706.4 The Push Stream Provider

The PushStreamProvider can be used to assist with a variety of asynchronous event handling use cases. A Push Stream Provider can create Push Stream instances from a Push Event Source, it can buffer an Push Event Consumer, or it can turn a Push Stream into a reusable Push Event Source.

706.4.1 Building Buffers

The Push Stream Provider allows several types of buffered objects to be created. By default all Push Streams are created with a buffer, but other objects can also be wrapped in a buffer. For example a Push Event Consumer can be wrapped in a buffer to isolate it from a Push Event Source. The SimplePushEventSource also has a buffer, which is used to isolate the event producing thread from event consumers.

In all cases buffers are configured using a BufferBuilder with the following defaults:

  • A parallelism of one

  • A FAIL QueuePolicy

  • A LINEAR PushbackPolicy with a maximum pushback of one second

  • A Buffer with a capacity of 32 elements

A Buffer requires a timer and an executor. If no Executor is provided when creating a buffer then the buffer will have its own internal Executor with the same number of threads as the defined parallelism. The Push Stream Provider may create a new ScheduledExecutorService for each buffer, or reuse a common Scheduler. The builder of the Buffer may provide their own executor and timer using the withExecutor(Executor) and withScheduler(ScheduledExecutorService) methods

Any internally created Executor will be shut down after the buffer has processed a terminal event.

706.4.2 Mapping between Java 8 Streams and Push Streams

There are a number of scenarios where an application developer may wish to convert between a Java 8 Stream and a PushStream. In particular, the flatMap(Function) operation of a Push Stream takes a single event and converts it into many events in a Push Stream. Common operations, such as splitting the event into child events will result in a Java Collection, or a Java 8 Stream. These need to be converted into a Push Stream before they can be returned from the flatMap operation.

To assist this model the PushStreamProvider provides two streamOf methods. These convert a Java 8 Stream into a Push Stream, changing the pull-based model of Java 8 Streams into the asynchronous model of the Push Stream.

The first streamOf(Stream) method takes a Java 8 Stream. The PushStream created by this method is not fully asynchronous, it uses the connecting thread to consume the Java 8 Stream. As a result the streams created using this method will block terminal operations. This method should therefore not normally be used for infinite event streams, but instead for short, finite streams of data that can be processed rapidly, for example as the result of a flatmapping operation. In this scenario reusing the incoming thread improves performance. In the following example an incoming list of URLs is registered for download.

PushStreamProvider psp = new PushStreamProvider();

PushStream<List<URL>> urls = getURLStream();

urls.flatMap(l -> psp.streamOf(l.stream()))
    .forEach(url -> registerDownload(url));

For larger Streams of data, or when truly asynchronous operation is required, there is a second streamOf(Executor,ScheduledExecutorService,Stream) method which allows for asynchronous consumption of the stream. The Executor is used to consume elements from the Java 8 Stream using a single task. This mode of operation is suitable for use with infinite data streams, or for streams which require a truly asynchronous mode of operation, and does not require the stream to be parallel. If null is passed for the Executor then the PushStreamProvider will create a fixed thread pool of size 2. This allows for work to continue in the Push Stream even if the passed-in Stream blocks the consuming thread. If null is passed for the ScheduledExecutor then the Push Stream Provider may create a new scheduler or use a shared default.

706.5 Simple Push Event Sources

The PushEventSource and PushEventConsumer are both functional interfaces, however it is noticeably harder to implement a PushEventSource than a PushEventConsumer. A PushEventSource must be able to support multiple independently closeable consumer registrations, all of which are providing potentially different amounts of back pressure.

To simplify the case where a user wishes to write a basic event source the PushStreamProvider is able to create a SimplePushEventSource. The SimplePushEventSource handles the details of implementing PushEventSource, providing a simplified API for the event producing code to use.

Events can be sent via the Simple Push Event Source publish(T) method at any time until it is closed. These events may be silently ignored if no consumer is connected, but if one or more consumers are connected then the event will be asynchronously delivered to them.

Close or error events can be sent equally easily using the endOfStream() and error(Throwable) methods. These will send disconnection events to all of the currently connected consumers and remove them from the Simple Push Event Source. Note that sending these events does not close the Simple Push Event Source. Subsequent connection attempts will succeed, and events can still be published.

706.5.1 Optimizing Event Creation

In addition to the publication methods the Simple Push Event Source provides isConnected() and connectPromise() methods. The isConnected method gives a point-in-time snapshot of whether there are any connections to the Simple Push Event Source. If this method returns false then the event producer may wish to avoid creating the event, particularly if it is computationally expensive to do so. The connectPromise method returns a Promise representing the current connection state. This Promise resolves when there is a client connected (which means it may be resolved immediately as it is created). If the Simple Push Event Source is closed before the Promise resolves then the Promise is failed with an IllegalStateException. The connect Promise can be used to trigger the initialization of an event thread, allowing lazier startup.

PushStreamProvider psp = new PushStreamProvider();

SimplePushEventSource<Long> ses = psp.createSimpleEventSource(Long.class))

Success<Void,Void> onConnect = p -> {
    new Thread(() -> {
        long counter = 0;
        // Keep going as long as someone is listening
        while (ses.isConnected()) {
          ses.publish(++counter);
          Thread.sleep(100);
          System.out.println("Published: " + counter);
        }
        // Restart delivery when a new listener connects
        ses.connectPromise().then(onConnect);
      }).start();
    return null;
  };

// Begin delivery when someone is listening
ses.connectPromise().then(onConnect);

// Create a listener which prints out even numbers
psp.createStream(ses).
  filter(l -> l % 2L == 0).
  limit(5000L).
  forEach(f -> System.out.println("Consumed event: " + f));

706.6 Security

The Push Stream API does not define any OSGi services nor does the API perform any privileged actions. Therefore, it has no security considerations.

706.7 org.osgi.util.pushstream

Version 1.1

Push Stream Package Version 1.1.

Bundles wishing to use this package must list the package in the Import-Package header of the bundle's manifest.

Example import for consumers using the API in this package:

Import-Package: org.osgi.util.pushstream; version="[1.1,2.0)"

Example import for providers implementing the API in this package:

Import-Package: org.osgi.util.pushstream; version="[1.1,1.2)"

706.7.1 Summary

706.7.2 public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<? extends T>>>

The type of object being built

The type of objects in the PushEvent

The type of the Queue used in the user specified buffer

Create a buffered section of a Push-based stream

Consumers of this API must not implement this type

706.7.2.1 public R build()

the object being built

706.7.2.2 public BufferBuilder<R, T, U> withBuffer(U queue)

The BlockingQueue implementation to use as a buffer

this builder

706.7.2.3 public BufferBuilder<R, T, U> withExecutor(Executor executor)

Set the Executor that should be used to deliver events from this buffer

this builder

706.7.2.4 public BufferBuilder<R, T, U> withParallelism(int parallelism)

Set the maximum permitted number of concurrent event deliveries allowed from this buffer

this builder

706.7.2.5 public BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicy<T, U> pushbackPolicy)

Set the PushbackPolicy of this builder

this builder

706.7.2.6 public BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicyOption pushbackPolicyOption, long time)

Set the PushbackPolicy of this builder

this builder

706.7.2.7 public BufferBuilder<R, T, U> withQueuePolicy(QueuePolicy<T, U> queuePolicy)

Set the QueuePolicy of this Builder

this builder

706.7.2.8 public BufferBuilder<R, T, U> withQueuePolicy(QueuePolicyOption queuePolicyOption)

Set the QueuePolicy of this Builder

this builder

706.7.2.9 public BufferBuilder<R, T, U> withScheduler(ScheduledExecutorService scheduler)

Set the ScheduledExecutorService that should be used to trigger timed events after this buffer

this builder

706.7.3 public interface PushbackPolicy<T, U extends BlockingQueue<PushEvent<? extends T>>>

The type of the data

The type of the queue

A PushbackPolicy is used to calculate how much back pressure to apply based on the current buffer. The PushbackPolicy will be called after an event has been queued, and the returned value will be used as back pressure.

PushbackPolicyOption

706.7.3.1 public long pushback(U queue) throws Exception

Given the current state of the queue, determine the level of back pressure that should be applied

a back pressure value in nanoseconds

Exception– 

706.7.4 enum PushbackPolicyOption

PushbackPolicyOption provides a standard set of simple PushbackPolicy implementations.

PushbackPolicy

706.7.4.1 FIXED

Returns a fixed amount of back pressure, independent of how full the buffer is

706.7.4.2 ON_FULL_FIXED

Returns zero back pressure until the buffer is full, then it returns a fixed value

706.7.4.3 ON_FULL_EXPONENTIAL

Returns zero back pressure until the buffer is full, then it returns an exponentially increasing amount, starting with the supplied value and doubling it each time. Once the buffer is no longer full the back pressure returns to zero.

706.7.4.4 LINEAR

Returns zero back pressure when the buffer is empty, then it returns a linearly increasing amount of back pressure based on how full the buffer is. The maximum value will be returned when the buffer is full.

706.7.4.5 public abstract PushbackPolicy<T, U> getPolicy(long value)

<T, U extends BlockingQueue<PushEvent<? extends T>>>

Create a PushbackPolicy instance configured with a base back pressure time in nanoseconds The actual backpressure returned will vary based on the selected implementation, the base value, and the state of the buffer.

A PushbackPolicy to use

706.7.4.6 public static PushbackPolicyOption valueOf(String name)

706.7.4.7 public static PushbackPolicyOption[] values()

706.7.5 public abstract class PushEvent<T>

The payload type of the event.

A PushEvent is an immutable object that is transferred through a communication channel to push information to a downstream consumer. The event has three different types:

  • EventType.DATA – Provides access to a typed data element in the stream.

  • EventType.CLOSE – The stream is closed. After receiving this event, no more events will follow.

  • EventType.ERROR – The stream ran into an unrecoverable problem and is sending the reason downstream. The stream is closed and no more events will follow after this event.

Immutable

Consumers of this API must not implement this type

706.7.5.1 public static PushEvent<T> close()

<T>

The payload type.

Create a new close event.

A new close event.

706.7.5.2 public static PushEvent<T> data(T payload)

<T>

The payload type.

The payload.

Create a new data event.

A new data event wrapping the specified payload.

706.7.5.3 public static PushEvent<T> error(Throwable t)

<T>

The payload type.

The error.

Create a new error event.

A new error event with the specified error.

706.7.5.4 public T getData()

Return the data for this event.

The data payload.

IllegalStateException– if this event is not a EventType.DATA event.

706.7.5.5 public Throwable getFailure()

Return the error that terminated the stream.

The error that terminated the stream.

IllegalStateException– if this event is not an EventType.ERROR event.

706.7.5.6 public abstract PushEvent.EventType getType()

Get the type of this event.

The type of this event.

706.7.5.7 public boolean isTerminal()

Answer if no more events will follow after this event.

false if this is a data event, otherwise true.

706.7.5.8 public PushEvent<X> nodata()

<X>

The new payload type.

Convenience to cast a close/error event to another payload type. Since the payload type is not needed for these events this is harmless. This therefore allows you to forward the close/error event downstream without creating anew event.

The current error or close event mapped to a new payload type.

IllegalStateException– if the event is a EventType.DATA event.

706.7.6 enum PushEvent.EventType

The type of a PushEvent.

706.7.6.1 DATA

A data event forming part of the stream

706.7.6.2 ERROR

An error event that indicates streaming has failed and that no more events will arrive

706.7.6.3 CLOSE

An event that indicates that the stream has terminated normally

706.7.6.4 public static PushEvent.EventType valueOf(String name)

706.7.6.5 public static PushEvent.EventType[] values()

706.7.7 public interface PushEventConsumer<T>

The type for the event payload

An Async Event Consumer asynchronously receives Data events until it receives either a Close or Error event.

706.7.7.1 public static final long ABORT = -1L

If ABORT is used as return value, the sender should close the channel all the way to the upstream source. The ABORT will not guarantee that no more events are delivered since this is impossible in a concurrent environment. The consumer should accept subsequent events and close/clean up when the Close or Error event is received. Though ABORT has the value -1, any value less than 0 will act as an abort.

706.7.7.2 public static final long CONTINUE = 0L

A 0 indicates that the consumer is willing to receive subsequent events at full speeds. Any value more than 0 will indicate that the consumer is becoming overloaded and wants a delay of the given milliseconds before the next event is sent. This allows the consumer to pushback the event delivery speed.

706.7.7.3 public long accept(PushEvent<? extends T> event) throws Exception

The event

Accept an event from a source. Events can be delivered on multiple threads simultaneously. However, Close and Error events are the last events received, no more events must be sent after them.

less than 0 means abort, 0 means continue, more than 0 means delay ms

Exception– to indicate that an error has occurred and that no further events should be delivered to this PushEventConsumer

706.7.8 public interface PushEventSource<T>

The payload type

An event source. An event source can open a channel between a source and a consumer. Once the channel is opened (even before it returns) the source can send events to the consumer. A source should stop sending and automatically close the channel when sending an event returns a negative value, see PushEventConsumer.ABORT. Values that are larger than 0 should be treated as a request to delay the next events with those number of milliseconds.

706.7.8.1 public AutoCloseable open(PushEventConsumer<? super T> aec) throws Exception

the consumer (not null)

Open the asynchronous channel between the source and the consumer. The call returns an AutoCloseable. This can be closed, and should close the channel, including sending a Close event if the channel was not already closed. The returned object must be able to be closed multiple times without sending more than one Close events.

a AutoCloseable that can be used to close the stream

Exception– 

706.7.9 public interface PushStream<T>
extends AutoCloseable

The Payload type

A Push Stream fulfills the same role as the Java 8 stream but it reverses the control direction. The Java 8 stream is pull based and this is push based. A Push Stream makes it possible to build a pipeline of transformations using a builder kind of model. Just like streams, it provides a number of terminating methods that will actually open the channel and perform the processing until the channel is closed (The source sends a Close event). The results of the processing will be send to a Promise, just like any error events. A stream can be used multiple times. The Push Stream represents a pipeline. Upstream is in the direction of the source, downstream is in the direction of the terminating method. Events are sent downstream asynchronously with no guarantee for ordering or concurrency. Methods are available to provide serialization of the events and splitting in background threads.

Consumers of this API must not implement this type

706.7.9.1 public PushStream<T> adjustBackPressure(LongUnaryOperator adjustment)

Changes the back-pressure propagated by this pipeline stage.

The supplied function receives the back pressure returned by the next pipeline stage and returns the back pressure that should be returned by this stage. This function will not be called if the previous pipeline stage returns negative back pressure.

Builder style (can be a new or the same object)

706.7.9.2 public PushStream<T> adjustBackPressure(ToLongBiFunction<T, Long> adjustment)

Changes the back-pressure propagated by this pipeline stage.

The supplied function receives the data object passed to the next pipeline stage and the back pressure that was returned by that stage when accepting it. The function returns the back pressure that should be returned by this stage. This function will not be called if the previous pipeline stage returns negative back pressure.

Builder style (can be a new or the same object)

706.7.9.3 public Promise<Boolean> allMatch(Predicate<? super T> predicate)

Closes the channel and resolve the promise with false when the predicate does not matches a pay load. If the channel is closed before, the promise is resolved with true.

This is a short circuiting terminal operation

A Promise that will resolve when an event fails to match the predicate, or the end of the stream is reached

706.7.9.4 public Promise<Boolean> anyMatch(Predicate<? super T> predicate)

Close the channel and resolve the promise with true when the predicate matches a payload. If the channel is closed before the predicate matches, the promise is resolved with false.

This is a short circuiting terminal operation

A Promise that will resolve when an event matches the predicate, or the end of the stream is reached

706.7.9.5 public PushStream<R> asyncMap(int n, int delay, Function<? super T, Promise<? extends R>> mapper)

<R>

number of simultaneous promises to use

Nr of ms/promise that is queued back pressure

The mapping function

Asynchronously map the payload values. The mapping function returns a Promise representing the asynchronous mapping operation.

The PushStream limits the number of concurrently running mapping operations, and returns back pressure based on the number of existing queued operations.

Builder style (can be a new or the same object)

IllegalArgumentException– if the number of threads is < 1 or the delay is < 0

NullPointerException– if the mapper is null

706.7.9.6 public PushStream<T> buffer()

Buffer the events in a queue using default values for the queue size and other behaviors. Buffered work will be processed asynchronously in the rest of the chain. Buffering also blocks the transmission of back pressure to previous elements in the chain, although back pressure is honored by the buffer.

Buffers are useful for "bursty" event sources which produce a number of events close together, then none for some time. These bursts can sometimes overwhelm downstream event consumers. Buffering will not, however, protect downstream components from a source which produces events faster than they can be consumed. For fast sources filter(Predicate) and coalesce(int, Function) fork(int, int, Executor) are better choices.

Builder style (can be a new or the same object)

706.7.9.7 public PushStreamBuilder<T, U> buildBuffer()

<U extends BlockingQueue<PushEvent<? extends T>>>

Build a buffer to enqueue events in a queue using custom values for the queue size and other behaviors. Buffered work will be processed asynchronously in the rest of the chain. Buffering also blocks the transmission of back pressure to previous elements in the chain, although back pressure is honored by the buffer.

Buffers are useful for "bursty" event sources which produce a number of events close together, then none for some time. These bursts can sometimes overwhelm downstream event consumers. Buffering will not, however, protect downstream components from a source which produces events faster than they can be consumed. For fast sources filter(Predicate) and coalesce(int, Function) fork(int, int, Executor) are better choices.

Buffers are also useful as "circuit breakers" in the pipeline. If a QueuePolicyOption.FAIL is used then a full buffer will trigger the stream to close, preventing an event storm from reaching the client.

A builder which can be used to configure the buffer for this pipeline stage.

706.7.9.8 public void close()

Close this PushStream by sending an event of type PushEvent.EventType.CLOSE downstream. Closing a PushStream is a safe operation that will not throw an Exception.

Calling close() on a closed PushStream has no effect.

706.7.9.9 public PushStream<R> coalesce(Function<? super T, Optional<R>> f)

<R>

Coalesces a number of events into a new type of event. The input events are forwarded to a accumulator function. This function returns an Optional. If the optional is present, it's value is send downstream, otherwise it is ignored.

Builder style (can be a new or the same object)

706.7.9.10 public PushStream<R> coalesce(int count, Function<Collection<T>, R> f)

<R>

Coalesces a number of events into a new type of event. A fixed number of input events are forwarded to a accumulator function. This function returns new event data to be forwarded on.

Builder style (can be a new or the same object)

706.7.9.11 public PushStream<R> coalesce(IntSupplier count, Function<Collection<T>, R> f)

<R>

Coalesces a number of events into a new type of event. A variable number of input events are forwarded to a accumulator function. The number of events to be forwarded is determined by calling the count function. The accumulator function then returns new event data to be forwarded on.

Builder style (can be a new or the same object)

706.7.9.12 public Promise<R> collect(Collector<? super T, A, R> collector)

<R, A>

See Stream. Will resolve once the channel closes.

This is a terminal operation

A Promise representing the collected results

706.7.9.13 public Promise<Long> count()

See Stream. Will resolve onces the channel closes.

This is a terminal operation

A Promise representing the number of values in the stream

706.7.9.14 public PushStream<T> distinct()

Remove any duplicates. Notice that this can be expensive in a large stream since it must track previous payloads.

Builder style (can be a new or the same object)

706.7.9.15 public PushStream<T> filter(Predicate<? super T> predicate)

The predicate that is tested (not null)

Only pass events downstream when the predicate tests true.

Builder style (can be a new or the same object)

706.7.9.16 public Promise<Optional<T>> findAny()

Close the channel and resolve the promise with the first element. If the channel is closed before, the Optional will have no value.

This is a terminal operation

a promise

706.7.9.17 public Promise<Optional<T>> findFirst()

Close the channel and resolve the promise with the first element. If the channel is closed before, the Optional will have no value.

a promise

706.7.9.18 public PushStream<R> flatMap(Function<? super T, ? extends PushStream<? extends R>> mapper)

<R>

The flat map function

Flat map the payload value (turn one event into 0..n events of potentially another type).

Builder style (can be a new or the same object)

706.7.9.19 public Promise<Void> forEach(Consumer<? super T> action)

The action to perform

Execute the action for each event received until the channel is closed. This is a terminating method, the returned promise is resolved when the channel closes.

This is a terminal operation

A promise that is resolved when the channel closes.

706.7.9.20 public Promise<Long> forEachEvent(PushEventConsumer<? super T> action)

Pass on each event to another consumer until the stream is closed.

This is a terminal operation

a promise

706.7.9.21 public PushStream<T> fork(int n, int delay, Executor e)

number of simultaneous background threads to use

Nr of ms/thread that is queued back pressure

an executor to use for the background threads.

Execute the downstream events in up to n background threads. If more requests are outstanding apply delay * nr of delayed threads back pressure. A downstream channel that is closed or throws an exception will cause all execution to cease and the stream to close

Builder style (can be a new or the same object)

IllegalArgumentException– if the number of threads is < 1 or the delay is < 0

NullPointerException– if the Executor is null

706.7.9.22 public PushStream<T> limit(long maxSize)

Maximum number of elements has been received

Automatically close the channel after the maxSize number of elements is received.

Builder style (can be a new or the same object)

706.7.9.23 public PushStream<T> limit(Duration maxTime)

The maximum time that the stream should remain open

Automatically close the channel after the given amount of time has elapsed.

Builder style (can be a new or the same object)

706.7.9.24 public PushStream<R> map(Function<? super T, ? extends R> mapper)

<R>

The map function

Map a payload value.

Builder style (can be a new or the same object)

706.7.9.25 public Promise<Optional<T>> max(Comparator<? super T> comparator)

See Stream. Will resolve onces the channel closes.

This is a terminal operation

A Promise representing the maximum value, or null if no values are seen before the end of the stream

706.7.9.26 public PushStream<T> merge(PushEventSource<? extends T> source)

The source to merge in.

Merge in the events from another source. The resulting channel is not closed until this channel and the channel from the source are closed.

Builder style (can be a new or the same object)

706.7.9.27 public PushStream<T> merge(PushStream<? extends T> source)

The source to merge in.

Merge in the events from another PushStream. The resulting channel is not closed until this channel and the channel from the source are closed.

Builder style (can be a new or the same object)

706.7.9.28 public Promise<Optional<T>> min(Comparator<? super T> comparator)

See Stream. Will resolve onces the channel closes.

This is a terminal operation

A Promise representing the minimum value, or null if no values are seen before the end of the stream

706.7.9.29 public Promise<Boolean> noneMatch(Predicate<? super T> predicate)

Closes the channel and resolve the promise with false when the predicate matches any pay load. If the channel is closed before, the promise is resolved with true.

This is a short circuiting terminal operation

A Promise that will resolve when an event matches the predicate, or the end of the stream is reached

706.7.9.30 public PushStream<T> onClose(Runnable closeHandler)

Will be called on close

Provide a handler that must be run after the PushStream is closed. This handler will run after any downstream operations have processed the terminal event but before any upstream operations have processed the terminal event.

This stream

706.7.9.31 public PushStream<T> onError(Consumer<? super Throwable> errorHandler)

Will be called on an error event

Provide a handler that will be called if the PushStream is closed with an event of type PushEvent.EventType.ERROR. The error value from this event will be passed to the callback function after the PushStream is closed. This handler will run after any downstream operations have processed the error event but before any upstream operations have processed the error event.

This stream

706.7.9.32 public Promise<T> reduce(T identity, BinaryOperator<T> accumulator)

The identity/begin value

The accumulator

Standard reduce, see Stream. The returned promise will be resolved when the channel closes.

This is a terminal operation

A

706.7.9.33 public Promise<Optional<T>> reduce(BinaryOperator<T> accumulator)

The accumulator

Standard reduce without identity, so the return is an Optional. The returned promise will be resolved when the channel closes.

This is a terminal operation

an Optional

706.7.9.34 public Promise<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)

<U>

combines two U's into one U (for example, combine two lists)

Standard reduce with identity, accumulator and combiner. The returned promise will be resolved when the channel closes.

This is a terminal operation

The promise

706.7.9.35 public PushStream<T> sequential()

Ensure that any events are delivered sequentially. That is, no overlapping calls downstream. This can be used to turn a forked stream (where for example a heavy conversion is done in multiple threads) back into a sequential stream so a reduce is simple to do.

Builder style (can be a new or the same object)

706.7.9.36 public PushStream<T> skip(long n)

number of elements to skip

Skip a number of events in the channel.

Builder style (can be a new or the same object)

IllegalArgumentException– if the number of events to skip is negative

706.7.9.37 public PushStream<T> sorted()

Sorted the elements, assuming that T extends Comparable. This is of course expensive for large or infinite streams since it requires buffering the stream until close.

Builder style (can be a new or the same object)

706.7.9.38 public PushStream<T> sorted(Comparator<? super T> comparator)

Sorted the elements with the given comparator. This is of course expensive for large or infinite streams since it requires buffering the stream until close.

Builder style (can be a new or the same object)

706.7.9.39 public PushStream<T>[] split(Predicate<? super T>... predicates)

the predicates to test

Split the events to different streams based on a predicate. If the predicate is true, the event is dispatched to that channel on the same position. All predicates are tested for every event.

This method differs from other methods of PushStream in three significant ways:

  • The return value contains multiple streams.

  • This stream will only close when all of these child streams have closed.

  • Event delivery is made to all open children that accept the event.

streams that map to the predicates

706.7.9.40 public PushStream<T> timeout(Duration idleTime)

The length of time that the stream should remain open when no events are being received.

Automatically fail the channel if no events are received for the indicated length of time. If the timeout is reached then a failure event containing a TimeoutException will be sent.

Builder style (can be a new or the same object)

706.7.9.41 public Promise<Object> toArray()

Collect the payloads in an Object array after the channel is closed. This is a terminating method, the returned promise is resolved when the channel is closed.

This is a terminal operation

A promise that is resolved with all the payloads received over the channel

706.7.9.42 public Promise<A> toArray(IntFunction<A> generator)

<A>

The element type of the resulting array.

A function which returns an array into which the payloads are stored.

Collect the payloads in an Object array after the channel is closed. This is a terminating method, the returned promise is resolved when the channel is closed. The type of the array is handled by the caller using a generator function that gets the length of the desired array.

This is a terminal operation

A promise that is resolved with all the payloads received over the channel. The promise will be failed with an ArrayStoreException if the runtime type of the array returned by the array generator is not a supertype of the runtime type of every payload in the channel.

706.7.9.43 public PushStream<R> window(Duration d, Function<Collection<T>, R> f)

<R>

Buffers a number of events over a fixed time interval and then forwards the events to an accumulator function. This function returns new event data to be forwarded on. Note that:

  • The collection forwarded to the accumulator function will be empty if no events arrived during the time interval.

  • The accumulator function will be run and the forwarded event delivered as a different task, (and therefore potentially on a different thread) from the one that delivered the event to this PushStream.

  • Due to the buffering and asynchronous delivery required, this method prevents the propagation of back-pressure to earlier stages

Builder style (can be a new or the same object)

706.7.9.44 public PushStream<R> window(Duration d, Executor executor, Function<Collection<T>, R> f)

<R>

Buffers a number of events over a fixed time interval and then forwards the events to an accumulator function. This function returns new event data to be forwarded on. Note that:

  • The collection forwarded to the accumulator function will be empty if no events arrived during the time interval.

  • The accumulator function will be run and the forwarded event delivered by a task given to the supplied executor.

  • Due to the buffering and asynchronous delivery required, this method prevents the propagation of back-pressure to earlier stages

Builder style (can be a new or the same object)

706.7.9.45 public PushStream<R> window(Supplier<Duration> timeSupplier, IntSupplier maxEvents, BiFunction<Long, Collection<T>, R> f)

<R>

Buffers a number of events over a variable time interval and then forwards the events to an accumulator function. The length of time over which events are buffered is determined by the time function. A maximum number of events can also be requested, if this number of events is reached then the accumulator will be called early. The accumulator function returns new event data to be forwarded on. It is also given the length of time for which the buffer accumulated data. This may be less than the requested interval if the buffer reached the maximum number of requested events early. Note that:

  • The collection forwarded to the accumulator function will be empty if no events arrived during the time interval.

  • The accumulator function will be run and the forwarded event delivered as a different task, (and therefore potentially on a different thread) from the one that delivered the event to this PushStream.

  • Due to the buffering and asynchronous delivery required, this method prevents the propagation of back-pressure to earlier stages

  • If the window finishes by hitting the maximum number of events then the remaining time in the window will be applied as back-pressure to the previous stage, attempting to slow the producer to the expected windowing threshold.

Builder style (can be a new or the same object)

706.7.9.46 public PushStream<R> window(Supplier<Duration> timeSupplier, IntSupplier maxEvents, Executor executor, BiFunction<Long, Collection<T>, R> f)

<R>

Buffers a number of events over a variable time interval and then forwards the events to an accumulator function. The length of time over which events are buffered is determined by the time function. A maximum number of events can also be requested, if this number of events is reached then the accumulator will be called early. The accumulator function returns new event data to be forwarded on. It is also given the length of time for which the buffer accumulated data. This may be less than the requested interval if the buffer reached the maximum number of requested events early. Note that:

  • The collection forwarded to the accumulator function will be empty if no events arrived during the time interval.

  • The accumulator function will be run and the forwarded event delivered as a different task, (and therefore potentially on a different thread) from the one that delivered the event to this PushStream.

  • If the window finishes by hitting the maximum number of events then the remaining time in the window will be applied as back-pressure to the previous stage, attempting to slow the producer to the expected windowing threshold.

Builder style (can be a new or the same object)

706.7.10 public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent<? extends T>>>
extends BufferBuilder<PushStream<T>, T, U>

The type of objects in the PushEvent

The type of the Queue used in the user specified buffer

A Builder for a PushStream. This Builder extends the support of a standard BufferBuilder by allowing the PushStream to be unbuffered.

Consumers of this API must not implement this type

706.7.10.1 public PushStreamBuilder<T, U> unbuffered()

Tells this PushStreamBuilder to create an unbuffered stream which delivers events directly to its consumer using the incoming delivery thread. Setting the PushStreamBuilder to be unbuffered means that any buffer, queue policy or push back policy will be ignored. Note that calling one of:

after this method will reset this builder to require a buffer.

the builder

706.7.10.2 public PushStreamBuilder<T, U> withBuffer(U queue)

The BlockingQueue implementation to use as a buffer

this builder

706.7.10.3 public PushStreamBuilder<T, U> withExecutor(Executor executor)

Set the Executor that should be used to deliver events from this buffer

this builder

706.7.10.4 public PushStreamBuilder<T, U> withParallelism(int parallelism)

Set the maximum permitted number of concurrent event deliveries allowed from this buffer

this builder

706.7.10.5 public PushStreamBuilder<T, U> withPushbackPolicy(PushbackPolicy<T, U> pushbackPolicy)

Set the PushbackPolicy of this builder

this builder

706.7.10.6 public PushStreamBuilder<T, U> withPushbackPolicy(PushbackPolicyOption pushbackPolicyOption, long time)

Set the PushbackPolicy of this builder

this builder

706.7.10.7 public PushStreamBuilder<T, U> withQueuePolicy(QueuePolicy<T, U> queuePolicy)

Set the QueuePolicy of this Builder

this builder

706.7.10.8 public PushStreamBuilder<T, U> withQueuePolicy(QueuePolicyOption queuePolicyOption)

Set the QueuePolicy of this Builder

this builder

706.7.10.9 public PushStreamBuilder<T, U> withScheduler(ScheduledExecutorService scheduler)

Set the ScheduledExecutorService that should be used to trigger timed events after this buffer

this builder

706.7.11 public final class PushStreamProvider

A factory for PushStream instances, and utility methods for handling PushEventSources and PushEventConsumers

706.7.11.1 public PushStreamProvider()

706.7.11.2 public BufferBuilder<PushEventConsumer<T>, T, U> buildBufferedConsumer(PushEventConsumer<T> delegate)

<T, U extends BlockingQueue<PushEvent<? extends T>>>

Build a buffered PushEventConsumer with custom configuration.

The returned consumer will be buffered from the event source, and will honor back pressure requests from its delegate even if the event source does not.

Buffered consumers are useful for "bursty" event sources which produce a number of events close together, then none for some time. These bursts can sometimes overwhelm the consumer. Buffering will not, however, protect downstream components from a source which produces events faster than they can be consumed.

Buffers are also useful as "circuit breakers". If a QueuePolicyOption.FAIL is used then a full buffer will request that the stream close, preventing an event storm from reaching the client.

Note that this buffered consumer will close when it receives a terminal event, or if the delegate returns negative backpressure. No further events will be propagated after this time.

a PushEventConsumer with a buffer directly before it

706.7.11.3 public BufferBuilder<PushEventSource<T>, T, U> buildEventSourceFromStream(PushStream<T> stream)

<T, U extends BlockingQueue<PushEvent<? extends T>>>

Convert an PushStream into an PushEventSource. The first call to PushEventSource.open(PushEventConsumer) will begin event processing.

The PushEventSource will remain active until the backing stream is closed, and permits multiple consumers to PushEventSource.open(PushEventConsumer) it. Note that this means the caller of this method is responsible for closing the supplied stream if it is not finite in length.

Late joining consumers will not receive historical events, but will immediately receive the terminal event which closed the stream if the stream is already closed.

a PushEventSource backed by the PushStream

706.7.11.4 public BufferBuilder<SimplePushEventSource<T>, T, U> buildSimpleEventSource(Class<T> type)

<T, U extends BlockingQueue<PushEvent<? extends T>>>

Build a SimplePushEventSource with the supplied type and custom buffering behaviors. The SimplePushEventSource will respond to back pressure requests from the consumers connected to it.

a SimplePushEventSource

706.7.11.5 public PushStreamBuilder<T, U> buildStream(PushEventSource<T> eventSource)

<T, U extends BlockingQueue<PushEvent<? extends T>>>

The source of the events

Builds a push stream with custom configuration.

The resulting PushStream may be buffered or unbuffered depending on how it is configured.

A PushStreamBuilder for the stream

706.7.11.6 public PushEventConsumer<T> createBufferedConsumer(PushEventConsumer<T> delegate)

<T>

Create a buffered PushEventConsumer with the default configured buffer, executor size, queue, queue policy and pushback policy. This is equivalent to calling

 buildBufferedConsumer(delegate).create();

The returned consumer will be buffered from the event source, and will honor back pressure requests from its delegate even if the event source does not.

Buffered consumers are useful for "bursty" event sources which produce a number of events close together, then none for some time. These bursts can sometimes overwhelm the consumer. Buffering will not, however, protect downstream components from a source which produces events faster than they can be consumed.

a PushEventConsumer with a buffer directly before it

706.7.11.7 public PushEventSource<T> createEventSourceFromStream(PushStream<T> stream)

<T>

Convert an PushStream into an PushEventSource. The first call to PushEventSource.open(PushEventConsumer) will begin event processing. The PushEventSource will remain active until the backing stream is closed, and permits multiple consumers to PushEventSource.open(PushEventConsumer) it. This is equivalent to:

 buildEventSourceFromStream(stream).create();

a PushEventSource backed by the PushStream

706.7.11.8 public SimplePushEventSource<T> createSimpleEventSource(Class<T> type)

<T>

Create a SimplePushEventSource with the supplied type and default buffering behaviors. The SimplePushEventSource will respond to back pressure requests from the consumers connected to it. This is equivalent to:

 buildSimpleEventSource(type).create();

a SimplePushEventSource

706.7.11.9 public PushStream<T> createStream(PushEventSource<T> eventSource)

<T>

Create a stream with the default configured buffer, executor size, queue, queue policy and pushback policy. This is equivalent to calling

 buildStream(source).create();

This stream will be buffered from the event producer, and will honor back pressure even if the source does not.

Buffered streams are useful for "bursty" event sources which produce a number of events close together, then none for some time. These bursts can sometimes overwhelm downstream processors. Buffering will not, however, protect downstream components from a source which produces events faster (on average) than they can be consumed.

Event delivery will not begin until a terminal operation is reached on the chain of PushStreams. Once a terminal operation is reached the stream will be connected to the event source.

A PushStream with a default initial buffer

706.7.11.10 public PushStream<T> streamOf(Stream<T> items)

<T>

The items to push into the PushStream

Create an Unbuffered PushStream from a Java Stream The data from the stream will be pushed into the PushStream synchronously as it is opened. This may make terminal operations blocking unless a buffer has been added to the PushStream. Care should be taken with infinite Streams to avoid blocking indefinitely.

A PushStream containing the items from the Java Stream

706.7.11.11 public PushStream<T> streamOf(Executor executor, ScheduledExecutorService scheduler, Stream<T> items)

<T>

The worker to use to push items from the Stream into the PushStream

The scheduler to use to trigger timed events in the PushStream

The items to push into the PushStream

Create an Unbuffered PushStream from a Java Stream The data from the stream will be pushed into the PushStream asynchronously using the supplied Executor.

A PushStream containing the items from the Java Stream

706.7.12 public interface QueuePolicy<T, U extends BlockingQueue<PushEvent<? extends T>>>

The type of the data

The type of the queue

A QueuePolicy is used to control how events should be queued in the current buffer. The QueuePolicy will be called when an event has arrived.

QueuePolicyOption

706.7.12.1 public void doOffer(U queue, PushEvent<? extends T> event) throws Exception

Enqueue the event and return the remaining capacity available for events

Exception– If an error occurred adding the event to the queue. This exception will cause the connection between the PushEventSource and the PushEventConsumer to be closed with an EventType.ERROR

706.7.13 enum QueuePolicyOption

QueuePolicyOption provides a standard set of simple QueuePolicy implementations.

QueuePolicy

706.7.13.1 DISCARD_OLDEST

Attempt to add the supplied event to the queue. If the queue is unable to immediately accept the value then discard the value at the head of the queue and try again. Repeat this process until the event is enqueued.

706.7.13.2 BLOCK

Attempt to add the supplied event to the queue, blocking until the enqueue is successful.

706.7.13.3 FAIL

Attempt to add the supplied event to the queue, throwing an exception if the queue is full.

706.7.13.4 public abstract QueuePolicy<T, U> getPolicy()

<T, U extends BlockingQueue<PushEvent<? extends T>>>

a QueuePolicy implementation

706.7.13.5 public static QueuePolicyOption valueOf(String name)

706.7.13.6 public static QueuePolicyOption[] values()

706.7.14 public interface SimplePushEventSource<T>
extends PushEventSource<T>, AutoCloseable

The type of the events produced by this source

A SimplePushEventSource is a helper that makes it simpler to write a PushEventSource. Users do not need to manage multiple registrations to the stream, nor do they have to be concerned with back pressure.

Consumers of this API must not implement this type

706.7.14.1 public void close()

Close this source. Calling this method indicates that there will never be any more events published by it. Calling this method sends a close event to all connected consumers. After calling this method any PushEventConsumer that tries to open(PushEventConsumer) this source will immediately receive a close event, and will not see any remaining buffered events.

706.7.14.2 public Promise<Void> connectPromise()

This method can be used to delay event generation until an event source has connected. The returned promise will resolve as soon as one or more PushEventConsumer instances have opened the SimplePushEventSource.

The returned promise may already be resolved if this SimplePushEventSource already has connected consumers. If the SimplePushEventSource is closed before the returned Promise resolves then it will be failed with an IllegalStateException.

Note that the connected consumers are able to asynchronously close their connections to this SimplePushEventSource, and therefore it is possible that once the promise resolves this SimplePushEventSource may no longer be connected to any consumers.

A promise representing the connection state of this EventSource

706.7.14.3 public void endOfStream()

Close this source for now, but potentially reopen it later. Calling this method asynchronously sends a close event to all connected consumers and then disconnects them. Any events previously queued by the publish(Object) method will be delivered before this close event.

After calling this method any PushEventConsumer that wishes may open(PushEventConsumer) this source, and will receive subsequent events.

706.7.14.4 public void error(Throwable t)

the error

Close this source for now, but potentially reopen it later. Calling this method asynchronously sends an error event to all connected consumers and then disconnects them. Any events previously queued by the publish(Object) method will be delivered before this error event.

After calling this method any PushEventConsumer that wishes may open(PushEventConsumer) this source, and will receive subsequent events.

706.7.14.5 public boolean isConnected()

Determine whether there are any PushEventConsumers for this PushEventSource. This can be used to skip expensive event creation logic when there are no listeners.

true if any consumers are currently connected

706.7.14.6 public void publish(T t)

Asynchronously publish an event to this stream and all connected PushEventConsumer instances. When this method returns there is no guarantee that all consumers have been notified. Events published by a single thread will maintain their relative ordering, however they may be interleaved with events from other threads.

IllegalStateException– if the source is closed

706.7.15 public final class ThresholdPushbackPolicy<T, U extends BlockingQueue<PushEvent<? extends T>>>
implements PushbackPolicy<T, U>

The type of objects in the PushEvent

The type of the Queue used in the user specified buffer

Provides a configurable PushbackPolicy implementation that returns zero back pressure until the buffer fills beyond the supplied threshold. Once the threshold is reached back pressure is returned based on the supplied parameters.

The starting level of the back pressure once the threshold is exceeded is defined using the initial parameter. Additional back pressure is applied for each queued item over the threshold. The amount of this additional back pressure is defined by the increment parameter.

The following common use cases are supported:

  • The increment size can be zero, returning the initial value as a fixed back pressure after the threshold is exceeded.

  • The initial value can be zero, returning a linearly increasing back pressure from zero once the threshold is exceeded

1.1

706.7.15.1 public static ThresholdPushbackPolicy<T, U> createIncrementalThresholdPushbackPolicy(int threshold, long increment)

<T, U extends BlockingQueue<PushEvent<? extends T>>>

the increments in which the pressure increases

the queue size limit after which back pressure will be applied

A simple configuration, where the increment size is used as the initial back pressure.

a new ThresholdPushbackPolicy

IllegalArgumentException– if any of the given values is lower then zero

createThresholdPushbackPolicy(int threshold, long initial, long increment)

706.7.15.2 public static ThresholdPushbackPolicy<T, U> createSimpleThresholdPushbackPolicy(int threshold)

<T, U extends BlockingQueue<PushEvent<? extends T>>>

the queue size limit after which back pressure will be applied

A simple configuration with an initial back pressure of one and and increase increment size of one.

a new ThresholdPushbackPolicy

IllegalArgumentException– if any of the given values is lower then zero

createThresholdPushbackPolicy(int threshold, long initial, long increment)

706.7.15.3 public static ThresholdPushbackPolicy<T, U> createThresholdPushbackPolicy(int threshold, long initial, long increment)

<T, U extends BlockingQueue<PushEvent<? extends T>>>

the increments in which the pressure increases.

the initial back pressure which defines the floor after the threshold is exceeded

the queue size limit after which back pressure will be applied

Provides a ThresholdPushbackPolicy with an individual configuration for all possible parameters.

a new ThresholdPushbackPolicy

IllegalArgumentException– if any of the given values is lower then zero

706.7.15.4 public long pushback(U queue) throws Exception

Given the current state of the queue, determine the level of back pressure that should be applied

a back pressure value in nanoseconds

Exception– 

706.9 Changes