我在学习RxJava
,并且正在测试一个场景,我从一个数据库读取数据并将其发布到一个队列中。我只是做了整个过程的样本模拟,但我似乎没有找到Observable
正如我想要的那样工作。异步。Observable不是异步的
这是我的代码:
package rxJava;
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
public class TestClass {
public static void main(String[] args) {
TestClass test = new TestClass();
System.out.println("---START---");
test.getFromDB().subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Publish complete.");
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onNext(String s) {
test.publishToQueue(s).subscribe(new Observer<Boolean>() {
@Override
public void onNext(Boolean b) {
if (b) {
System.out.println("Successfully published.");
}
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable arg0) {
}
});
};
});
System.out.println("---END---");
}
public Observable<String> getFromDB() {
List<String> list = new ArrayList<String>();
for (int i = 0; i < 30; i++) {
list.add(Integer.toString(i));
}
return Observable.from(list).doOnNext(new Action1<String>() {
@Override
public void call(String temp) {
if (temp.contains("2")) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
public Observable<Boolean> publishToQueue(String s) {
return Observable.defer(() -> {
try {
if (s.contains("7")) {
Thread.sleep(700);
}
System.out.println("Published:: " + s);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Observable.just(true);
});
}
}
假设我从DB异步获取列表,并希望将其发布到队列,。我已经使用Observable
从getFromDB
返回并已订阅它模仿我从数据库中获得的数据。每次我从DB获取数据时,我都想使用publishToQueue
将它推送到队列中,这也会返回Observable
。我想让这个队列调用也是异步的。现在,我正在返回(Observable<Boolean>
)队列中的积极确认,如Boolean
,我想打印一些内容。
所以基本上我只是希望这两个进程是异步的。对于来自数据库的每个数据,我将它异步推送到队列。
我在方法db调用和队列中都添加了Thread.sleep()
,以模仿延迟并测试异步操作。我认为这是造成问题的原因。但我也试过Obseravable.delay()
,但这甚至不会产生任何输出。
请帮我理解这是如何工作的,以及如何让它按照我想要的工作。
你能解释一下吗? – v1shnu