I want to understand how async code interacts with RxJS. I wrote this example to see how async function calls behave inside RxJS observables. The output is:
func1 --- 10
func1 --- 20
func1 --- 40
func1 --- 30
switchMap ---1--- -->item: 0
switchMap ---1--- -->item: 1
switchMap ---1--- -->item: 2
switchMap ---1--- -->item: 3
switchMap ---1--- -->item: 4
switchMap ---1--- -->item: 5
switchMap ---1--- -->item: 6
switchMap ---1--- -->item: 7
***END***
switchMap ---2--- -->item: 1
switchMap ---3--- -->item: 0.111
switchMap ---3--- -->item: 1.111
switchMap ---3--- -->item: 2.111
switchMap ---3--- -->item: 3.111
switchMap ---3--- -->item: 4.111
switchMap ---3--- -->item: 5.111
switchMap ---3--- -->item: 6.111
switchMap ---3--- -->item: 7.111
switchMap ---3--- -->item: 8.111
switchMap ---3--- -->item: 9.111
MERGEMAP: item-->1000
MERGEMAP: item-->2000
N E X T --> 1000
N E X T --> 2000
MERGEMAP: item-->3000
MERGEMAP: item-->4000
N E X T --> 3000
N E X T --> 4000
MERGEMAP: item-->5000
N E X T --> 5000
C O M P L E T E
I don't understand why func1 --- 40 is printed before func1 --- 30, or why ***END*** is printed between the switchMap ---1--- and switchMap ---2--- lines.
Thanks.
import { bufferCount, from, mergeMap, Observable, switchMap } from "rxjs";
module async1 {
async function func1() {
console.log("func1 --- 10");
const loop0 = async () => {
return from([0, 1, 2, 3, 4, 5, 6, 7]);
};
const loop1 = async () => {
return 1;
};
const loop2 = async () => {
const arrayLoop2 = [];
for (let i = 0; i < 10; i++) {
arrayLoop2.push(i + 0.111);
}
return arrayLoop2;
};
const loop3 = async () => {
const arrayLoop3 = [1000, 2000, 3000, 4000, 5000];
return from(arrayLoop3);
};
let myObservable: Observable<number>;
console.log("func1 --- 20");
loop0().then((value) => {
myObservable = value;
const myPipeline = myObservable!.pipe(
switchMap((item) => {
console.log(`switchMap ---1--- -->item: ${item}`);
const loop1Result = from(loop1());
return loop1Result;
}),
switchMap((item) => {
console.log(`switchMap ---2--- -->item: ${item}`);
const loop2Result = loop2();
return loop2Result;
}),
switchMap((items) => {
items.forEach((item) => {
console.log(`switchMap ---3--- -->item: ${item}`);
});
const loop3Result = loop3();
return loop3Result;
}),
switchMap((item) => {
return item;
}),
bufferCount(2),
mergeMap((items) => {
items.forEach((item) => {
console.log(`MERGEMAP: item-->${item}`);
});
return items;
}, 3)
);
console.log("func1 --- 30");
const mySubscription = myPipeline.subscribe({
next: (item) => {
console.log(`N E X T --> ${item}`);
},
error: () => {
console.log(`E R R O R`);
},
complete: () => {
console.log(`C O M P L E T E`);
mySubscription.unsubscribe();
},
});
});
console.log("func1 --- 40");
return true;
}
func1().then((resolve) => {
console.log("***END***");
});
}
CodePudding user response:
func1() does not wait for the observable-related code: the .then callback and the subscription run asynchronously, after the surrounding synchronous code has finished. So if you want to be sure that all of the logic inside func1() has completed before the next function is called, don't use observables and subscriptions inside func1(). Use Promise<Type> combined with await instead. Then you can call func1 like this:
await func1();
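To illustrate the ordering described above, here is a minimal sketch that uses only plain promises (no RxJS). The names loadItems, notAwaited, awaited and demo are hypothetical and exist only for this example. It contrasts a .then call that is not awaited, as in the question, with the awaited version.

```typescript
// Sketch only: loadItems, notAwaited, awaited and demo are hypothetical names.
const loadItems = async (): Promise<number[]> => [0, 1, 2];

// Mirrors the question: the .then callback is queued as a microtask,
// so the statement after the .then call runs before the callback does.
async function notAwaited(log: string[]): Promise<void> {
  log.push("20");
  loadItems().then(() => {
    log.push("30");
  });
  log.push("40");
}

// With await, the function pauses until loadItems() settles,
// so the steps run in source order.
async function awaited(log: string[]): Promise<void> {
  log.push("20");
  await loadItems();
  log.push("30");
  log.push("40");
}

async function demo(): Promise<{ first: string[]; second: string[] }> {
  const first: string[] = [];
  const second: string[] = [];
  await notAwaited(first);
  await awaited(second);
  return { first, second };
}

demo().then(({ first, second }) => {
  console.log(first.join(", "));  // 20, 40, 30
  console.log(second.join(", ")); // 20, 30, 40
});
```

The first result reproduces the "40 before 30" ordering from the question; the second shows that await restores source order.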
If you want console.log("func1 --- 40") and console.log("***END***") to run after the RxJS work has finished, move them into the body of the complete callback:
complete: () => {
console.log("func1 --- 40");
console.log(`C O M P L E T E`);
console.log("***END***");
mySubscription.unsubscribe();
}
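The two suggestions can also be combined: resolve a promise from inside complete, so that await func1() really does wait for the stream to finish. The sketch below shows the idea without RxJS. emitLater is a hypothetical stand-in for an observable, not an RxJS API, and collect, func1Sketch and runSketch are likewise invented for illustration. For real observables, RxJS 7 exports lastValueFrom, which serves a similar purpose.

```typescript
// Hypothetical stand-in for an observable; this is NOT the RxJS API.
// It delivers its values asynchronously and then signals completion.
type Observer<T> = { next: (value: T) => void; complete: () => void };

function emitLater<T>(values: T[], observer: Observer<T>): void {
  Promise.resolve().then(() => {
    values.forEach((value) => observer.next(value));
    observer.complete();
  });
}

// Wrap the callback-style source in a Promise that resolves in `complete`,
// so callers can await the whole stream.
function collect<T>(values: T[]): Promise<T[]> {
  return new Promise<T[]>((resolve) => {
    const received: T[] = [];
    emitLater(values, {
      next: (value) => {
        received.push(value);
      },
      complete: () => resolve(received),
    });
  });
}

// Assumed shape of a func1 that only continues after completion.
async function func1Sketch(log: string[]): Promise<void> {
  log.push("start");
  const items = await collect([1000, 2000, 3000]);
  log.push(`received ${items.join(",")}`);
  log.push("after complete");
}

async function runSketch(): Promise<string[]> {
  const log: string[] = [];
  await func1Sketch(log);
  log.push("END");
  return log;
}
```

With this shape, "END" is only logged after every value has been delivered, because func1Sketch does not return until the promise created in collect has resolved.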