二 RxJava基本元素

作者: wxyass 分类: RxJava 发布时间: 2018-01-20 23:03

Rxjava1基本元素

1 Observable
2 Observer 接口
3 Subscription 接口
4 OnSubscribe 接口 实现了 Action1
5 Subscriber 抽象类,实现了Subscription, Observer

Observable
1 被观察者–观察得到的
2 通过Observable创建一个可观察的序列(create方法)
3 通过subscribe去注册一个观察者

Observer
1 观察者–用于接收数据
2 作为Observable的subscribe方法的参数

Subscription
1 订阅,用于描述被观察者和观察者之间的关系
2 用于取消订阅和获取当前的订阅状态

OnSubscribe
1 当订阅时会触发此接口调用
2 在Observable内部,实际作用是向订阅者发射数据

Subscriber
1 实现了Observer和Subscription
2 只有自己才能阻止自己

现实中的案例 打电话
Observable : 打电话的人
Observer : 接电话的人,用于接收数据
Subscription : 用于接电话的人随时挂断电话
OnSubscribe : 通线电路,向接电话的人发送数据
Subscriber : Observer的实现类, 通过subscribe了Subscriber—打电话的人给接电话的人打了电话

RxJava1图解

RxJava1代码举例

findViewById(R.id.btnRxJava1).setOnClickListener(new View.OnClickListener() {
    @Override
    public void onClick(View view) {

        Subscription tSubscription = Observable.
            create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext("test");
                        subscriber.onCompleted();
                    }
                }
            }).
            subscribe(new Observer<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(String s) {
                    System.out.println("onNext:" + s);
                }
            });
    }
});

Rxjava2基本元素

1 Observable和Flowable
2 Observer和Subscriber
3 Disposable和Subscription 接口
4 相应的OnSubscribe
5 Emitter 接口

背压概念
1 异步环境下产生的问题
2 发送和处理速度不同意
3 是一种流速控制解决策略

Observable
1 被观察者,不支持背压
2 通过Observable创建一个可观察的序列(creat方法)
3 通过subscribe去注册一个观察者

Observer
1 观察者
2 作为Observable的subscribe方法的参数

Disposable
1 和RxJava1的Subscription的作用相当
2 用于取消订阅和获取当前的订阅状态

ObservableOnSubscribe
1 和RxJava1的OnSubscribe的作用相当
2 当订阅时会触发此接口调用
3 在Observable内部,实际作用是想观察者发射数据

Emitter
1 一个发射数据的接口,和Observer的方法类似
2 本质是对Observer和Subscriber的包装

RxJava2图解

RxJava2代码举例

findViewById(R.id.btnRxJava2).setOnClickListener(new View.OnClickListener() {
    @Override
    public void onClick(View view) {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("1");
                    e.onNext("2");
                    e.onNext("3");
                    e.onNext("4");
                    e.onNext("5");
                    e.onNext("6");
                    e.onNext("7");
                    e.onNext("8");
                    e.onNext("9");
                    e.onNext("10");
                    e.onComplete();
                }
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(String value) {
                System.out.println("onNext:" + value);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                System.out.println("onCompleted");
            }
        });

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception {
                if (!e.isCancelled()) {
                    e.onNext("test");
                    e.onComplete();
                }
            }
        }, BackpressureStrategy.DROP).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext:" + s);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {
                System.out.println("onCompleted");
            }
        });
    }
});

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注