通常都是使用Observable创建事件,它发送数据好比是印度的火车站,车门一开一群人往上冲,车厢塞多少算多少。Flowable则好比非印度的火车站,车门口要排队,按座位量往里放行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.BackpressureStrategy; import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.FlowableEmitter; import io.reactivex.rxjava3.core.FlowableOnSubscribe; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.internal.schedulers.ComputationScheduler; import io.reactivex.rxjava3.internal.schedulers.IoScheduler; import io.reactivex.rxjava3.schedulers.Schedulers; public class Main7 { static Subscription subscription; static long step_count = 50; static long step = 0; public static void main(String[] args) { Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(@NonNull FlowableEmitter<@NonNull String> emitter) throws Throwable { long last = emitter.requested(); // 下游的Subscription.request(N)决定 System.err.println("下游可接收数据量:" + last); for (int i = 0; i< 120; i++) { // 上游发射数据 emitter.onNext("ele_"+i); last = emitter.requested(); System.err.println("----下游可接收数据量:" + last); } emitter.onComplete(); } }, // BackpressureStrategy.BUFFER // 上游发送大量数据,下游来不及处理的(超出request(N))都放到缓存,直到java.lang.OutOfMemoryError: Java heap space或正常onComplete // BackpressureStrategy.DROP // 上游发送大量数据,下游来不及处理的都放到缓存,但缓存达到某值时,后来的数据抛弃,抛弃操作完毕后onComplete // BackpressureStrategy.ERROR // 上游发送大量数据,下游接收超出request(N)时报错 MissingBackpressureException: create: could not emit value due to lack of requests,同时阻塞抛弃 BackpressureStrategy.LATEST // 上游发送大量数据,下游来不及处理的,只保留128个?,后来的数据覆盖前面未处理的 // BackpressureStrategy.MISSING // // https://www.likecs.com/show-204871319.html ) .subscribe(new Subscriber<String>() { @Override public void onSubscribe(Subscription s) { // 仅程序初始运行执行 subscription = s; s.request(step_count); // 下游只接收前10个数据-onNext,上游发射的后续数据根据策略缓存 System.err.println("onSubscribe"); } @Override public void onNext(String t) { System.err.println("onNext:"+t); step++; if(step>=step_count){ step = 0; System.err.println("下一波"); subscription.request(step_count); } } @Override public void onError(Throwable e) { System.err.println("onError:"+e); } @Override public void onComplete() { System.err.println("onComplete"); } }); /* // Flowable.fromArray("item", "tiem2") Flowable.just("item", "tiem2") .onBackpressureBuffer() // 根据需要设定模式。可不设置。对应BackpressureStrategy.BUFFER // .onBackpressureDrop() // 对应BackpressureStrategy.DROP // .onBackpressureLatest() // 对应BackpressureStrategy.LATEST .map(new Function<String, String>() { @Override public @NonNull String apply(@NonNull String t) throws Throwable { String tn = Thread.currentThread().getName(); System.err.println("线程:"+tn); Thread.sleep(1000); return t; } }) .subscribeOn(new IoScheduler()) // Schedulers.io() .observeOn(new ComputationScheduler()) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String t) throws Throwable { String n = Thread.currentThread().getName(); // 主线程 System.err.println(n + "+++=> " + t); } }); */ } } |
- end
声明
本文由崔维友 威格灵 cuiweiyou vigiles cuiweiyou 原创,转载请注明出处:http://www.gaohaiyan.com/4000.html
承接App定制、企业web站点、办公系统软件 设计开发,外包项目,毕设