1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.functions.BiFunction; import io.reactivex.rxjava3.functions.Consumer; public class Main5 { // java项目 public static void main(String[] args) { // 同时组合多个被观察者(事件源),至少2个,最多9个。 Observable.zip( // 如果数据源的数据量不匹配,按最小数量保留 Observable.just("数学","语文"), Observable.just(90,80, 110), new BiFunction<String,Integer,Map<String,Integer>>() { // 数据合并器 @Override public @NonNull Map<String, Integer> apply(@NonNull String t1, @NonNull Integer t2) throws Throwable { Map<String, Integer> m = new HashMap<>(); m.put(t1, t2); return m; } } ) .subscribe(new Consumer<Map<String, Integer>>() { @Override public void accept(@NonNull Map<String, Integer> t) throws Throwable { System.err.println("-=-=-=> " + t); } }); // -=-=-=> {数学=90} // -=-=-=> {语文=80} Observable.merge( // 同时组合多个被观察者(事件源),至少2个,最多4个。 Observable.intervalRange(1, 4, 1, 2, TimeUnit.SECONDS), Observable.intervalRange(5, 4, 1, 2, TimeUnit.SECONDS), Observable.intervalRange(9, 4, 1, 2, TimeUnit.SECONDS) // 这些被观察者是同时执行的,即异步的。 ) .subscribe(new Consumer<Long>() { @Override public void accept(@NonNull Long t) throws Throwable { System.err.println(">>>>> " + t); } }); try { Thread.sleep(1000*15); } catch (InterruptedException e) { e.printStackTrace(); } // >>>>> 1 // >>>>> 5 // >>>>> 9 // >>>>> 6 // >>>>> 2 // >>>>> 10 // >>>>> 7 // >>>>> 3 // >>>>> 11 // >>>>> 4 // >>>>> 12 // >>>>> 8 Observable.concat( // 同时组合多个被观察者(事件源),至少2个,最多4个。 Observable.just("item1", "item2"), // create Observable.just("item3"), // fromArray Observable.just("item4", "item5"), // Observable.just("item6") // 加入的顺序即被处理的顺序。同步的 ) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String t) throws Throwable { System.err.println("+++> " + t); } }); // +++> item1 // +++> item2 // +++> item3 // +++> item4 // +++> item5 // +++> item6 Observable.just("item1", "item2") // 被观察者(事件源) .concatWith(Observable.just("item3", "item4")) // 加入另一个被观察者(事件源)。后执行 .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String t) throws Throwable { System.err.println("---> " + t); } }); // ---> item1 // ---> item2 // ---> item3 // ---> item4 Observable.just("item1", "item2") // 被观察者(事件源) .startWith(Observable.just("item3", "item4")) // 加入另一个被观察者(事件源)。先执行 .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String t) throws Throwable { System.err.println("===> " + t); } }); // ===> item3 // ===> item4 // ===> item1 // ===> item2 } } |
- end
声明
本文由崔维友 威格灵 cuiweiyou vigiles cuiweiyou 原创,转载请注明出处:http://www.gaohaiyan.com/3995.html
承接App定制、企业web站点、办公系统软件 设计开发,外包项目,毕设