二 RxJava基本元素
Rxjava1基本元素
1 Observable
2 Observer 接口
3 Subscription 接口
4 OnSubscribe 接口 实现了 Action1
5 Subscriber 抽象类,实现了Subscription, Observer
Observable
1 被观察者–观察得到的
2 通过Observable创建一个可观察的序列(create方法)
3 通过subscribe去注册一个观察者
Observer
1 观察者–用于接收数据
2 作为Observable的subscribe方法的参数
Subscription
1 订阅,用于描述被观察者和观察者之间的关系
2 用于取消订阅和获取当前的订阅状态
OnSubscribe
1 当订阅时会触发此接口调用
2 在Observable内部,实际作用是向订阅者发射数据
Subscriber
1 实现了Observer和Subscription
2 只有自己才能阻止自己
现实中的案例 打电话
Observable : 打电话的人
Observer : 接电话的人,用于接收数据
Subscription : 用于接电话的人随时挂断电话
OnSubscribe : 通线电路,向接电话的人发送数据
Subscriber : Observer的实现类, 通过subscribe了Subscriber—打电话的人给接电话的人打了电话
RxJava1图解
RxJava1代码举例
findViewById(R.id.btnRxJava1).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
Subscription tSubscription = Observable.
create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext("test");
subscriber.onCompleted();
}
}
}).
subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("onNext:" + s);
}
});
}
});
Rxjava2基本元素
1 Observable和Flowable
2 Observer和Subscriber
3 Disposable和Subscription 接口
4 相应的OnSubscribe
5 Emitter 接口
背压概念
1 异步环境下产生的问题
2 发送和处理速度不同意
3 是一种流速控制解决策略
Observable
1 被观察者,不支持背压
2 通过Observable创建一个可观察的序列(creat方法)
3 通过subscribe去注册一个观察者
Observer
1 观察者
2 作为Observable的subscribe方法的参数
Disposable
1 和RxJava1的Subscription的作用相当
2 用于取消订阅和获取当前的订阅状态
ObservableOnSubscribe
1 和RxJava1的OnSubscribe的作用相当
2 当订阅时会触发此接口调用
3 在Observable内部,实际作用是想观察者发射数据
Emitter
1 一个发射数据的接口,和Observer的方法类似
2 本质是对Observer和Subscriber的包装
RxJava2图解
RxJava2代码举例
findViewById(R.id.btnRxJava2).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
if (!e.isDisposed()) {
e.onNext("1");
e.onNext("2");
e.onNext("3");
e.onNext("4");
e.onNext("5");
e.onNext("6");
e.onNext("7");
e.onNext("8");
e.onNext("9");
e.onNext("10");
e.onComplete();
}
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String value) {
System.out.println("onNext:" + value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("onCompleted");
}
});
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
if (!e.isCancelled()) {
e.onNext("test");
e.onComplete();
}
}
}, BackpressureStrategy.DROP).subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
System.out.println("onSubscribe");
}
@Override
public void onNext(String s) {
System.out.println("onNext:" + s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
System.out.println("onCompleted");
}
});
}
});