Basics in the foundation of RxJS
WARNING: This is a Google translated (originally Chinese) and reposted article that probably is mostly comfortably understandable for senior JavaScript developers.
4RxJS has attracted attention in recent years, such as being adopted also in Angular dependency library.
Let's confirm the foundation in that foundation.
There are various concepts such as Observable, Observer, Subscribe, and so on, so the purpose is to organize around it.
To be honest, I also took quite a while just to understand this far. Even if he said "to" subscribe "to the stream, I could not understand it at all. I did not understand the meaning, or "How I am going in the end" was a feeling that I could not grasp it.
As in this article, I would like to accumulate understanding one by one for the first time from as simple as possible.
Instead of using TypeScript, I will write code in JavaScript. Since TypeScript allows you to describe the type, reference information increases, but the necessary knowledge to understand also increases. Less necessary knowledge is better.
Relationship between Observer and Subscribe
Observer
Observer is an object.
It is made by combining three functions into one object.
Three functions are "next" "error" "complete".
Subscribe
Subscribe is a function. So sometimes we call it "Subscribe function" below.
The Subscribe function receives the Observer and executes the function (next, error, complete) in the Observer.
In what order, what kind of argument to give and execute is described in the Subscribe function.
An example
Let's check with the code.
In this example, after executing the next function in Observer three times while changing the argument, try executing the complete function in Observer.
RxJS has not appeared yet (What is it !!!)
code
var Observer_1 = {
next : x => console . log ( 'Observer1 got a value:' + x ),
error : err => console . log ( 'Observer1 got an, error:' + err ),
complete : () => console . log ( 'Observer 1 got Complete' ),
}; // collection of three functions next, error, complete
function Subscribe_1 ( observer ) {
observer . next ( 1 );
observer . next ( 2 );
observer . next ( 3 );
observer . complete ();
} receive // observer, functions to execute a function with the observer
subscribe_1 ( observer_1 );
output
Observer 1 got a value: 1
Observer 1 got a value: 2
Observer 1 got a value: 3
Observer 1 got Complete
Try changing the interpretation
By the way, this code
Subscribe handed over the data "1, 2, 3" to the Observer in turn, and then notified "completion"
It can be interpreted as.
Let's look at the same code again from this point of view.
code
var Observer_1 = {
next : x => console . log ( 'Observer1 got a value:' + x ),
error : err => console . log ( 'Observer1 got an, error:' + err ),
complete : () => console . log ( 'Observer 1 got Complete' ),
}; // what you do when you receive the data is written
function Subscribe_1 ( observer ) {
observer . next ( 1 );
observer . next ( 2 );
observer . next ( 3 );
observer . complete ();
} // the specified Observer, data of "1, 2, 3" And "complete" is sent
subscribe_1 ( observer_1 );
// specify observer_1 as the receiver of data sent by subscribe_1
output
Observer 1 got a value: 1
Observer 1 got a value: 2
Observer 1 got a value: 3
Observer 1 got Complete
I got the same output. Because it is the same code other than comments, it seems so.
Summary
- Observer has three functions, next, error, and complete.
- Passing Observer to the Subscribe function executes the function in Observer.
- It can be interpreted that Subscribe is passing data to Observer.
Relationship between Observable and Subscribe
Let's finally read and use the RxJS library.
Observable
The most basic in RxJS is the Observable object.
It is similar in name to the Observer object, but please be careful as it is a different one. I also do not want such a confusing place. Please distinguish firmly.
An Observable object has subscribea method named.
Image, this is it.
Observable_a = {
subscribe : function ( observer ) {
observer . next ( 1 );
observer . next ( 2 );
observer . next ( 3 );
observer . complete ();
}
}
subscribe_1I tried to write the contents of the function we just made.
subscribeThe method is a method that receives an Observer object and executes the functions (next, error, complete) in it. (Although we have not called error at least for now)
Even this is really an object with just one method, and there is nothing else to do.
Do not do that, let's make a real Observable object using the RxJS library properly.
Make Observable
createMake with
To create an Observable object from 1 Rx.Observable.createwe use the function.
When Rx.Observable.createpassing a function to this argument, it is subscriberegistered as a method named as it is.
Then, passing the Observer object to the subscribe method will execute the function (next, error, complete) in the Observer.
let's do it.
code
Rx = require ( './rx.min.js' ); // You can substitute import statements
function Subscribe_1 ( observer ) {
observer . next ( 1 );
observer . next ( 2 );
observer . next ( 3 );
observer . complete ();
} // same Subscribe function as earlier
the Observer of // "Specify" 1, 2, 3 "data and" to send "completion" "
var observable_1 = Rx . Observable . create ( subscribe_1 );
// Create an Observable object
// subscribe_1 is registered as a subscribe method.
var Observer_1 = {
next : x => console . log ( 'Observer1 got a value:' + x ),
error : err => console . log ( 'Observer1 got an, error:' + err ),
complete : () => console . log ( 'Observer 1 got Complete' ),
}; //
You can also interpret it as the same Observer object // "Behavior when data is received"
observable_1 . subscribe ( observer_1 );
// Call the subscribe method and pass the Observer
// You can interpret it as "observer_1 specified as the data recipient"
output
Observer 1 got a value: 1
Observer 1 got a value: 2
Observer 1 got a value: 3
Observer 1 got Complete
The same output was output.
It is almost the same as when we made and executed Observer object and Subscribe function earlier.
Before calling the Subscribe function directly, the difference is that it is called only once in Observable method.
Make it without using create
createYou can create the same Observable object as before without using.
code
Rx = require ( './rx.min.js' ); // You can substitute import statements
observable_1 = Rx . Observable . of ( 1 , 2 , 3 ); // use of rather than create
var Observer_1 = {
next : x => console . log ( 'Observer1 got a value:' + x ),
error : err => console . log ( 'Observer1 got an, error:' + err ),
complete : () => console . log ( 'Observer 1 got Complete' ),
}; // The same Observer object
observable_1 . subscribe ( observer_ 1 );
output
Observer 1 got a value: 1
Observer 1 got a value: 2
Observer 1 got a value: 3
Observer 1 got Complete
The points are the following two points.
- createInstead ofof using
- I did not create a Subscribe function myself
- Observable made subscribewith of had methods from the beginning
The same is true as follows.
observable_1 = Rx . Observable . from ([ 1 , 2 , 3 ]); // use from instead of create
Such a function that generates Observable is called "Creation Operator".
For RxJS, there are various Creation Operators that automatically create Observable with a nice Subscribe function.
Abbreviation of Observer object
Look at the following code
code
Rx = require ( './rx.min.js' ); // You can substitute import statements
Rx . Observable . Of ( 1 , 2 , 3 ). Subscribe ( x => console . Log ( "The observer got a value:" + x ))
output
The observer got a value: 1
The observer got a value: 2
The observer got a value: 3
Something, it was a very short code, and I got similar output.
Except for the part that loads the library, there is only one line.
Let's see the contents of the code for a moment.
Rx . Observable . Of ( 1 , 2 , 3 ). Subscribe ( What is it? )
The whole shape is like this.
ofYou used Observable to subscribecall that method.
That means that the Observer object should be in the "What is it?" Part.
But in code
x => console . log ( "The observer got a value:" + x )
It contains. This is just a function. Moreover, it is unnamed.
Actually, subscribeif you give a function suddenly to an argument of a method, you will have specified the next function of Observer.
subscribeOf the argument can specify a function up to three, in turn next, error, completeit will be treated as.
This code means that instead of the Observer object, we nextjust subscribepassed the function .
completeSince no function is passed, the Observer got Completeline corresponding to the output which was output until a while is not output.
Summary
- Observable has subscribea method called
- Observable is createmake or use the Creation Operator
- You subscribecan pass functions directly to Observable
Benefits of Observable
The foundation in the real foundation has ended so far. For the time being, I can make Observable and move it.
But then, I do not know the meaning of using Observable.
I traced using Observable what I could do without using Observable at the very beginning.
Observable I do not need it! It is a story.
Well then what is amazing at Observable.
Benefits of being able to do asynchronous processing? What?
What is often said is asynchronous processing .
Is "the capability of asynchronous processing" a merit of Observable? What?
Please see the following code. Only "// important" is written is important.
code
Rx = require ( './rx.min.js' ); // You can substitute import statements
var Observer_1 = {
next : x => console . log ( 'Observer1 got a value:' + x ),
error : err => console . log ( 'Observer1 got an, error:' + err ),
complete : () => console . log ( 'Observer 1 got Complete' ),
}; // just the same
function Subscribe_1 ( observer ) {
observer . next ( 1 );
observer . next ( 2 );
setTimeout ( function () { // important
observer . next ( 3 ); // important
}, 1000 ); // important
}
var observable_1 = Rx . Observable . create ( subscribe_1 );
// Create an Observable object
Console . Log ( "Before" ); // important
Observable_1 . Subscribe ( Observer_1 ); // important
Console . Log ( "After" ); // important
before
Observer 1 got a value: 1
Observer 1 got a value: 2
after
Observer 1 got a value: 3
"After" is output before "3" is outputted.
setTimeout(処理,1000)Is a function that waits for 1 second (1000 milliseconds) before executing processing
"After" was output asynchronously without waiting for it. Indeed, asynchronous processing is done.
Let's look at the following code.
code
var Observer_1 = {
next : x => console . log ( 'Observer1 got a value:' + x ),
error : err => console . log ( 'Observer1 got an, error:' + err ),
complete : () => console . log ( 'Observer 1 got Complete' ),
}; // just the same
function Subscribe_1 ( observer ) {
observer . next ( 1 );
observer . next ( 2 );
setTimeout ( function () {
observer . next ( 3 );
}, 1000 );
}
console . log ( "before" );
subscribe_1 ( observer_1 ); // important
console . log ( "after" );
output
before
Observer 1 got a value: 1
Observer 1 got a value: 2
after
Observer 1 got a value: 3
The same result comes out, but the point is that you are not using RxJS .
that? You can process asynchronously, right? What? What? What?
Yes. You can do asynchronous processing itself without using RxJS separately.
RxJS is not a library for asynchronous processing.
Then what is it?
It is a library for refreshing asynchronous processing!
Operator is convenient! !
I used the Creation Operator earlier, but this is just one of the RxJS number of operators.
There are many other Operator categories, and there are many Operators in each category.
In order to introduce its convenience, let's consider the following example.
"Add the numbers flowing every second every even number, convert it to a character string at any time, add it back !! and output it"
Because it is an example made for explanation, it is like "there is such a scene ???", but let's do it well.
code
Rx = require ( './rx.min.js' ); // You can substitute import statements
Rx . Observable . Interval ( 1000 ) // generate one more integer every second
. Filter ( x => x % 2 === 0 ) // even number only leave
. Scan (( acc , curr ) => acc + Curr , 0 ) // plus from time to time
. Map ( X => string ( X ) Tasu "!!" ) is converted into a // string put !!
. Subscribe ( X => console . log ( x )); // output
output
0 !!
2 !!
6 !!
12 !!
20 !!
30 !!
(Hereafter, it will last forever until you stop with Ctrl + C etc.)
It will be output one by one every 2 seconds. You did it!
requireExcluding the line of, there are only 5 lines. It is amazing.
It is used here interval, filter, scan, mapbut is the Operator.
Starting with explanations of each Operator will not be explained as it will not be the category of "the foundation of RxJS foundation".
These operators return Observer as return values, so .you can link them as method chains using.
It gradually applies Operator and changes contents of Observer (content of subscribe method)!
This, how many lines will it be written if you do not use RxJS ...?
It is troublesome to think about how to write in the first place.
Anyway using RxJS, you can easily write this kind of handling.
Caution (With RxJS 6.x and above, the way the operator is written changes)
On April 24, 2018, RxJS version 6.0.0 was released.
With RxJS 6.x and above, the code in this article does not work as it is. (It's been half a year since I wrote the article and it will not move)
If you write the necessary changes easily
- operator observable1.pipe(map(...),reduce(...)).subscribe(...);write like
- import { Observable, of } from 'rxjs';Write an import statement like
- import { map, reduce } from 'rxjs/operators';Write import of operator as
Has become.
For details, please see the following site.
The Operator is similar to manipulating an array
mapIn filterfact, Operator, in fact Arrayオブジェクト, there is the same method in the array method.
Or scanis, the place referred to in the array reducehas been a movement similar to that of. (And Observable also has an reduceOperator.)
Those who say "Array operation is so familiar!" Thinks Observable as "an array in which values are sent one by one", it is easy to understand.
Summary
- Observable operates and operates more and more
- It is amazing to be able to write asynchronous processing with the sense of manipulating the array
Now. In this article, it is here to explain with this spirit.
It can be called "foundation of foundation" in the range up to this level.
"Operators operate" cores "such as Observable, Subscribe and Observer. This is the foundation underlying.
However, there are some important concepts that have not been mentioned yet, so I will introduce them crisply from here.
Subscription
Suddenly, in fact, the subscribe method has a return value.
subscribe_1There was no return value in the function that I made first, but that is just the fact that I did not write the return value, in fact it is the figure that should actually write the return value properly.
Then what kind of return value is required is "Subscription object".
The Subscription object has unsubscribea method called.
You can call this method to stop passing data from the Subscribe function to the Observer object.
Let's look at an example.
The Subscribe function of Observable created by Creation Operator will properly return Subscription object.
Let's stop the non-stop program that I made earlier by specifying the time.
code
Rx = require ( './rx.min.js' ); // You can substitute import statements
Subscription_1 = Rx . Observable . interval ( 1000 ) substitutes the // return value
. filter ( x => x % 2 === 0 )
. scan (( acc , curr ) => acc + curr , 0 )
. map ( x => String ( x ) + "!!" )
. Subscribe ( x => console .log ( x ));
setTimeout ( function () {
subscription_1 . unsubscribe (); // call unsubscribe
}, 10000 );
output
0 !!
2 !!
6 !!
12 !!
20 !!
It stopped at 10 seconds (10000 milliseconds). It is the expected movement!
Subject
There is an object called Subject.
This is explained as "Observable and Observer".
But when I say something, I think Observer is closer ~.
The fundamental image is that a group of multiple Observers is called Subject .
Register Observer in Subject
The Subject object has a subscribemethod.
Observable also had subscribemethods. This is why it is said that "Subject is Observable".
subscribePass Observer objects as well as methods.
code
Rx = require ( './rx.min.js' ); // You can substitute import statements
var Observer_1 = {
next : x => console . log ( 'Observer1 got a value:' + x ),
}; // almost the same Observer objects as earlier
was omitted // error and complete. Still it works so it's okay.
var subject = new Rx . Subject (); // Create Subject
subject . subscribe ( observer_1 ); // call subscribe
output
However, this code does not output anything.
If you look closely, in fact, the nextdata you pass to Observer has not come from anywhere.
subscribeJust calling a method and passing Observer does nothing, in particular.
This place is different from Observer.
So subscribewhat will happen if you pass Observer to the Subject method
The passed Observer is registered in that Subject.
Multiple Observer can be registered in one Subject.
Then, when nextyou call and execute the Subject function,
The registered Observer's nextfunctions are executed all at once.
You have nexta function called Subject as a method. This is why such a place says "Subject is also an Observer".
Let's look at an example.
code
x = require ( ' . / rx.min.js' ); // You can substitute import statements
var Observer_1 = {
next : x => console . log ( 'Observer1 got a value:' + x ),
complete : x => console . log ( 'Observer1 got Complete' ),
}; // just now substantially the same Observer object
/ / Since we did not use error, we omitted it.
var Observer_2 = {
next : x => console . log ( 'Observer2 got a value:' + x ),
complete : x => console . log ( 'Observer2 got Complete' ),
}; // just now substantially the same Observer object
var Observer_3 = {
next : x => console . log ( 'Observer3 got a value:' + x ),
complete : x => console . log ( 'Observer3 got Complete' ),
}; // just now substantially the same Observer object
var subject_1 = new Rx . Subject (); // Create Subject
Subject_1 . subscribe ( Observer_1 ); // Observer registration
Subject_1 . subscribe ( Observer_2 ); // Observer registration
Subject_1 . subscribe ( Observer_3 ); registering // Observer
subject_1 . next ( " oraora " ); // execute the next method on Subject
output
Observer 1 got a value: Orao laola
Observer 2 got a value: Orao laola
Observer 3 got a value: Orao laola
You can see that the "oraora" passed to Subject's next function was also passed to each Observer.
subscribePass Subject to Observable method
subscribeLet's pass this Subject to the method of Observable
The only important thing is the bottom line. The rest is almost the same as before.
code
Rx = require ( './rx.min.js' ); // You can substitute import statements
var Observer_1 = {
next : x => console . log ( 'Observer1 got a value:' + x ),
complete : x => console . log ( 'Observer1 got Complete' ),
}; // just now substantially the same Observer object
var Observer_2 = {
next : x => console . log ( 'Observer2 got a value:' + x ),
complete : x => console . log ( 'Observer2 got Complete' ),
}; // just now substantially the same Observer object
var Observer_3 = {
next : x => console . log ( 'Observer3 got a value:' + x ),
complete : x => console . log ( 'Observer3 got Complete' ),
}; // just now substantially the same Observer object
var subject_1 = new Rx . Subject (); // Create Subject
Subject_1 . subscribe ( Observer_1 ); // Observer registration
Subject_1 . subscribe ( Observer_2 ); // Observer registration
Subject_1 . subscribe ( Observer_3 ); registering // Observer
Rx . Observable . Of ( 1 , 2 ). Subscribe ( subject_ 1 ); // Important
output
Observer 1 got a value: 1
Observer 2 got a value: 1
Observer 3 got a value: 1
Observer 1 got a value: 2
Observer 2 got a value: 2
Observer 3 got a value: 2
Observer 1 got Complete
Observer 2 got Complete
Observer 3 got Complete
After passing the Subject to Observable, the data was duplicated and flowed to multiple Observer registered in that Subject.
In this way, using Subject, data coming from one Observable can be passed to multiple Observer.
Type of Subject
Subject has some variants with different timings to pass data to Observer.
Behavior Subject
When the last transmitted data is held and a new Observer is registered, it sends the data held in that Observer. (Initially, there is no "last sent data", so set the initial value)
In this case, if subscribeyou call the method of Subject and pass Observer, Observer nextwill be executed immediately .
Replay Subject
It is similar to Behavior Subject, but it holds the specified number of data.
When the Observer is newly registered, it passes all the data it holds to Observer in order of oldest.
Async Subject
Here next, even if you call the method of Subject, we do not pass data to Observer. ( nextDo not call the method of registered Observer )
When the Subject completeis called, it sends the last data and a completion notification to each Observer.
Scheduler
Finally, I will talk about the Scheduler object.
The Scheduler object, as its name suggests, adjusts the timing of sending data from Observable to Observer
For example, you can pass it as an argument when creating Observable with Creation Operator.
Let's look at an example.
code
Rx = require ( './rx.min.js' ); // You can substitute import statements
var scheduler_1 = Rx . Scheduler . async ; // Create Scheduler
var Observer_1 = {
next : x => console . log ( 'Observer1 got a value:' + x ),
complete : x => console . log ( 'Observer1 got Complete' ),
}; // just now substantially the same Observer object
Console . Log ( "Before" );
Rx . Observable . Of ( 1 , 2 , 3 , Scheduler_1 .) Subscribe ( Observer_1 ); // are passing the Scheduler
Console . Log ( "After" );
output
before
after
Observer 1 got a value: 1
Observer 1 got a value: 2
Observer 1 got a value: 3
Observer 1 got Complete
Points, afterbut Observer1 got a value: 1it has come earlier than, is a place called.
If you did not pass the Scheduler, afterteeth Observer1 got Completeshould come to the next.
I will not explain the specific movement of the Scheduler today, but anyway, you can adjust the execution order with Scheduler.
createWhen creating an Observable using, you onSubscribewill call the method called, passing Scheduler.
Next notice
Now.
Since Subject and Scheduler are also easily described in this article, there is no particular "next time"!
Well then!
I studied so much, I got a high school fee for my parents, entered the University of Tokyo, graduated, now I am cleaning the toilet to pay pensions and residence tax, while relying on my parents for meals and meals .