1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
import java.util.List; import java.util.concurrent.TimeUnit; import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableSource; import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.observables.GroupedObservable; public class Main3 { // java项目。数据的转换 public static void main(String[] args) { map(); flatMap(); concatMap(); groupBy(); buffer(); } private static void buffer() { Observable.just(666666, 1,333) // 共3个数据 .buffer(2) // 按量发射。每次发射2个,如此,共发射2次 .subscribe(new Consumer<List<Integer>>() { // 下游每次接收一个GroupedObservable @Override public void accept(@NonNull List<Integer> bufs) throws Throwable { String n = Thread.currentThread().getName(); System.err.println(n + "-----> " + bufs); } }); // main-----> [666666, 1] // main-----> [333] } private static void groupBy() { Observable.just(666666, 1,333) // 分组 .groupBy(new Function<Integer, String>() { @Override public @NonNull String apply(@NonNull Integer t) throws Throwable { String n = Thread.currentThread().getName(); System.err.println(n + "---> " + t); return t<6000 ? "小于6k" : "大于6k"; // 返回一个组名 } }) .subscribe(new Consumer<GroupedObservable<String, Integer>>() { // 下游每次接收一个GroupedObservable @Override public void accept(@NonNull GroupedObservable<String, Integer> groupedObservable) throws Throwable { String n = Thread.currentThread().getName(); System.err.println(n + "-组> " + groupedObservable.getKey()); groupedObservable.subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer t) throws Throwable { String n = Thread.currentThread().getName(); System.err.println(n + "------值> " + t); } }); } }); // main---> 666666 // main-组> 大于6k // main------值> 666666 // main---> 1 // main-组> 小于6k // main------值> 1 // main---> 333 // main------值> 333 } private static void concatMap() { // flatMap转换改为concatMap Observable.just(666666, 1,333) .concatMap(new Function<Integer, ObservableSource<String>>(){ @Override public @NonNull ObservableSource<String> apply(@NonNull Integer t) throws Throwable { String n = Thread.currentThread().getName(); System.err.println(n + "------> " + t); // return 有序。 return Observable.just(t+"-11", t+"-22", t+"-33").delay(4, TimeUnit.SECONDS); } }) .subscribe(new Consumer<String>() { // 下游。接收最终数据 @Override public void accept(@NonNull String t) throws Throwable { String n = Thread.currentThread().getName(); System.err.println(n + "====> " + t); } }); try { Thread.sleep(1000*15); } catch (InterruptedException e) { e.printStackTrace(); } // main------> 666666 // RxComputationThreadPool-1====> 666666-11 // RxComputationThreadPool-1====> 666666-22 // RxComputationThreadPool-1====> 666666-33 // RxComputationThreadPool-1------> 1 // RxComputationThreadPool-2====> 1-11 // RxComputationThreadPool-2====> 1-22 // RxComputationThreadPool-2====> 1-33 // RxComputationThreadPool-2------> 333 // RxComputationThreadPool-3====> 333-11 // RxComputationThreadPool-3====> 333-22 // RxComputationThreadPool-3====> 333-33 } private static void flatMap() { Observable.just(666666, 1,333) // flatMap转换。just接收几个参数,flatMap就执行几次。但是,return的是一个被观察者 .flatMap(new Function<Integer, ObservableSource<String>>(){ @Override public @NonNull ObservableSource<String> apply(@NonNull Integer t) throws Throwable { String n = Thread.currentThread().getName(); System.err.println(n + "------> " + t); // return Observable.create(new ...); // 返回一个被观察者。这个被观察者里可能onNext多次 // return Observable.just(t.length()); // delay启动的是子线程 // return无序。 return Observable.just(t+"-11", t+"-22", t+"-33").delay(4, TimeUnit.SECONDS); } }) .subscribe(new Consumer<String>() { // 下游。接收最终数据 @Override public void accept(@NonNull String t) throws Throwable { String n = Thread.currentThread().getName(); System.err.println(n + "====> " + t); } }); try { Thread.sleep(1000*15); } catch (InterruptedException e) { e.printStackTrace(); } // main------> 666666 // main------> 1 // main------> 333 // RxComputationThreadPool-2====> 1-11 // RxComputationThreadPool-2====> 1-22 // RxComputationThreadPool-2====> 1-33 // RxComputationThreadPool-1====> 666666-11 // RxComputationThreadPool-1====> 666666-22 // RxComputationThreadPool-1====> 666666-33 // RxComputationThreadPool-3====> 333-11 // RxComputationThreadPool-3====> 333-22 // RxComputationThreadPool-3====> 333-33 } private static void map() { Observable.just("666666", "1","333") // 上游。接收原始数据 // 中游。数据拦截处理 // map转换。just接收几个参数,map就执行几次 .map(new Function<String, Integer>() { // 第一个泛型是接收的数据类型,第二个是目标数据类型 @Override public @NonNull Integer apply(@NonNull String t) throws Throwable { String n = Thread.currentThread().getName(); System.err.println(n + "----> " + t); return t.length(); } }) // .map() // 继续处理 .subscribe(new Consumer<Integer>() { // 下游。接收最终数据 @Override public void accept(@NonNull Integer t) throws Throwable { String n = Thread.currentThread().getName(); System.err.println(n + "====> " + t); } }); // main----> 666666 // main====> 6 // main----> 1 // main====> 1 // main----> 333 // main====> 3 } } |
- end
声明
本文由崔维友 威格灵 cuiweiyou vigiles cuiweiyou 原创,转载请注明出处:http://www.gaohaiyan.com/3991.html
承接App定制、企业web站点、办公系统软件 设计开发,外包项目,毕设