1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
import java.util.concurrent.TimeUnit; import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableEmitter; import io.reactivex.rxjava3.core.ObservableOnSubscribe; import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.functions.Predicate; public class Main { // java项目 public static void main(String[] args) { Observable.just("item", "tiem2") .filter(new Predicate<String>() { // 这里的泛型,即just的参数数据类型 // 此test方法依次处理just接收的参数 @Override public boolean test(@NonNull String t) throws Throwable { // 这里的参数即just传入的数据 String n = Thread.currentThread().getName(); // 主线程 System.err.println(n + "+++> " + t); if (t.length() > 4) return true; return false; // true表示放行。false表示数据拦截 } }) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String t) throws Throwable { String n = Thread.currentThread().getName(); // 主线程 System.err.println(n + "+++=> " + t); } }); // main+++> item // main+++> tiem2 // main+++=> tiem2 // interval启动子线程,须要主线程阻塞看效果 Observable.interval(1, TimeUnit.SECONDS) .take(2) // 执行次数 .subscribe(new Consumer<Long>() { @Override public void accept(@NonNull Long t) throws Throwable { String n = Thread.currentThread().getName(); // 子线程 System.err.println(n + " =-=-> " + t); } }); // 配合查看interval效果。如果是Android项目,则无需阻塞 try { String n = Thread.currentThread().getName(); System.err.println(n); Thread.sleep(1000 * 10); } catch (InterruptedException e) { e.printStackTrace(); } // main // RxComputationThreadPool-1 =-=-> 0 // RxComputationThreadPool-1 =-=-> 1 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<@NonNull Integer> emitter) throws Throwable { String n = Thread.currentThread().getName(); // 主线程 emitter.onNext(1); System.err.println(n + "------> onNext1"); emitter.onNext(2); System.err.println(n + "------> onNext2"); emitter.onNext(3); System.err.println(n + "------> onNext3"); emitter.onNext(3); // 又发送‘3’,重复的 System.err.println(n + "------> onNext3"); emitter.onComplete(); System.err.println(n + "------> onComplete"); } }) .distinct() // 过滤重复的 .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer t) throws Throwable { String n = Thread.currentThread().getName(); // 主线程 System.err.println(n + ">>------> " + t); } }); // main>>------> 1 // main------> onNext1 // main>>------> 2 // main------> onNext2 // main>>------> 3 // main------> onNext3 // main------> onNext3 // main------> onComplete Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter<@NonNull String> emitter) throws Throwable { String n = Thread.currentThread().getName(); // 主线程 emitter.onNext("坐标0"); System.err.println(n + ">>>>> onNext 0"); emitter.onNext("坐标1"); System.err.println(n + ">>>>> onNext 1"); emitter.onNext("坐标2"); System.err.println(n + ">>>>> onNext 2"); emitter.onComplete(); System.err.println(n + ">>>>> onComplete"); } }) // .elementAt(1) // 只放行这个坐标的数据。如果不存在这个坐标,则输出默认值 .elementAt(3, "默认值") .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String t) throws Throwable { String n = Thread.currentThread().getName(); // 主线程 System.err.println(n + ">>>>==>" + t); } }); // main>>>>> onNext 0 // main>>>>> onNext 1 // main>>>>> onNext 2 // main>>>>==>默认值 // main>>>>> onComplete } } |
- end
声明
本文由崔维友 威格灵 cuiweiyou vigiles cuiweiyou 原创,转载请注明出处:http://www.gaohaiyan.com/3980.html
承接App定制、企业web站点、办公系统软件 设计开发,外包项目,毕设