Combining multiple RxJs streams in Angular 2.0

Author: Torgeir Helgevold

Published: Sun Apr 17 2016

In RxJs we often deal with multiple streams, but the end consumer typically only subscribes to a single stream. In this article we will look at ways to combine multiple streams into a single stream.

There are many ways for RxJs streams to converge on a single stream, but in this article we will look at flatMap, forkJoin, merge and concat.

Concat

Concat combines two observables into a single sequence, but it does not subscribe to the second observable until the first one has completed, so the second cannot emit anything before then.

In my sample I am concatenating two timer observables.

// Two timers that each emit four values, tagged with their source.
let first = Observable.timer(10, 500)
    .map(r => ({ source: 1, value: r }))
    .take(4);
let second = Observable.timer(10, 500)
    .map(r => ({ source: 2, value: r }))
    .take(4);

// second is not subscribed until first has completed.
first.concat(second)
    .subscribe(res => this.concatStream.push(res));

The emitted values are very simple, but we will receive all the values from the first observable before we start receiving values from the second. Below is a diagram showing the order of emitted events. Values from the first observable are shown in green and values from the second observable are shown in blue.

[Diagram: order of emitted events for concat]
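
To make the ordering concrete, here is a small dependency-free sketch that models the timeline rather than running RxJs. It assumes, as in the sample above, that each source behaves like timer(10, 500) limited to four values, and that the second source is subscribed at the moment the first one completes. The names `Emission`, `timerEmissions` and `concatTimeline` are hypothetical helpers for illustration, not part of RxJs.

```typescript
// Hypothetical model of one emitted value: its source, its value and its time in ms.
interface Emission {
  source: number;
  value: number;
  time: number;
}

// Models timer(initialDelay, period).take(count) for a source subscribed at `start` ms.
function timerEmissions(
  source: number,
  start: number,
  initialDelay: number,
  period: number,
  count: number
): Emission[] {
  const out: Emission[] = [];
  for (let i = 0; i < count; i++) {
    out.push({ source: source, value: i, time: start + initialDelay + i * period });
  }
  return out;
}

// concat: the second source is subscribed only once the first has completed,
// which with take(4) is the moment its fourth value is emitted.
function concatTimeline(): Emission[] {
  const first = timerEmissions(1, 0, 10, 500, 4);
  const firstCompletedAt = Math.max(...first.map(e => e.time));
  const second = timerEmissions(2, firstCompletedAt, 10, 500, 4);
  return first.concat(second);
}

const concatOrder = concatTimeline().map(e => e.source).join(",");
console.log(concatOrder); // prints "1,1,1,1,2,2,2,2"
```

In this model the first source emits at 10, 510, 1010 and 1510 ms, and the second only starts after that, at 1520 ms.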

Merge

Merge is similar to concat, but it subscribes to both observables right away and interleaves their values as they arrive, instead of waiting for the first observable to complete before starting the second one.

// Same two timers as in the concat sample.
let first = Observable.timer(10, 500)
    .map(r => ({ source: 1, value: r }))
    .take(4);
let second = Observable.timer(10, 500)
    .map(r => ({ source: 2, value: r }))
    .take(4);

// Both are subscribed immediately, so their values interleave.
first.merge(second)
    .subscribe(res => this.mergeStream.push(res));

In the diagram you can see the values from both observables interleaved as alternating green and blue boxes.

[Diagram: order of emitted events for merge]
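
The interleaving can be modelled the same way, again without RxJs. This sketch assumes both sources are subscribed at time 0 and that, when two values fall on the same millisecond, the source that was subscribed first wins the tie. That tie-break is an assumption of the model, and `Emission`, `timerEmissions` and `mergeTimeline` are hypothetical names.

```typescript
// Hypothetical model of one emitted value: its source, its value and its time in ms.
interface Emission {
  source: number;
  value: number;
  time: number;
}

// Models timer(initialDelay, period).take(count) for a source subscribed at time 0.
function timerEmissions(
  source: number,
  initialDelay: number,
  period: number,
  count: number
): Emission[] {
  const out: Emission[] = [];
  for (let i = 0; i < count; i++) {
    out.push({ source: source, value: i, time: initialDelay + i * period });
  }
  return out;
}

// merge: both sources run at once, so the output is ordered by emission time.
// Ties are broken by source number (assumed to match subscription order).
function mergeTimeline(): Emission[] {
  const first = timerEmissions(1, 10, 500, 4);
  const second = timerEmissions(2, 10, 500, 4);
  return first
    .concat(second)
    .sort((a, b) => a.time - b.time || a.source - b.source);
}

const mergeOrder = mergeTimeline().map(e => e.source).join(",");
console.log(mergeOrder); // prints "1,2,1,2,1,2,1,2"
```

Compared with the concat model, the only change is when the second source is subscribed, which is what turns a sequential result into an interleaved one.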

forkJoin

We use forkJoin to run observables in parallel. One common use case is making multiple http requests in parallel. In my sample I am forkJoining two very simple observables, but the key point is that the subscriber won't receive anything until both observables have completed, at which point it receives a single array with the last value from each.

let first = Observable.of({ source: 1, value: 1 });
let second = Observable.of({ source: 2, value: 1 });

// Emits one array, once both observables have completed.
Observable.forkJoin(first, second)
    .subscribe((res: Array<any>) => this.forkJoinStream = res);

If we want to compare this to promises, it would be similar to $q.all() from Angular 1.x.

In the diagram we see that both observables completed before we received the result.

[Diagram: forkJoin result delivered after both observables complete]
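
For readers who know promises better than observables, the comparison can be sketched with the native `Promise.all`, used here as a stand-in for `$q.all()`. This is an analogy under that assumption, not forkJoin itself, and `Result`, `firstRequest`, `secondRequest` and `combined` are hypothetical names.

```typescript
// Same value shape as in the forkJoin sample.
interface Result {
  source: number;
  value: number;
}

// Two already-resolved promises standing in for two parallel requests.
const firstRequest: Promise<Result> = Promise.resolve({ source: 1, value: 1 });
const secondRequest: Promise<Result> = Promise.resolve({ source: 2, value: 1 });

// Like forkJoin, Promise.all delivers a single array, in argument order,
// and only after every input has finished.
const combined: Promise<Result[]> = Promise.all([firstRequest, secondRequest]);

combined.then(res => console.log(res.map(r => r.source).join(","))); // prints "1,2"
```

One difference worth keeping in mind is that a promise starts working as soon as it is created, whereas the observables passed to forkJoin do nothing until someone subscribes.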

flatMap

flatMap is how we handle dependencies between observables. My sample is contrived, but I am returning a value from the first observable that is needed by the second observable to calculate a sum.

If we compare this to promises, flatMap is how we would create the equivalent of a promise chain. The code can be seen below.

let first = Observable.of(10);

// The value from first feeds the inner observable that computes the sum.
first.flatMap(operand1 => Observable.of(operand1 + 10))
    .subscribe(res => this.flatMappedStreams = { msg: '10 + 10 = ' + res });
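
The promise chain mentioned above might look like the following sketch. It uses native promises and a hypothetical helper, `addTen`, standing in for the second observable, so it illustrates the analogy and is not the RxJs code.

```typescript
// Hypothetical asynchronous step that depends on the first result.
function addTen(operand1: number): Promise<number> {
  return Promise.resolve(operand1 + 10);
}

// Returning a promise from then() flattens it into the chain,
// which is the role flatMap plays for observables.
const chained: Promise<string> = Promise.resolve(10)
  .then(operand1 => addTen(operand1))
  .then(sum => "10 + 10 = " + sum);

chained.then(msg => console.log(msg)); // prints "10 + 10 = 20"
```

The second step never runs until the first value is available, which is the dependency between the two observables that the flatMap sample expresses.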

As always there is a live demo.

I have also put the code on GitHub.

If you liked this article, share it with your friends. I also invite you to follow me on Twitter at @helgevold.
