Untangling Reactive Streams' Gordian Knot, Wiring Subscriber, Publisher and Subscription

By Rebecca Searls | November 23, 2020

At first glance the Reactive Streams 1.0 Specification and its four short interfaces look straightforward to learn, but libraries that implement Reactive Streams must provide a lot of infrastructure to make them work in the manner intended by the specification. These additions add a layer of complexity to the learning process. This article strips away some of that complexity and looks at the bare-bones wiring of a Subscriber to a Publisher and Subscription. I will be using the RxJava version 3.0 implementation for this discussion.

The demo code provided here can be found in the GitHub project untangling-reactive-streams.

From the Reactive Streams Specification, here are the three interfaces of interest.

      public interface Publisher<T> {
          public void subscribe(Subscriber<? super T> s);
      }

      public interface Subscriber<T> {
          public void onSubscribe(Subscription s);
          public void onNext(T t);
          public void onError(Throwable t);
          public void onComplete();
      }

      public interface Subscription {
          public void request(long n);
          public void cancel();
      }

A publisher object provides an “unbounded sequence of elements”. The purpose of the Publisher interface is to provide a means for a subscriber to register itself with the object. A publisher requires a means to pass data to the subscriber, notify it of errors and notify it when a terminal state has been reached. To that end, the Subscriber interface provides methods, onNext, onError and onComplete respectively to be called by the publisher.

A subscription is issued by a publisher to a subscriber as part of the registration process. It represents a one-to-one relationship between the publisher and subscriber. In a reactive system the publisher can send data faster than the subscriber can handle it. Subscription provides the means for data flow regulation to be communicated from the subscriber to the publisher. The subscriber can adjust the flow rate during the life of the subscription. The subscriber also uses the Subscription object to cancel the subscription with the publisher.
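To make the two roles of Subscription concrete, here is a minimal sketch of a subscriber that requests some items and then cancels early. It is written against the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones, so that it runs without extra dependencies. The class names CancelSketch, CountingPublisher and LimitedSubscriber are hypothetical, and the publisher is a single-threaded simplification that is not meant to satisfy every rule in the specification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class CancelSketch {

    /** Hypothetical synchronous publisher that counts upward from 1 until cancelled. */
    static final class CountingPublisher implements Flow.Publisher<Integer> {
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean cancelled;

                @Override
                public void request(long n) {
                    // Emit at most n items for this request, stopping early once cancelled.
                    for (long i = 0; i < n && !cancelled; i++) {
                        subscriber.onNext(next++);
                    }
                }

                @Override
                public void cancel() {
                    cancelled = true;
                }
            });
        }
    }

    /** Collects items and cancels the subscription once it has seen `limit` of them. */
    static final class LimitedSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        private final int limit;
        private Flow.Subscription subscription;

        LimitedSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(5);                 // flow regulation: ask for up to 5 items
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() == limit) {
                subscription.cancel();    // cancellation: no more items wanted
            }
        }

        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    /** Subscribes a LimitedSubscriber and returns what it received. */
    static List<Integer> run(int limit) {
        LimitedSubscriber s = new LimitedSubscriber(limit);
        new CountingPublisher().subscribe(s);
        return s.received;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

Calling run(3) requests five items but cancels after the third, so the publisher stops before the request is exhausted.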

Here is a very simple example that shows how these components are wired together. There is synchronous data flow between the publisher and subscriber to keep it simple. The subscriber will toggle the flow rate between 1 and 2 items at a time during the run. Data flow will terminate normally (i.e. onComplete will be called). I will point out what I think are the essential rules from the specification that pertain to the basic wiring of these components. There are many more rules to follow when fully implementing these interfaces, but those are better addressed in other articles. I am using POJOs rather than lambdas because I think it is easier to see what is occurring.

NewsPublisher is a fictitious clearing house for all types of publications. It sends the publications to the subscriber as it receives them. For this example there is a fixed set of publication titles. A Stack is used to mimic data streaming.

The Reactive Streams Specification mandates that the first thing Publisher.subscribe must do is call Subscriber.onSubscribe (Publisher rule 9). This allows the subscriber to perform any initialization it needs before receiving data. This initialization includes the subscriber calling Subscription.request to set the data flow rate. In this example, once the subscriber has completed its initialization the publisher starts sending data. The while loop in method sendData checks the flow rate designated by the subscriber. The for loop sends the number of data items requested.

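import java.util.List;
import java.util.Stack;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class NewsPublisher implements Publisher<String> {
    private Subscriber<? super String> subscriber;
    private NewsSubscription subscription;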

    private List<String> articles = List.of(
            "NASA Notables",
            "The Gardian",
            "Soccer Weekly",
            "Better Farming",
            "Fine Home Building",
            "Consumer Report");
    private Stack<String> stack = null;

    public void subscribe(Subscriber<? super String> s) {
        // register Subscriber
        subscriber = s;

        // Issue subscription and register it with subscriber
        subscription = new NewsSubscription();
        s.onSubscribe(subscription);

        // setup and start the data flow
        stack = new Stack<>();
        articles.forEach(a -> stack.push(a));
        sendData();
    }

    private void sendData() {

        if (stack.empty()) {
            subscriber.onError(new RuntimeException("no news articles") );
            return;
        }

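        // Demo simplification: getCnt() returns the most recent request size,
        // not the remaining demand, so this loop relies on the subscriber
        // calling request again from onNext.
        while(true) {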
            long cnt = subscription.getCnt();

            if (stack.empty()) {
                subscriber.onComplete();
                return;
            } else {
                for (; cnt > 0; cnt--) {
                    subscriber.onNext(stack.pop());

                    if (stack.empty()) {
                        subscriber.onComplete();
                        return;
                    }
                }
            }
        }
    }
}

NewsSubscriber receives the data but does nothing with it. It toggles the data flow between 1 and 2 items. Each interface method reports when it is called, so we can see the call order between the publisher and subscriber.

Subscriber rule 1 in the specification states,

A Subscriber MUST signal demand via Subscription.request(long n) to receive 
onNext signals.  

It is the responsibility of the subscriber to tell the publisher how much data it can initially handle. It is recommended that the subscriber request the upper limit of what it is able to process. Because method request takes an input parameter of type long, the largest value that can be specified is Long.MAX_VALUE. I am using a value of 1 here, but in the real world a value of 1 would make for very inefficient processing.

The specification does not require Subscription.request to be called from method onSubscribe but it seems the logical place to declare the initial flow rate, because the Publisher can start sending data upon return from Subscriber.onSubscribe.

This example toggles the data flow rate in method onNext. The value is toggled once the previous number of requested data are received.

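import java.util.ArrayList;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class NewsSubscriber implements Subscriber<String> {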
    private Subscription subscription;
    private ArrayList<String> articleList = new ArrayList<>();
    private long r = 1;

    @Override
    public void onSubscribe(Subscription s) {
        System.out.println("Subscriber received Subscription object: "
                + s.getClass().getSimpleName());
        subscription = s;
        subscription.request(r);
    }

    @Override
    public void onNext(String t) {
        System.out.println("r=" + r + "  Subscriber received data: " + t);
        articleList.add(t);
        --r;
        if(r == 0) {
            // toggle article flow between 1 and 2 articles
            r = (articleList.size() % 2) + 1;
            subscription.request(r);
        }
    }

    @Override
    public void onError(Throwable t) {
        System.out.println("Subscriber received Error notification. Msg:" +t);
    }

    @Override
    public void onComplete() {
        System.out.println("Subscriber notified Publication Compete");
    }
}

In terms of wiring the components, there is nothing of significance the user must be aware of in a Subscription. It is just a means of conveying a value from a subscriber to a publisher. There are, of course, specification rules for its implementation.

import org.reactivestreams.Subscription;

public class NewsSubscription implements Subscription {
    private long cnt = 0;

    @Override
    public void request(long n) {
        cnt = n;
    }

    @Override
    public void cancel() {
    }

    public long getCnt() {
        return cnt;
    }
}
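As one illustration of those implementation rules, the sketch below shows a subscription that adds each request to the outstanding demand instead of overwriting it, caps the total at Long.MAX_VALUE, flags non-positive requests (rule 3.9) and ignores requests after cancellation (rule 3.6). It uses the JDK's java.util.concurrent.Flow.Subscription so it is self-contained. The class name AccumulatingSubscription and the helper methods tryConsume, demand, isCancelled and error are hypothetical, and the sketch is single-threaded. It only records the rule 3.9 error; a real publisher would still have to deliver it through onError.

```java
import java.util.concurrent.Flow;

/** Sketch of a Subscription that accumulates demand instead of overwriting it. */
final class AccumulatingSubscription implements Flow.Subscription {
    private long demand;          // requested items not yet delivered
    private boolean cancelled;
    private Throwable error;      // recorded when request() receives n <= 0

    @Override
    public void request(long n) {
        if (cancelled) {
            return;               // requests after cancel are no-ops
        }
        if (n <= 0) {
            // Non-positive requests are an error; the publisher should pass this to onError.
            error = new IllegalArgumentException("n must be > 0 but was " + n);
            return;
        }
        long sum = demand + n;
        // On overflow, cap at Long.MAX_VALUE, which is treated as "effectively unbounded".
        demand = (sum < 0) ? Long.MAX_VALUE : sum;
    }

    @Override
    public void cancel() {
        cancelled = true;
        demand = 0;
    }

    /** Called by the publisher before each onNext; true means one item may be sent. */
    boolean tryConsume() {
        if (cancelled || demand == 0) {
            return false;
        }
        if (demand != Long.MAX_VALUE) {
            demand--;             // unbounded demand is never decremented
        }
        return true;
    }

    long demand() { return demand; }
    boolean isCancelled() { return cancelled; }
    Throwable error() { return error; }
}
```

A publisher built on this would call tryConsume() before every onNext, which removes the need for the subscriber to re-request at exactly the right moment.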

To summarize, the steps in wiring a Publisher, Subscriber and Subscription are:

  • The Subscriber registers itself with the Publisher.
  • The Publisher retains a reference to the Subscriber.
  • The Publisher creates a Subscription and registers it with the Subscriber.
  • The Subscriber retains a reference to the Subscription.
  • The Subscriber sets the initial dataflow value in Subscription.request.
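The five steps above can be traced with a small sketch that logs each step as it happens. It uses the JDK's java.util.concurrent.Flow interfaces rather than the article's classes so that it is self-contained. The class name WiringOrderSketch and the log messages are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class WiringOrderSketch {

    /** Performs the wiring once and returns the steps in the order they ran. */
    static List<String> wire() {
        List<String> log = new ArrayList<>();

        Flow.Publisher<String> publisher = new Flow.Publisher<>() {
            private Flow.Subscriber<? super String> retained;

            @Override
            public void subscribe(Flow.Subscriber<? super String> s) {
                retained = s;
                log.add("2 publisher retains subscriber");
                log.add("3 publisher issues subscription");
                retained.onSubscribe(new Flow.Subscription() {
                    @Override
                    public void request(long n) {
                        log.add("5 subscriber requests " + n);
                    }

                    @Override
                    public void cancel() { }
                });
            }
        };

        Flow.Subscriber<String> mySubscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("4 subscriber retains subscription");
                subscription.request(1);
            }

            @Override public void onNext(String item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };

        log.add("1 subscriber registers with publisher");
        publisher.subscribe(mySubscriber);
        return log;
    }

    public static void main(String[] args) {
        wire().forEach(System.out::println);
    }
}
```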

Here is the wiring of the example code in Main.pojoStyle().

        NewsPublisher publisher = new NewsPublisher();
        NewsSubscriber subscriber = new NewsSubscriber();
        publisher.subscribe(subscriber);

This is the result of running this example. The name of the Subscription object is listed when Subscriber.onSubscribe is called. The value “r” is the subscriber's count of requested items not yet received, printed as each item arrives.

        -- pojoStyle --
        Subscriber received Subscription object: NewsSubscription
        r=1  Subscriber received data: Consumer Report
        r=2  Subscriber received data: Fine Home Building
        r=1  Subscriber received data: Better Farming
        r=2  Subscriber received data: Soccer Weekly
        r=1  Subscriber received data: The Gardian
        r=2  Subscriber received data: NASA Notables
        Subscriber notified Publication Compete

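Here is the more common inline form, written as an anonymous class (Subscriber declares four methods, so it cannot be written as an actual lambda). It can be found in Main.lambdaStyle(). Note that I have removed the code that toggles the flow rate and set the request size to Long.MAX_VALUE.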

        NewsPublisher publisher = new NewsPublisher();
        publisher.subscribe(new Subscriber <>(){
            ArrayList<String> articleList = new ArrayList<>();
            private Subscription subscription;
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("Subscriber received Subscription object: "
                        + s.getClass().getSimpleName());
                subscription = s;
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String t) {
                System.out.println("Subscriber received data: " + t);
                articleList.add(t);
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("Subscriber received Error notification. Msg:" +t);
            }

            @Override
            public void onComplete() {
                System.out.println("Subscriber notified Publication Compete");
            }
        });

The result of running this code is as follows.

        -- lambdaStyle --
        Subscriber received Subscription object: NewsSubscription
        Subscriber received data: Consumer Report
        Subscriber received data: Fine Home Building
        Subscriber received data: Better Farming
        Subscriber received data: Soccer Weekly
        Subscriber received data: The Gardian
        Subscriber received data: NASA Notables
        Subscriber notified Publication Compete

Let’s see how this example is implemented using the RxJava library.

RxJava is a Java VM implementation of Reactive Extensions (ReactiveX), a library for composing asynchronous and event-based programs. It builds on the Reactive Streams API and concepts. The RxJava library provides a base set of reactive classes, Observable, Flowable, Completable, Maybe, and Single, that play the publisher role. Each of these classes provides factory methods, intermediate operators and the ability to consume reactive dataflows.

Flowable is the only implementation in the set that adheres fully to the Reactive Streams 1.0 Specification. The other classes don’t directly implement the Publisher interface or other Reactive Streams interfaces, but they do define analogous interfaces that are in keeping with those in the specification.

RxJava introduces an interface I think is of particular interest in this discussion, Emitter. The Emitter is the interface through which an external data source is connected to a Publisher. The definition of Emitter is nearly identical to the Subscriber interface; it lacks only the onSubscribe method. Having identical method names and nearly identical method signatures was quite confusing initially. There are many implementations of this interface in RxJava, each implementing a different dataflow control strategy.

    public interface Emitter<T> {
        void onNext(@NonNull T value);
        void onError(@NonNull Throwable error);
        void onComplete();
    }

Because Flowable adheres to the Reactive Streams Specification, I will use it for the examples. Two classes are available to create a Flowable object, FlowableCreate and Flowable. We will first look at FlowableCreate. It is a subclass of Flowable and performs the actual wiring of a subscriber to a publisher and a publisher to an external data source. (FlowableCreate lives in one of RxJava's internal packages; I use it directly here only to expose the wiring.)

The constructor for FlowableCreate requires two input parameters, FlowableOnSubscribe and BackpressureStrategy. FlowableOnSubscribe is an RxJava publisher-like type. Its interface definition is similar to Reactive Streams’ Publisher: it has a single method, subscribe, but the input parameter to subscribe is a type of Emitter, not a Subscriber. This interface provides the means for external data to be passed to the provided Emitter for handling.

    public interface FlowableOnSubscribe<T> {
        void subscribe(@NonNull FlowableEmitter<T> emitter) throws Exception;
    }

I will create a Flowable object whose emitter transmits the same data we used in NewsPublisher. Below is my code for that. The implementation of FlowableOnSubscribe.subscribe (lines 90-94) will pass the list of news articles to an emitter provided by FlowableCreate. (We will look at this in more detail shortly.) Input parameter BackpressureStrategy (line 96) is an enum. There are 5 strategy types. I am choosing BackpressureStrategy.BUFFER for no particular reason.

To wire my subscriber directly to FlowableCreate, I call method subscribeActual rather than method subscribe. Method subscribeActual is declared abstract in Flowable and implemented in FlowableCreate, which you will remember is a subclass of Flowable. When Flowable.subscribe is called, it in turn calls FlowableCreate.subscribeActual, so both entry points share the same code for component wiring. At line 98 subscribeActual is called and my subscriber, NewsSubscriber, is passed to it.

   88     FlowableCreate<String> flowableC = new FlowableCreate<>(new FlowableOnSubscribe<String>() {
   89         @Override
   90         public void subscribe(FlowableEmitter<String> emitter) throws Exception {
   91             for (int i = articles.size()-1; i >= 0; i--) {
   92                 emitter.onNext(articles.get(i));
   93             }
   94             emitter.onComplete();
   95         }
   96     }, BackpressureStrategy.BUFFER);
   97
   98     flowableC.subscribeActual(new NewsSubscriber());
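The relationship between subscribe and subscribeActual described above is the template-method pattern: a final method in the base class does the shared work and then delegates to an abstract hook. Here is a minimal sketch of that shape using the JDK's java.util.concurrent.Flow interfaces. The classes TemplatePublisher, SingleItemPublisher and CollectingSubscriber are hypothetical and are not RxJava code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;

/** Hypothetical base class: subscribe() does shared checks, subclasses do the wiring. */
abstract class TemplatePublisher<T> implements Flow.Publisher<T> {
    final List<String> calls = new ArrayList<>();

    @Override
    public final void subscribe(Flow.Subscriber<? super T> s) {
        Objects.requireNonNull(s, "s is null");
        calls.add("subscribe");
        subscribeActual(s);       // delegate the real wiring to the subclass
    }

    protected abstract void subscribeActual(Flow.Subscriber<? super T> s);
}

/** Hypothetical subclass that emits one item and completes on the first request. */
final class SingleItemPublisher extends TemplatePublisher<String> {
    @Override
    protected void subscribeActual(Flow.Subscriber<? super String> s) {
        calls.add("subscribeActual");
        s.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (!done && n > 0) {
                    done = true;
                    s.onNext("hello");
                    s.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

/** Records the signals it receives. */
final class CollectingSubscriber implements Flow.Subscriber<String> {
    final List<String> events = new ArrayList<>();

    @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
    @Override public void onNext(String item) { events.add("onNext:" + item); }
    @Override public void onError(Throwable t) { events.add("onError"); }
    @Override public void onComplete() { events.add("onComplete"); }
}
```

Calling subscribe on a SingleItemPublisher records "subscribe" and then "subscribeActual", which is the same call order the article describes for Flowable and FlowableCreate.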

Let’s examine the code in FlowableCreate’s constructor and the subscribeActual method.

    32  public final class FlowableCreate<T> extends Flowable<T> {
    33
    34    final FlowableOnSubscribe<T> source;
    35    final BackpressureStrategy backpressure;
    36
    37    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
    38        this.source = source;
    39        this.backpressure = backpressure;
    40    }
    41
    42    @Override
    43    public void subscribeActual(Subscriber<? super T> t) {
    44        BaseEmitter<T> emitter;
    45
    46        switch (backpressure) {
    47            case MISSING: {
    48                emitter = new MissingEmitter<T>(t);
    49                break;
    50            }
    51            case ERROR: {
    52                emitter = new ErrorAsyncEmitter<T>(t);
    53                break;
    54            }
    55            case DROP: {
    56                emitter = new DropAsyncEmitter<T>(t);
    57                break;
    58            }
    59            case LATEST: {
    60                emitter = new LatestAsyncEmitter<T>(t);
    61                break;
    62            }
    63            default: {
    64                emitter = new BufferAsyncEmitter<T>(t, bufferSize());
    65                break;
    66            }
    67        }
    68
    69        t.onSubscribe(emitter);
    70        try {
    71            source.subscribe(emitter);
    72        } catch (Throwable ex) {
    73            Exceptions.throwIfFatal(ex);
    74            emitter.onError(ex);
    75        }
    76    }

In the constructor, my implementation of interface FlowableOnSubscribe is saved in variable source. The value of BackpressureStrategy is saved in variable backpressure.

In method subscribeActual you will see the emitter classes that represent the 5 backpressure strategies supported by Flowable (lines 46-64). Their implementations reside in this class if you care to take a look. An emitter is always allocated. BUFFER is handled by the default branch of the switch, and class BufferAsyncEmitter implements that strategy. Each of these emitter classes implements the Reactive Streams’ Subscription interface.

In my call to subscribeActual, NewsSubscriber is referenced via variable t. My NewsSubscriber object is registered with the subscription, BufferAsyncEmitter, at line 64. BufferAsyncEmitter is registered with NewsSubscriber at line 69 when t.onSubscribe(emitter) is called. The final wiring step, at line 71, is to connect the emitter (BufferAsyncEmitter) with the external data source, source (FlowableOnSubscribe), in the call to source.subscribe(emitter).

If you recall, I mentioned earlier that when Subscriber.onSubscribe is called, part of its initialization is to call Subscription.request to set the data flow rate. NewsSubscriber, t, does that; however, BufferAsyncEmitter is not yet connected to a data source, so there is no data to transmit upon return from onSubscribe. It is not until line 71, when the emitter (BufferAsyncEmitter) is passed to source.subscribe (FlowableOnSubscribe), that external data is provided to the emitter and it starts sending the data to NewsSubscriber.
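To illustrate the ordering just described (demand is recorded first, data arrives later), here is a rough, single-threaded sketch of an object that is a Subscription to the subscriber and an emitter to the data source. It is not RxJava's BufferAsyncEmitter, which also handles concurrency and errors. The names BufferingEmitterSketch and BufferingDemo are hypothetical, and the JDK's java.util.concurrent.Flow interfaces stand in for the Reactive Streams ones.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

/** Hypothetical, single-threaded stand-in for a buffering emitter. */
final class BufferingEmitterSketch<T> implements Flow.Subscription {
    private final Flow.Subscriber<? super T> downstream;
    private final Queue<T> buffer = new ArrayDeque<>();
    private long demand;
    private boolean draining, sourceDone, terminated, cancelled;

    BufferingEmitterSketch(Flow.Subscriber<? super T> downstream) {
        this.downstream = downstream;
    }

    // Subscription side: called by the subscriber.
    @Override
    public void request(long n) {
        if (n <= 0 || cancelled) return;   // sketch only: the spec requires onError for n <= 0
        long sum = demand + n;
        demand = (sum < 0) ? Long.MAX_VALUE : sum;
        drain();
    }

    @Override
    public void cancel() {
        cancelled = true;
        buffer.clear();
    }

    // Emitter side: called by the data source.
    void onNext(T item) {
        if (cancelled || sourceDone) return;
        buffer.add(item);
        drain();
    }

    void onComplete() {
        sourceDone = true;
        drain();
    }

    /** Delivers buffered items while demand remains, then completes once the buffer is empty. */
    private void drain() {
        if (draining) return;              // reentrant call: the outer loop sees the new state
        draining = true;
        try {
            while (!cancelled && demand > 0 && !buffer.isEmpty()) {
                if (demand != Long.MAX_VALUE) demand--;
                downstream.onNext(buffer.poll());
            }
            if (!cancelled && !terminated && sourceDone && buffer.isEmpty()) {
                terminated = true;
                downstream.onComplete();
            }
        } finally {
            draining = false;
        }
    }
}

final class BufferingDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { events.add("onNext:" + item); }
            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        };
        BufferingEmitterSketch<String> emitter = new BufferingEmitterSketch<>(subscriber);
        subscriber.onSubscribe(emitter);   // demand of 1 is recorded; nothing to send yet
        events.add("source connected");
        emitter.onNext("a");               // delivered: one item was requested
        emitter.onNext("b");               // buffered: no outstanding demand
        emitter.onComplete();              // completion waits behind the buffered item
        events.add("more demand");
        emitter.request(1);                // releases "b", then onComplete
        return events;
    }
}
```

In BufferingDemo.run() the request made inside onSubscribe produces no output by itself; the first item appears only after the source starts pushing, which mirrors the gap between lines 69 and 71 of subscribeActual.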

Here is the output from the example code in Main.flowableCreateStyle(). I am using NewsSubscriber, which throttles the flow rate between 1 and 2. As I would expect, the output is the same as for pojoStyle except that the subscription object is BufferAsyncEmitter and not NewsSubscription.

        -- flowableCreateStyle --
        Subscriber received Subscription object: BufferAsyncEmitter
        r=1  Subscriber received data: Consumer Report
        r=2  Subscriber received data: Fine Home Building
        r=1  Subscriber received data: Better Farming
        r=2  Subscriber received data: Soccer Weekly
        r=1  Subscriber received data: The Gardian
        r=2  Subscriber received data: NASA Notables
        Subscriber notified Publication Compete

Now let’s look at an example that creates a Flowable using Flowable.create. Method Flowable.create is a static method requiring the same input parameters as the FlowableCreate constructor. As you can see below, create is a wrapper method whose only purpose is to create and return a FlowableCreate object (line 1813).

RxJavaPlugins is a utility class that enables new handlers to be injected into some operations, such as onAssembly and onSubscribe. I am not using this feature, so an unaltered FlowableCreate object is returned by this method.

   1810     public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
   1811         ObjectHelper.requireNonNull(source, "source is null");
   1812         ObjectHelper.requireNonNull(mode, "mode is null");
   1813         return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
   1814     }
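The idea behind an assembly hook can be sketched in a few lines: a static, replaceable function that returns its argument untouched unless a handler has been installed. The code below is a hypothetical miniature and not RxJavaPlugins itself. AssemblyHookSketch, Source and setHook are made-up names, and Source merely stands in for a Flowable.

```java
import java.util.function.UnaryOperator;

/** Hypothetical miniature of an assembly hook; not RxJava's actual plugin code. */
final class AssemblyHookSketch {

    /** Stand-in for a Flowable; it only carries a label. */
    static final class Source {
        final String label;

        Source(String label) {
            this.label = label;
        }
    }

    // null means "no hook installed", in which case onAssembly is a pass-through.
    private static volatile UnaryOperator<Source> hook;

    static void setHook(UnaryOperator<Source> newHook) {
        hook = newHook;
    }

    /** Returns the source unchanged unless a hook is installed to replace or wrap it. */
    static Source onAssembly(Source source) {
        UnaryOperator<Source> h = hook;
        return (h == null) ? source : h.apply(source);
    }
}
```

With no hook installed, onAssembly returns the very same object it was given, which is the situation in this article's examples.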

Here is the code that creates a Flowable object whose emitter transmits the same data we used in NewsPublisher. The implementation of FlowableOnSubscribe is identical to that used in FlowableCreate.

   113     Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
   114         @Override
   115         public void subscribe(FlowableEmitter<String> emitter) throws Exception {
   116             for (int i = articles.size()-1; i >= 0; i--) {
   117                 emitter.onNext(articles.get(i));
   118             }
   119             emitter.onComplete();
   120         }
   121     }, BackpressureStrategy.BUFFER);

Next I call flowable.subscribe. Flowable has many overloaded methods for subscribe. I will leave it to you to give them a look. I am using the one associated with the Reactive Streams’ Publisher interface so I must cast flowable to type Publisher for the desired method to be called. I am using an anonymous class here just for variety. I could have used class NewsSubscriber just as easily.

   123     ((Publisher<String>)flowable).subscribe(new Subscriber <>(){
   124         ArrayList<String> articleList = new ArrayList<>();
   125         private Subscription subscription;
   126         @Override
   127         public void onSubscribe(Subscription s) {
   128             System.out.println("Subscriber received Subscription object: "
   129                     + s.getClass().getSimpleName());
   130             subscription = s;
   131             subscription.request(Long.MAX_VALUE);
   132         }
   133
   134         @Override
   135         public void onNext(String t) {
   136             System.out.println("Subscriber received data: " + t);
   137             articleList.add(t);
   138         }
   139
   140         @Override
   141         public void onError(Throwable t) {
   142             System.out.println("Subscriber received Error notification. Msg:" +t);
   143         }
   144
   145         @Override
   146         public void onComplete() {
   147             System.out.println("Subscriber notified Publication Compete");
   148         }
   149     });

As I noted previously, method Flowable.subscribe calls FlowableCreate.subscribeActual. Here is the result I get when I run the example code in Main.flowableStyle().

        -- flowableStyle --
        Subscriber received Subscription object: StrictSubscriber
        Subscriber received data: Consumer Report
        Subscriber received data: Fine Home Building
        Subscriber received data: Better Farming
        Subscriber received data: Soccer Weekly
        Subscriber received data: The Gardian
        Subscriber received data: NASA Notables
        Subscriber notified Publication Compete

Hmm, why is the subscription object StrictSubscriber and not BufferAsyncEmitter? Where does StrictSubscriber come from and what does it do?

Below is the code for the Flowable.subscribe method my code calls. At line 14755 my subscriber, s, is wrapped in class StrictSubscriber, which implements both FlowableSubscriber and Subscription.

   14749     @Override
   14750     public final void subscribe(Subscriber<? super T> s) {
   14751         if (s instanceof FlowableSubscriber) {
   14752             subscribe((FlowableSubscriber<? super T>)s);
   14753         } else {
   14754             ObjectHelper.requireNonNull(s, "s is null");
   14755             subscribe(new StrictSubscriber<T>(s));
   14756         }
   14757     }

The JavaDoc for StrictSubscriber states,

Ensures that the event flow between the upstream and downstream follow
the Reactive-Streams 1.0 specification ...

My subscriber object is the downstream object in this example. I am assuming BufferAsyncEmitter will become the upstream object.

Let’s continue following Flowable’s method calls into subscribeActual. Executing line 14755 above drops us into method subscribe, below. The value of the input parameter, s, is the StrictSubscriber object. Because I have not installed a hook, the call to RxJavaPlugins.onSubscribe(this, s) at line 14801 simply returns s typed as Subscriber. The call to subscribeActual is made at line 14805. Variable z, the unchanged StrictSubscriber, is passed in to subscribeActual.

  14798     public final void subscribe(FlowableSubscriber<? super T> s) {
  14799         ObjectHelper.requireNonNull(s, "s is null");
  14800         try {
  14801              Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
  14802
  14803              ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
  14804
  14805              subscribeActual(z);
  14806          } catch (NullPointerException e) { // NOPMD
  14807              throw e;
  14808          } catch (Throwable e) {
                     .............

We are now in FlowableCreate.subscribeActual. Let’s review the value of the variables in this class. Variable source is the FlowableOnSubscribe implementation I provided in the call to Flowable.create in Main.flowableStyle. It is the object that is going to pass the external data to BufferAsyncEmitter. Variable backpressure was set to BUFFER in that same call. StrictSubscriber is the value of the input parameter, t, in method subscribeActual. It is a wrapper class around the anonymous Subscriber I provided in Main.flowableStyle. Its downstream variable is set. Its upstream variable is not.

At line 64 t, StrictSubscriber, is registered with BufferAsyncEmitter and at line 69 BufferAsyncEmitter is registered with StrictSubscriber.

    42    @Override
    43    public void subscribeActual(Subscriber<? super T> t) {
    44        BaseEmitter<T> emitter;
                ......
    63            default: {
    64                emitter = new BufferAsyncEmitter<T>(t, bufferSize());
    65                break;
    66            }
    67        }
    68
    69        t.onSubscribe(emitter);
    70        try {
    71            source.subscribe(emitter);
    72        }

Let’s look at the code for StrictSubscriber.onSubscribe to see how BufferAsyncEmitter is set.

   81     @Override
   82     public void onSubscribe(Subscription s) {
   83         if (once.compareAndSet(false, true)) {
   84
   85             downstream.onSubscribe(this);
   86
   87             SubscriptionHelper.deferredSetOnce(this.upstream, requested, s);
   88        } else {
   89             s.cancel();
   90             cancel();
   91             onError(new IllegalStateException("§2.12 violated: onSubscribe must be called at most once"));
   92         }
   93     }

Variable downstream is the Subscriber object I defined in Main.flowableStyle. The execution of line 85 registers this, StrictSubscriber, with my original subscriber; that is why the first line of the output says the subscription type is StrictSubscriber and not BufferAsyncEmitter. This is a new layer of indirection that adds complexity to the wiring. I found it confusing at first.

The SubscriptionHelper call (line 87) sets the field upstream to the value of input parameter s (BufferAsyncEmitter) and forwards to it the initial demand that my subscriber requested, which was recorded in variable requested.
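The deferred hand-off can be sketched as follows: a wrapper gives itself to the downstream subscriber as the Subscription, remembers any demand requested before the real upstream is known, and replays that demand once upstream is set. This is a simplified, single-threaded illustration and not RxJava's StrictSubscriber or SubscriptionHelper code. DeferredWrapperSketch and DeferredDemo are hypothetical names, the JDK's java.util.concurrent.Flow interfaces stand in for the Reactive Streams ones, and the atomic fields only echo the names used above without making the sketch thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical wrapper: downstream sees it as the Subscription; it forwards to upstream. */
final class DeferredWrapperSketch<T> implements Flow.Subscriber<T>, Flow.Subscription {
    private final Flow.Subscriber<? super T> downstream;
    private final AtomicReference<Flow.Subscription> upstream = new AtomicReference<>();
    private final AtomicLong requested = new AtomicLong();

    DeferredWrapperSketch(Flow.Subscriber<? super T> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        downstream.onSubscribe(this);            // downstream receives the wrapper, not s
        upstream.set(s);                         // only now is the real subscription known
        long pending = requested.getAndSet(0);   // demand recorded while upstream was unset
        if (pending > 0) {
            s.request(pending);                  // replay it to the real subscription
        }
    }

    @Override
    public void request(long n) {
        Flow.Subscription s = upstream.get();
        if (s != null) {
            s.request(n);                        // upstream known: forward directly
        } else {
            requested.addAndGet(n);              // upstream unknown: remember the demand
        }
    }

    @Override
    public void cancel() {
        Flow.Subscription s = upstream.get();
        if (s != null) {
            s.cancel();
        }
    }

    @Override public void onNext(T item) { downstream.onNext(item); }
    @Override public void onError(Throwable t) { downstream.onError(t); }
    @Override public void onComplete() { downstream.onComplete(); }
}

final class DeferredDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flow.Subscriber<String> mine = new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                log.add("got " + s.getClass().getSimpleName());
                s.request(3);
            }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
        Flow.Subscription emitter = new Flow.Subscription() {
            @Override public void request(long n) { log.add("upstream asked for " + n); }
            @Override public void cancel() { }
        };
        new DeferredWrapperSketch<String>(mine).onSubscribe(emitter);
        return log;
    }
}
```

In DeferredDemo.run() the subscriber reports the wrapper's class name as its subscription, and the request of 3 reaches the upstream object only after the wrapper has stored it, which matches the StrictSubscriber behaviour seen in the output above.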

Back in FlowableCreate.subscribeActual, the final step is to connect the emitter, (BufferAsyncEmitter) with the external data source, (FlowableOnSubscribe) with the call to source.subscribe(emitter) at line 71.

That completes my discussion of the wiring of a Subscriber, Publisher and Subscription. I hope it provided some clarity on this programming paradigm. But wait! How does the data flow from FlowableOnSubscribe, to BufferAsyncEmitter, to StrictSubscriber, and then to Subscriber, you ask? I leave that as a topic for you to explore. I suggest using the GitHub project for this article to walk through that code with a debugger.
