Interface PushStream<T>

Type Parameters:
T - The Payload type
All Superinterfaces:
AutoCloseable

@ProviderType public interface PushStream<T> extends AutoCloseable
A Push Stream fulfills the same role as the Java 8 Stream but reverses the direction of control: the Java 8 Stream is pull based, whereas a Push Stream is push based. A Push Stream makes it possible to build a pipeline of transformations using a builder style model. Just like streams, it provides a number of terminating methods that actually open the channel and perform the processing until the channel is closed (the source sends a Close event). The results of the processing are sent to a Promise, as are any error events. A stream can be used multiple times.

The Push Stream represents a pipeline. Upstream is in the direction of the source; downstream is in the direction of the terminating method. Events are sent downstream asynchronously with no guarantee for ordering or concurrency. Methods are available to serialize the events and to split processing across background threads.
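To make the reversed control direction concrete, here is a minimal sketch in plain JDK code. It does not use the PushStream API; the class and method names (PushVersusPull, pullDoubled, push, pushDoubled) are hypothetical and only illustrate who drives the loop in each model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class PushVersusPull {

    // Pull: the consumer drives the loop and asks the source for each value.
    static List<Integer> pullDoubled(Iterator<Integer> source) {
        List<Integer> out = new ArrayList<>();
        while (source.hasNext()) {
            out.add(source.next() * 2);
        }
        return out;
    }

    // Push: the source drives the loop and hands each value to a downstream callback.
    static void push(List<Integer> data, Consumer<Integer> downstream) {
        for (Integer value : data) {
            downstream.accept(value);
        }
    }

    // The same doubling step, expressed as a callback that the source invokes.
    static List<Integer> pushDoubled(List<Integer> data) {
        List<Integer> out = new ArrayList<>();
        push(data, value -> out.add(value * 2));
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3);
        System.out.println(pullDoubled(data.iterator()));
        System.out.println(pushDoubled(data));
    }
}
```

Both methods compute the same result; the difference is only in which side calls the other.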
  • Method Details

    • close

      void close()
      Close this PushStream by sending an event of type PushEvent.EventType.CLOSE downstream. Closing a PushStream is a safe operation that will not throw an Exception.

      Calling close() on a closed PushStream has no effect.

      Specified by:
      close in interface AutoCloseable
    • onClose

      PushStream<T> onClose(Runnable closeHandler)
      Provide a handler that must be run after the PushStream is closed. This handler will run after any downstream operations have processed the terminal event but before any upstream operations have processed the terminal event.
      Parameters:
      closeHandler - Will be called on close
      Returns:
      This stream
    • onError

      PushStream<T> onError(Consumer<? super Throwable> errorHandler)
      Provide a handler that will be called if the PushStream is closed with an event of type PushEvent.EventType.ERROR. The error value from this event will be passed to the callback function after the PushStream is closed. This handler will run after any downstream operations have processed the error event but before any upstream operations have processed the error event.
      Parameters:
      errorHandler - Will be called on an error event
      Returns:
      This stream
    • filter

      PushStream<T> filter(Predicate<? super T> predicate)
      Only pass events downstream when the predicate tests true.
      Parameters:
      predicate - The predicate that is tested (not null)
      Returns:
      Builder style (can be a new or the same object)
    • map

      <R> PushStream<R> map(Function<? super T,? extends R> mapper)
      Map a payload value.
      Parameters:
      mapper - The map function
      Returns:
      Builder style (can be a new or the same object)
    • asyncMap

      <R> PushStream<R> asyncMap(int n, int delay, Function<? super T,Promise<? extends R>> mapper)
      Asynchronously map the payload values. The mapping function returns a Promise representing the asynchronous mapping operation.

      The PushStream limits the number of concurrently running mapping operations, and returns back pressure based on the number of existing queued operations.

      Parameters:
      n - number of simultaneous promises to use
      delay - Number of milliseconds of back pressure to apply per queued promise
      mapper - The mapping function
      Returns:
      Builder style (can be a new or the same object)
      Throws:
      IllegalArgumentException - if the number of simultaneous promises is < 1 or the delay is < 0
      NullPointerException - if the mapper is null
    • flatMap

      <R> PushStream<R> flatMap(Function<? super T,? extends PushStream<? extends R>> mapper)
      Flat map the payload value (turn one event into 0..n events of potentially another type).
      Parameters:
      mapper - The flat map function
      Returns:
      Builder style (can be a new or the same object)
    • distinct

      PushStream<T> distinct()
      Remove any duplicates. Notice that this can be expensive in a large stream since it must track previous payloads.
      Returns:
      Builder style (can be a new or the same object)
    • sorted

      PushStream<T> sorted()
      Sort the elements, assuming that T extends Comparable. This is expensive for large or infinite streams since it requires buffering the stream until close.
      Returns:
      Builder style (can be a new or the same object)
    • sorted

      PushStream<T> sorted(Comparator<? super T> comparator)
      Sort the elements with the given comparator. This is expensive for large or infinite streams since it requires buffering the stream until close.
      Parameters:
      comparator - The comparator used to order the elements
      Returns:
      Builder style (can be a new or the same object)
    • limit

      PushStream<T> limit(long maxSize)
      Automatically close the channel after maxSize elements have been received.
      Parameters:
      maxSize - The maximum number of elements to receive before the channel is closed
      Returns:
      Builder style (can be a new or the same object)
    • limit

      PushStream<T> limit(Duration maxTime)
      Automatically close the channel after the given amount of time has elapsed.
      Parameters:
      maxTime - The maximum time that the stream should remain open
      Returns:
      Builder style (can be a new or the same object)
    • timeout

      PushStream<T> timeout(Duration idleTime)
      Automatically fail the channel if no events are received for the indicated length of time. If the timeout is reached then a failure event containing a TimeoutException will be sent.
      Parameters:
      idleTime - The length of time that the stream should remain open when no events are being received.
      Returns:
      Builder style (can be a new or the same object)
    • skip

      PushStream<T> skip(long n)
      Skip a number of events in the channel.
      Parameters:
      n - number of elements to skip
      Returns:
      Builder style (can be a new or the same object)
      Throws:
      IllegalArgumentException - if the number of events to skip is negative
    • fork

      PushStream<T> fork(int n, int delay, Executor e)
      Execute the downstream events in up to n background threads. If more requests are outstanding, apply back pressure of delay * the number of delayed threads. A downstream channel that is closed or throws an exception will cause all execution to cease and the stream to close.
      Parameters:
      n - number of simultaneous background threads to use
      delay - Number of milliseconds of back pressure to apply per queued thread
      e - an executor to use for the background threads.
      Returns:
      Builder style (can be a new or the same object)
      Throws:
      IllegalArgumentException - if the number of threads is < 1 or the delay is < 0
      NullPointerException - if the Executor is null
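      The back pressure rule described for fork can be read as a small formula. The sketch below is one possible reading of that description, not the actual implementation; the class name, the method name and the outstanding parameter are hypothetical.

```java
public class ForkBackPressureSketch {

    /**
     * Hypothetical helper illustrating "delay * number of delayed threads".
     *
     * @param n           maximum number of simultaneous background threads
     * @param delay       milliseconds of back pressure per queued request
     * @param outstanding number of requests currently outstanding (assumed input)
     * @return the back pressure in milliseconds, or 0 when nothing is queued
     */
    static long backPressure(int n, int delay, int outstanding) {
        // Requests beyond the n running threads are the ones that are delayed.
        int delayed = Math.max(0, outstanding - n);
        return (long) delay * delayed;
    }

    public static void main(String[] args) {
        // Two threads, 10 ms per queued request, five requests outstanding.
        System.out.println(backPressure(2, 10, 5));
    }
}
```

      Under this reading, no back pressure is returned while the number of outstanding requests stays within n.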
    • buffer

      PushStream<T> buffer()
      Buffer the events in a queue using default values for the queue size and other behaviors. Buffered work will be processed asynchronously in the rest of the chain. Buffering also blocks the transmission of back pressure to previous elements in the chain, although back pressure is honored by the buffer.

      Buffers are useful for "bursty" event sources which produce a number of events close together, then none for some time. These bursts can sometimes overwhelm downstream event consumers. Buffering will not, however, protect downstream components from a source which produces events faster than they can be consumed. For fast sources, filter(Predicate), coalesce(int, Function) or fork(int, int, Executor) are better choices.

      Returns:
      Builder style (can be a new or the same object)
    • buildBuffer

      <U extends BlockingQueue<PushEvent<? extends T>>> PushStreamBuilder<T,U> buildBuffer()
      Build a buffer to enqueue events in a queue using custom values for the queue size and other behaviors. Buffered work will be processed asynchronously in the rest of the chain. Buffering also blocks the transmission of back pressure to previous elements in the chain, although back pressure is honored by the buffer.

      Buffers are useful for "bursty" event sources which produce a number of events close together, then none for some time. These bursts can sometimes overwhelm downstream event consumers. Buffering will not, however, protect downstream components from a source which produces events faster than they can be consumed. For fast sources, filter(Predicate), coalesce(int, Function) or fork(int, int, Executor) are better choices.

      Buffers are also useful as "circuit breakers" in the pipeline. If a QueuePolicyOption.FAIL is used then a full buffer will trigger the stream to close, preventing an event storm from reaching the client.

      Returns:
      A builder which can be used to configure the buffer for this pipeline stage.
    • merge

      PushStream<T> merge(PushEventSource<? extends T> source)
      Merge in the events from another source. The resulting channel is not closed until this channel and the channel from the source are closed.
      Parameters:
      source - The source to merge in.
      Returns:
      Builder style (can be a new or the same object)
    • merge

      PushStream<T> merge(PushStream<? extends T> source)
      Merge in the events from another PushStream. The resulting channel is not closed until this channel and the channel from the source are closed.
      Parameters:
      source - The source to merge in.
      Returns:
      Builder style (can be a new or the same object)
    • split

      PushStream<T>[] split(Predicate<? super T>... predicates)
      Split the events to different streams based on a predicate. If the predicate is true, the event is dispatched to that channel on the same position. All predicates are tested for every event.

      This method differs from other methods of PushStream in three significant ways:

      • The return value contains multiple streams.
      • This stream will only close when all of these child streams have closed.
      • Event delivery is made to all open children that accept the event.
      Parameters:
      predicates - the predicates to test
      Returns:
      streams that map to the predicates
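      The dispatch rule for split (every predicate is tested for every event, and an event goes to each channel whose predicate accepts it) can be simulated with plain lists. This is a sketch of the described semantics only; SplitSketch and its split method are hypothetical and do not use the PushStream API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class SplitSketch {

    // Returns one list per predicate, at the same position as that predicate.
    @SafeVarargs
    static <T> List<List<T>> split(List<T> events, Predicate<? super T>... predicates) {
        List<List<T>> channels = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            channels.add(new ArrayList<>());
        }
        for (T event : events) {
            // All predicates are tested, so one event may land in several channels.
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(event)) {
                    channels.get(i).add(event);
                }
            }
        }
        return channels;
    }

    public static void main(String[] args) {
        List<Integer> events = Arrays.asList(1, 2, 3, 4, 6);
        System.out.println(split(events, x -> x % 2 == 0, x -> x % 3 == 0));
    }
}
```

      Note that 6 appears in both result lists and 1 appears in neither, since an event that no predicate accepts is not delivered to any child.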
    • sequential

      PushStream<T> sequential()
      Ensure that any events are delivered sequentially. That is, no overlapping calls downstream. This can be used to turn a forked stream (where for example a heavy conversion is done in multiple threads) back into a sequential stream so a reduce is simple to do.
      Returns:
      Builder style (can be a new or the same object)
    • coalesce

      <R> PushStream<R> coalesce(Function<? super T,Optional<R>> f)
      Coalesces a number of events into a new type of event. The input events are forwarded to an accumulator function. This function returns an Optional. If the Optional is present, its value is sent downstream; otherwise it is ignored.
      Parameters:
      f - The accumulator function, which returns an Optional holding the value to send downstream
      Returns:
      Builder style (can be a new or the same object)
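      An accumulator function of the shape expected by this method might look like the sketch below. It is stateful: it sums incoming integers and only returns a present Optional once a batch is complete. The names CoalesceOptionalSketch and sumEvery are hypothetical, and the synchronization is an assumption made because events may be delivered concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class CoalesceOptionalSketch {

    // Builds an accumulator that emits the sum of every `batch` inputs.
    static Function<Integer, Optional<Integer>> sumEvery(int batch) {
        int[] state = new int[2]; // state[0] = running sum, state[1] = count so far
        return value -> {
            synchronized (state) {
                state[0] += value;
                state[1]++;
                if (state[1] < batch) {
                    return Optional.empty(); // nothing to send downstream yet
                }
                int sum = state[0];
                state[0] = 0;
                state[1] = 0;
                return Optional.of(sum); // batch complete, emit one coalesced value
            }
        };
    }

    public static void main(String[] args) {
        Function<Integer, Optional<Integer>> f = sumEvery(3);
        List<Integer> emitted = new ArrayList<>();
        for (int i = 1; i <= 7; i++) {
            f.apply(i).ifPresent(emitted::add);
        }
        System.out.println(emitted);
    }
}
```

      In this run the seventh input stays inside the accumulator, because its batch never completes.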
    • coalesce

      <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> f)
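      Coalesces a number of events into a new type of event. A fixed number of input events are forwarded to an accumulator function. This function returns new event data to be forwarded on.
      Parameters:
      count - The fixed number of input events to forward to the accumulator function
      f - The accumulator function that turns the collected events into new event data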
      Returns:
      Builder style (can be a new or the same object)
    • coalesce

      <R> PushStream<R> coalesce(IntSupplier count, Function<Collection<T>,R> f)
      Coalesces a number of events into a new type of event. A variable number of input events are forwarded to an accumulator function. The number of events to be forwarded is determined by calling the count function. The accumulator function then returns new event data to be forwarded on.
      Parameters:
      count - The function that supplies the number of input events to forward
      f - The accumulator function that turns the collected events into new event data
      Returns:
      Builder style (can be a new or the same object)
    • window

      <R> PushStream<R> window(Duration d, Function<Collection<T>,R> f)
      Buffers a number of events over a fixed time interval and then forwards the events to an accumulator function. This function returns new event data to be forwarded on. Note that:
      • The collection forwarded to the accumulator function will be empty if no events arrived during the time interval.
      • The accumulator function will be run and the forwarded event delivered as a different task (and therefore potentially on a different thread) from the one that delivered the event to this PushStream.
      • Due to the buffering and asynchronous delivery required, this method prevents the propagation of back-pressure to earlier stages.
      Parameters:
      d - The time interval over which events are buffered
      f - The accumulator function that turns the buffered events into new event data
      Returns:
      Builder style (can be a new or the same object)
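      Because the collection passed to the accumulator may be empty, the accumulator should handle that case explicitly. The sketch below shows one such function using only JDK types; the class name WindowAccumulatorSketch and the constant AVERAGE are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.OptionalDouble;
import java.util.function.Function;

public class WindowAccumulatorSketch {

    // Averages the events of one window; an empty window yields an empty OptionalDouble.
    static final Function<Collection<Integer>, OptionalDouble> AVERAGE =
            events -> events.stream().mapToInt(Integer::intValue).average();

    public static void main(String[] args) {
        System.out.println(AVERAGE.apply(Arrays.asList(2, 4, 6)));
        System.out.println(AVERAGE.apply(Collections.emptyList()).isPresent());
    }
}
```

      Returning an OptionalDouble is a design choice for this sketch; it avoids a division by zero when a window receives no events.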
    • window

      <R> PushStream<R> window(Duration d, Executor executor, Function<Collection<T>,R> f)
      Buffers a number of events over a fixed time interval and then forwards the events to an accumulator function. This function returns new event data to be forwarded on. Note that:
      • The collection forwarded to the accumulator function will be empty if no events arrived during the time interval.
      • The accumulator function will be run and the forwarded event delivered by a task given to the supplied executor.
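      • Due to the buffering and asynchronous delivery required, this method prevents the propagation of back-pressure to earlier stages.
      Parameters:
      d - The time interval over which events are buffered
      executor - The executor used to run the accumulator function and deliver the forwarded event
      f - The accumulator function that turns the buffered events into new event data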
      Returns:
      Builder style (can be a new or the same object)
    • window

      <R> PushStream<R> window(Supplier<Duration> timeSupplier, IntSupplier maxEvents, BiFunction<Long,Collection<T>,R> f)
      Buffers a number of events over a variable time interval and then forwards the events to an accumulator function. The length of time over which events are buffered is determined by the time function. A maximum number of events can also be requested; if this number of events is reached then the accumulator will be called early. The accumulator function returns new event data to be forwarded on. It is also given the length of time for which the buffer accumulated data. This may be less than the requested interval if the buffer reached the maximum number of requested events early. Note that:
      • The collection forwarded to the accumulator function will be empty if no events arrived during the time interval.
      • The accumulator function will be run and the forwarded event delivered as a different task (and therefore potentially on a different thread) from the one that delivered the event to this PushStream.
      • Due to the buffering and asynchronous delivery required, this method prevents the propagation of back-pressure to earlier stages.
      • If the window finishes by hitting the maximum number of events then the remaining time in the window will be applied as back-pressure to the previous stage, attempting to slow the producer to the expected windowing threshold.
      Parameters:
      timeSupplier - The function that supplies the length of time over which events are buffered
      maxEvents - The function that supplies the maximum number of events to buffer before the accumulator is called early
      f - The accumulator function, which receives the time for which the buffer accumulated data and the buffered events
      Returns:
      Builder style (can be a new or the same object)
    • window

      <R> PushStream<R> window(Supplier<Duration> timeSupplier, IntSupplier maxEvents, Executor executor, BiFunction<Long,Collection<T>,R> f)
      Buffers a number of events over a variable time interval and then forwards the events to an accumulator function. The length of time over which events are buffered is determined by the time function. A maximum number of events can also be requested; if this number of events is reached then the accumulator will be called early. The accumulator function returns new event data to be forwarded on. It is also given the length of time for which the buffer accumulated data. This may be less than the requested interval if the buffer reached the maximum number of requested events early. Note that:
      • The collection forwarded to the accumulator function will be empty if no events arrived during the time interval.
      • The accumulator function will be run and the forwarded event delivered as a different task (and therefore potentially on a different thread) from the one that delivered the event to this PushStream.
      • If the window finishes by hitting the maximum number of events then the remaining time in the window will be applied as back-pressure to the previous stage, attempting to slow the producer to the expected windowing threshold.
      Parameters:
      timeSupplier - The function that supplies the length of time over which events are buffered
      maxEvents - The function that supplies the maximum number of events to buffer before the accumulator is called early
      executor - The executor used to run the accumulator function and deliver the forwarded event
      f - The accumulator function, which receives the time for which the buffer accumulated data and the buffered events
      Returns:
      Builder style (can be a new or the same object)
    • adjustBackPressure

      PushStream<T> adjustBackPressure(LongUnaryOperator adjustment)
      Changes the back-pressure propagated by this pipeline stage.

      The supplied function receives the back pressure returned by the next pipeline stage and returns the back pressure that should be returned by this stage. This function will not be called if the previous pipeline stage returns negative back pressure.

      Parameters:
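      adjustment - The function that maps the back pressure returned downstream to the back pressure returned by this stage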
      Returns:
      Builder style (can be a new or the same object)
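      As an illustration, an adjustment function could cap the back pressure that this stage reports. The sketch below only builds and exercises such a LongUnaryOperator; the class name, the capAt helper and the choice of a cap are hypothetical.

```java
import java.util.function.LongUnaryOperator;

public class BackPressureCapSketch {

    // Builds an adjustment that never reports more than `max` back pressure.
    static LongUnaryOperator capAt(long max) {
        return backPressure -> Math.min(backPressure, max);
    }

    public static void main(String[] args) {
        LongUnaryOperator cap = capAt(1000);
        System.out.println(cap.applyAsLong(5000)); // value above the cap is reduced
        System.out.println(cap.applyAsLong(200));  // value below the cap passes through
    }
}
```

      Since the description states that the function is not called for negative back pressure, the sketch makes no attempt to handle negative inputs.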
    • adjustBackPressure

      PushStream<T> adjustBackPressure(ToLongBiFunction<T,Long> adjustment)
      Changes the back-pressure propagated by this pipeline stage.

      The supplied function receives the data object passed to the next pipeline stage and the back pressure that was returned by that stage when accepting it. The function returns the back pressure that should be returned by this stage. This function will not be called if the previous pipeline stage returns negative back pressure.

      Parameters:
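      adjustment - The function that maps the data object and the back pressure returned downstream to the back pressure returned by this stage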
      Returns:
      Builder style (can be a new or the same object)
    • forEach

      Promise<Void> forEach(Consumer<? super T> action)
      Execute the action for each event received until the channel is closed. This is a terminating method; the returned promise is resolved when the channel closes.

      This is a terminal operation

      Parameters:
      action - The action to perform
      Returns:
      A promise that is resolved when the channel closes.
    • toArray

      Promise<Object[]> toArray()
      Collect the payloads in an Object array after the channel is closed. This is a terminating method; the returned promise is resolved when the channel is closed.

      This is a terminal operation

      Returns:
      A promise that is resolved with all the payloads received over the channel
    • toArray

      <A> Promise<A[]> toArray(IntFunction<A[]> generator)
      Collect the payloads in an array after the channel is closed. This is a terminating method; the returned promise is resolved when the channel is closed. The type of the array is determined by the caller through a generator function that receives the length of the desired array.

      This is a terminal operation

      Type Parameters:
      A - The element type of the resulting array.
      Parameters:
      generator - A function which returns an array into which the payloads are stored.
      Returns:
      A promise that is resolved with all the payloads received over the channel. The promise will be failed with an ArrayStoreException if the runtime type of the array returned by the array generator is not a supertype of the runtime type of every payload in the channel.
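      The role of the generator, and the origin of the ArrayStoreException, can be shown with plain JDK code. The fill helper below is hypothetical and only mimics copying payloads into the generated array; it is not how the PushStream implementation is known to work.

```java
import java.util.Arrays;
import java.util.function.IntFunction;

public class ArrayGeneratorSketch {

    // Asks the generator for an array of the right length, then stores the payloads in it.
    static <A> A[] fill(Object[] payloads, IntFunction<A[]> generator) {
        A[] target = generator.apply(payloads.length);
        // System.arraycopy throws ArrayStoreException if a payload does not fit the array type.
        System.arraycopy(payloads, 0, target, 0, payloads.length);
        return target;
    }

    public static void main(String[] args) {
        String[] ok = fill(new Object[] {"a", "b"}, String[]::new);
        System.out.println(Arrays.toString(ok));
        try {
            fill(new Object[] {"a", 1}, String[]::new);
        } catch (ArrayStoreException e) {
            System.out.println("payload does not fit a String[]");
        }
    }
}
```

      A method reference such as String[]::new is the usual way to supply the generator.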
    • reduce

      Promise<T> reduce(T identity, BinaryOperator<T> accumulator)
      Standard reduce, see Stream. The returned promise will be resolved when the channel closes.

      This is a terminal operation

      Parameters:
      identity - The identity/begin value
      accumulator - The accumulator
      Returns:
      A promise that is resolved with the result of the reduction when the channel closes
    • reduce

      Promise<Optional<T>> reduce(BinaryOperator<T> accumulator)
      Standard reduce without identity, so the return is an Optional. The returned promise will be resolved when the channel closes.

      This is a terminal operation

      Parameters:
      accumulator - The accumulator
      Returns:
      A promise that is resolved with an Optional holding the result of the reduction
    • reduce

      <U> Promise<U> reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)
      Standard reduce with identity, accumulator and combiner. The returned promise will be resolved when the channel closes.

      This is a terminal operation

      Parameters:
      identity - The identity/begin value
      accumulator - The accumulator that folds each payload into the running result
      combiner - combines two U's into one U (for example, combine two lists)
      Returns:
      The promise
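      Since the three reduce methods refer to Stream, their accumulation rules can be previewed with java.util.stream.Stream. The sketch below is an analogy only: it returns values directly, whereas the PushStream methods return a Promise that resolves when the channel closes. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class ReduceAnalogy {

    // reduce(identity, accumulator): always yields a value, the identity for no input.
    static int sum(List<Integer> values) {
        return values.stream().reduce(0, Integer::sum);
    }

    // reduce(accumulator): yields an empty Optional when there is no input.
    static Optional<Integer> largest(List<Integer> values) {
        return values.stream().reduce(Integer::max);
    }

    // reduce(identity, accumulator, combiner): the result type differs from the payload type.
    static int totalLength(List<String> words) {
        return words.stream().reduce(0, (n, w) -> n + w.length(), Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, 2, 3, 4)));
        System.out.println(largest(Arrays.asList(3, 9, 4)));
        System.out.println(totalLength(Arrays.asList("a", "bb", "ccc")));
    }
}
```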
    • collect

      <R, A> Promise<R> collect(Collector<? super T,A,R> collector)
      See Stream. Will resolve once the channel closes.

      This is a terminal operation

      Parameters:
      collector - The collector used to accumulate the payloads
      Returns:
      A Promise representing the collected results
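      The collector argument is a standard java.util.stream.Collector, so the collectors from java.util.stream.Collectors can be tried out on an ordinary Stream first. The sketch below does exactly that as an analogy; the class and method names are hypothetical, and with a PushStream the result would arrive through the returned Promise instead.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CollectAnalogy {

    // Joins all payloads into one comma separated string.
    static String join(List<String> payloads) {
        return payloads.stream().collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        System.out.println(join(Arrays.asList("a", "b", "c")));
    }
}
```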
    • min

      Promise<Optional<T>> min(Comparator<? super T> comparator)
      See Stream. Will resolve once the channel closes.

      This is a terminal operation

      Parameters:
      comparator - The comparator used to compare the payloads
      Returns:
      A Promise representing the minimum value, or an empty Optional if no values are seen before the end of the stream
    • max

      Promise<Optional<T>> max(Comparator<? super T> comparator)
      See Stream. Will resolve once the channel closes.

      This is a terminal operation

      Parameters:
      comparator - The comparator used to compare the payloads
      Returns:
      A Promise representing the maximum value, or an empty Optional if no values are seen before the end of the stream
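      As min and max refer to Stream, the comparator handling can be previewed on a java.util.stream.Stream. This is an analogy with hypothetical names, not PushStream code; it also shows the Optional result for an input without values.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class MinMaxAnalogy {

    private static final Comparator<String> BY_LENGTH = Comparator.comparingInt(String::length);

    static Optional<String> shortest(List<String> words) {
        return words.stream().min(BY_LENGTH);
    }

    static Optional<String> longest(List<String> words) {
        return words.stream().max(BY_LENGTH);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("pear", "fig", "banana");
        System.out.println(shortest(words));
        System.out.println(longest(words));
        System.out.println(shortest(Collections.emptyList()).isPresent());
    }
}
```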
    • count

      Promise<Long> count()
      See Stream. Will resolve once the channel closes.

      This is a terminal operation

      Returns:
      A Promise representing the number of values in the stream
    • anyMatch

      Promise<Boolean> anyMatch(Predicate<? super T> predicate)
      Close the channel and resolve the promise with true when the predicate matches a payload. If the channel is closed before the predicate matches, the promise is resolved with false.

      This is a short circuiting terminal operation

      Parameters:
      predicate - The predicate to test against each payload
      Returns:
      A Promise that will resolve when an event matches the predicate, or the end of the stream is reached
    • allMatch

      Promise<Boolean> allMatch(Predicate<? super T> predicate)
      Close the channel and resolve the promise with false when the predicate does not match a payload. If the channel is closed before that happens, the promise is resolved with true.

      This is a short circuiting terminal operation

      Parameters:
      predicate - The predicate to test against each payload
      Returns:
      A Promise that will resolve when an event fails to match the predicate, or the end of the stream is reached
    • noneMatch

      Promise<Boolean> noneMatch(Predicate<? super T> predicate)
      Close the channel and resolve the promise with false when the predicate matches any payload. If the channel is closed before that happens, the promise is resolved with true.

      This is a short circuiting terminal operation

      Parameters:
      predicate - The predicate to test against each payload
      Returns:
      A Promise that will resolve when an event matches the predicate, or the end of the stream is reached
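      The results described for anyMatch, allMatch and noneMatch, including the value produced when no payload decides the outcome, mirror the methods of the same name on java.util.stream.Stream. The sketch below uses that analogy with hypothetical names; it does not demonstrate the channel closing behavior of a PushStream.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MatchAnalogy {

    static boolean anyNegative(List<Integer> values) {
        return values.stream().anyMatch(x -> x < 0);
    }

    static boolean allPositive(List<Integer> values) {
        return values.stream().allMatch(x -> x > 0);
    }

    static boolean noneZero(List<Integer> values) {
        return values.stream().noneMatch(x -> x == 0);
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(3, -1, 4);
        System.out.println(anyNegative(values));
        System.out.println(allPositive(values));
        System.out.println(noneZero(values));
        // With no payloads at all, anyMatch is false while allMatch and noneMatch are true.
        System.out.println(anyNegative(Collections.emptyList()));
    }
}
```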
    • findFirst

      Promise<Optional<T>> findFirst()
      Close the channel and resolve the promise with the first element. If the channel is closed before, the Optional will have no value.
      Returns:
      a promise
    • findAny

      Promise<Optional<T>> findAny()
      Close the channel and resolve the promise with the first element. If the channel is closed before, the Optional will have no value.

      This is a terminal operation

      Returns:
      a promise
    • forEachEvent

      Promise<Long> forEachEvent(PushEventConsumer<? super T> action)
      Pass on each event to another consumer until the stream is closed.

      This is a terminal operation

      Parameters:
      action - The consumer that receives each event
      Returns:
      a promise