create、empty、just、fromArray、range接收原始数据,创建一个事件流“上游”的源头-“被观察者”。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableEmitter; import io.reactivex.rxjava3.core.ObservableOnSubscribe; import io.reactivex.rxjava3.core.Observer; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.functions.Consumer; public class Main { // java项目:事件的创建(源集合数据的接收) public static void main(String[] args) { Observable.range(3, 3) // 从数字3开始,累计执行3次。3、4、5 .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer index) throws Throwable { String n = Thread.currentThread().getName(); // main System.err.println(n + " >>--> " + index); } }); // main >>--> 3 // main >>--> 4 // main >>--> 5 } private static void empty() { Observable.empty() // 无数据发送,只发送一个onComplete事件。通常用于模拟耗时操作 .subscribe( // // new Consumer<Object>() { // 只响应有数据的onNext事件 // @Override // public void accept(@NonNull Object t) throws Throwable { // // empty事件是无数据的,这里永远不会执行 // } // } new Observer<Object>() { @Override public void onSubscribe(@NonNull Disposable d) { String n = Thread.currentThread().getName(); // main System.err.println(n + " >>----> onSubscribe"); } @Override public void onNext(@NonNull Object t) { // 无数据发送,所以不会执行这里 } @Override public void onError(@NonNull Throwable e) { String n = Thread.currentThread().getName(); // main System.err.println(n + " >>----> onError"); } @Override public void onComplete() { String n = Thread.currentThread().getName(); // main System.err.println(n + " >>----> onComplete"); } } ); // main >>----> onSubscribe // main >>----> onComplete } private static void fromArray() { String[] arr = new String[] { "a", "b", "c" }; Observable.fromArray(arr) // 要处理一个数据数组。数据集合List的用fromIterable。依次onNext发射元素 .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String t) throws Throwable { // 数组有几个元素,accept就执行几次 String n = Thread.currentThread().getName(); // main System.err.println(n + " >>--> " + t); } }); // main >>--> a // main >>--> b // main >>--> c } private static void just() { // 明确要处理的数据就这么几个。依次onNext发射参数 // Observable.just(item1, item2, item3, item4, item5, item6, item7, item8, item9, item10) // Observable.just(item1, item2, item3, item4, item5, item6, item7, item8, item9) // Observable.just(item1, item2, item3, item4, item5, item6, item7, item8) // Observable.just(item1, item2, item3, item4, item5, item6, item7) // Observable.just(item1, item2, item3, item4, item5, item6) // Observable.just(item1, item2, item3, item4, item5) // Observable.just(item1, item2, item3, item4) // Observable.just(item1, item2, item3) // Observable.just(item1, item2) // Observable.just(item1) Observable.just("a", "b", "c") .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String t) throws Throwable { // just接收几个参数,accept就执行几次 String n = Thread.currentThread().getName(); // main System.err.println(n + " >>--> " + t); } }); // main >>--> a // main >>--> b // main >>--> c } private static void create() { Observable.create( // 创建一个事件流。数据由ObservableEmitter生产 new ObservableOnSubscribe<Integer>() { // 上游,被观察者 @Override public void subscribe(@NonNull ObservableEmitter<@NonNull Integer> emitter) throws Throwable { String n = Thread.currentThread().getName(); // main System.err.println(n + " ------> onNext"); emitter.onNext(1); // 发射数据 System.err.println(n + " ------> onComplete"); emitter.onComplete(); } }) .subscribe( // 订阅 new Observer<Integer>() { // 下游,观察者 @Override public void onSubscribe(@NonNull Disposable d) { // .subscribe( 后立即执行 String n = Thread.currentThread().getName(); // main System.err.println(n + " >>----> onSubscribe"); } @Override public void onNext(@NonNull Integer t) { // emitter.onNext一次,就onNext一次 String n = Thread.currentThread().getName(); // main System.err.println(n + " >>----> onNext " + t); } @Override public void onError(@NonNull Throwable e) { String n = Thread.currentThread().getName(); // main System.err.println(n + " >>----> onError"); } @Override public void onComplete() { // emitter.onComplete()了,就onComplete String n = Thread.currentThread().getName(); // main System.err.println(n + " >>----> onComplete"); } } // new Consumer<Integer>() { // 下游,消费者-观察者 // @Override // public void accept(@NonNull Integer t) throws Throwable { // emitter.onNext一次,就accept一次 // String n = Thread.currentThread().getName(); // main // System.err.println(n + " >>----> " + t); // } // } ); // main >>----> onSubscribe // main ------> onNext // main >>----> onNext 1 // main ------> onComplete // main >>----> onComplete } } |
- end
声明
本文由崔维友 威格灵 cuiweiyou vigiles cuiweiyou 原创,转载请注明出处:http://www.gaohaiyan.com/3986.html
承接App定制、企业web站点、办公系统软件 设计开发,外包项目,毕设