神刀安全网

【Ovirt 笔记】Reactive Streams 的实现原理

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

分析整理的版本为 Ovirt 4.2.3 版本。

1. 简介

1.1 流

  • 流是由生产者生产并由一个或多个消费者消费的元素(item)的序列。
    • 这种生产者 – 消费者模型也被称为 source/sink 模型或发布者 – 订阅者(Publisher-Subscriber )模型。
  • 在流的处理机制中,pull 模型和 push 模型最为常见。
    • push 模型中,发布者将元素推送给订阅者。
      • 采用 push 方式,可以尽可能快地将消息发送给消费者,但是若消费者的处理消息的能力较弱(一条消息长时间处理),发布者会不断地向订阅者发送消息,消费者的缓冲区可能会溢出。
    • pull 模型中,订阅者向发布者请求元素。
      • 采用 pull 方式,会增加消息的延迟,即消息到达消费者的时间变长。
    • 在理想的情况下,发布者和订阅者都以同样的速率工作。

1.2 特殊情况处理

1.2.1 发布者与订阅者不按照同样的速率工作

  • 策略一,发布者比订阅者快,后者必须有一个无边界缓冲区来保存快速传入的元素或者丢弃无法处理的元素。
  • 策略二,使用背压(backpressure)策略,其中订阅者告诉发布者减慢速率并保存元素,直到订阅者做好准备。
    • 使用背压可确保更快的发布者不会压制较慢的订阅者。
    • 背压策略的几种实现方式。
      • 要求发布者拥有无限制的缓冲区,一直生成和保存元素。
      • 发布者可以实现有界缓冲区来保存有限数量的元素,如果缓冲区已满,可以选择放弃。
      • 发布者将发布元素重新发送到订阅者,这些元素发布时订阅者不能接受。

1.2.2 订阅者请求发布者的元素不可用

  • 发布者同步地向订阅者发送元素,并且订阅者同步处理它们,则发布者必须阻塞直到数据处理完成。
  • 两端进行异步处理,订阅者可以在从发布者请求元素之后继续处理其他任务。
    • 当更多的元素准备就绪时,发布者将它们异步发送给订阅者。

1.3 响应式流(Reactive Streams)

  • 响应式流从 2013 年开始,作为提供非阻塞背压的异步流处理标准的倡议,旨在解决处理元素流的问题。
    • 如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。
    • 响应式流模型非常简单。订阅者向发布者发送多个元素的异步请求。 发布者向订阅者异步发送多个或稍少的元素。
  • 响应式流在 pull 模型和 push 模型流处理机制之间动态切换。当订阅者较慢时,使用 pull 模型,当订阅者更快时使用 push 模型。
  • 响应式流的更多信息,可访问 http://www.reactive-streams.org/

2. 实现分析与整理

  • vdsm-jsonrpc-java-clientorg.reactivestreams 包中进行了以下接口的定义。
    • Publisher<T, S extends Subscriber<T>>
    • Subscriber<T>
    • Subscription
【Ovirt 笔记】Reactive Streams 的实现原理

响应式流模型

  • 发布者(Publisher)是潜在的无限数量的有序元素的生产者。
    • 根据收到的要求向当前订阅者发布(或发送)元素。
  • 订阅者(Subscriber)从发布者那里订阅并接收元素。
    • 发布者向订阅者发送订阅令牌(Subscription)。
    • 使用订阅令牌,订阅者从发布者那里请求多个元素。
    • 当元素准备就绪时,发布者向订阅者发送多个或更少的元素。
  • 订阅者可以请求更多的元素。
  • 发布者可能有多个来自订阅者的待处理请求。

2.1 发布者与订阅者的交互

  • 发布者可以拥有零个或多个订阅者。

发布者与订阅者的交互交互步骤,以虚拟机迁移功能模块代码为例(发布者 EventPublisher 与订阅者 VmMigrationProgressMonitoring 的交互,订阅令牌为 Subscription)。

  1. 创建发布者和订阅者,它们分别是 PublisherSubscriber 接口的实例。
    • EventSubscriber 是订阅者的基类。
    • VmMigrationProgressMonitoring 是订阅者的其中一个子类。
    • EventPublisher 是发布者实现类。
  2. 订阅者通过调用发布者的 subscribe() 方法来尝试订阅(绑定)发布者。
@PostConstruct private void subscribe() {      resourceManager.subscribe(this); } 
public void subscribe(EventSubscriber subscriber) {      log.debug("subscribe called with subscription id: {}", subscriber.getSubscriptionId());      ReactorFactory.getWorker(this.parallelism).getPublisher().subscribe(subscriber); } 
  1. 如果订阅成功,发布者 EventPublisher 使用 Subscription 异步调用订阅者的 onSubscribe(Subscription s) 方法。
    • 如果尝试订阅失败,则使用调用订阅者的 onError(Throwable t) 方法,并且发布者与订阅者交互结束。
@Override public void subscribe(final EventSubscriber subscriber) {     final AtomicInteger count = new AtomicInteger();     final SubscriptionHolder holder = new SubscriptionHolder(subscriber, count);     Subscription subscription = new Subscription() {              @Override             public void request(int n) {                 count.addAndGet(n);                 process(holder);             }              @Override             public void cancel() {                 clean(holder);                 subscriber.onComplete();             }      };      subscriber.onSubscribe(subscription);      this.matcher.add(holder); } 
if (map.containsKey(JsonRpcEvent.ERROR_KEY)) {     subscriber.onError(new ClientConnectionException((String) map.get(JsonRpcEvent.ERROR_KEY))); ...... 
  1. 订阅者可以通过调用 Subscriptionrequest(int n) 方法向发布者发送多个元素的请求。订阅者可以向发布者发送更多元素的多个请求,而不必等待其先前请求是否完成。
@Override public void onSubscribe(Subscription sub) {       subscription = sub;       subscription.request(1); } 
  1. 发布者在所有先前的请求中调用订阅者的 onNext(T t) 方法,直到订阅者请求的元素数量上限,在每次调用中向订阅者发送一个元素(这里的元素为 Map<String, Object>)。
    • 如果发布者没有更多的元素要发送给订阅者,则发布者调用订阅者的 onComplete() 方法来发信号通知流,从而结束发布者与订阅者交互。
    • 如果订阅者请求 Long.MAX_VALUE 元素,则实际上是无限制的请求,并且流实际上是推送流。
Map<String, Object> map = this.decomposer.decompose(event); ...... subscriber.onNext(map); 
  1. 如果发布者随时遇到错误,会调用订阅者的 onError() 方法。
  2. 订阅者可以通过调用其 Subscriptioncancel() 方法来取消订阅。
    • 一旦订阅被取消,发布者与订阅者交互结束。然而,如果在请求取消之前存在未决请求,订阅者可以在取消订阅之后接收元素。
@Override public void cancel() {    clean(holder);    subscriber.onComplete(); } 

2.2 总结

  • 一旦在订阅者上调用了 onComplete()onError() 方法,订阅者就不再收到发布者的通知。
  • 发布者的 subscribe() 方法被调用之后,如果订阅者不取消其订阅,则保证以下订阅方法调用序列(正则表达式)。
    • 符号 * 表示零次或多次, ? 表示零次或一次。
onSubscribe onNext* (onError | onComplete)? 
  • 在订阅者上的第一个方法调用是 onSubscribe() 方法,它是成功订阅(绑定)发布者的通知。
  • 订阅者的 onNext() 方法可以被调用零次或多次,每次调用指定元素发布。onComplete()
    onError() 方法可以被调用零次或一次来指示终止状态,只要订阅者不取消其订阅,就会调用这些方法。

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » 【Ovirt 笔记】Reactive Streams 的实现原理

分享到:更多 ()