2016-07-29 75 views
0

我在学习RxJava,并且正在测试一个场景,我从一个数据库读取数据并将其发布到一个队列中。我只是做了整个过程的样本模拟,但我似乎没有找到Observable正如我想要的那样工作。异步。Observable不是异步的

这是我的代码:

package rxJava; 

import java.util.ArrayList; 
import java.util.List; 

import rx.Observable; 
import rx.Observer; 
import rx.functions.Action1; 

public class TestClass { 

    public static void main(String[] args) { 

     TestClass test = new TestClass(); 
     System.out.println("---START---"); 

     test.getFromDB().subscribe(new Observer<String>() { 

      @Override 
      public void onCompleted() { 
       System.out.println("Publish complete."); 
      } 

      @Override 
      public void onError(Throwable t) { 
       System.out.println(t.getMessage()); 
      } 

      @Override 
      public void onNext(String s) { 
       test.publishToQueue(s).subscribe(new Observer<Boolean>() { 

        @Override 
        public void onNext(Boolean b) { 
         if (b) { 
          System.out.println("Successfully published."); 
         } 
        } 

        @Override 
        public void onCompleted() { 
        } 

        @Override 
        public void onError(Throwable arg0) { 
        } 
       }); 
      }; 
     }); 
     System.out.println("---END---"); 
    } 

    public Observable<String> getFromDB() { 

     List<String> list = new ArrayList<String>(); 
     for (int i = 0; i < 30; i++) { 
      list.add(Integer.toString(i)); 
     } 
     return Observable.from(list).doOnNext(new Action1<String>() { 
      @Override 
      public void call(String temp) { 
       if (temp.contains("2")) { 
        try { 
         Thread.sleep(200); 
        } catch (InterruptedException e) { 
         e.printStackTrace(); 
        } 
       } 
      } 
     }); 

    } 

    public Observable<Boolean> publishToQueue(String s) { 

     return Observable.defer(() -> { 
      try { 
       if (s.contains("7")) { 
        Thread.sleep(700); 
       } 
       System.out.println("Published:: " + s); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      return Observable.just(true); 
     }); 
    } 
} 

假设我从DB异步获取列表,并希望将其发布到队列,。我已经使用ObservablegetFromDB返回并已订阅它模仿我从数据库中获得的数据。每次我从DB获取数据时,我都想使用publishToQueue将它推送到队列中,这也会返回Observable。我想让这个队列调用也是异步的。现在,我正在返回(Observable<Boolean>)队列中的积极确认,如Boolean,我想打印一些内容。

所以基本上我只是希望这两个进程是异步的。对于来自数据库的每个数据,我将它异步推送到队列。

我在方法db调用和队列中都添加了Thread.sleep(),以模仿延迟并测试异步操作。我认为这是造成问题的原因。但我也试过Obseravable.delay(),但这甚至不会产生任何输出。

请帮我理解这是如何工作的,以及如何让它按照我想要的工作。

回答

1

默认情况下,RxJava是同步。这意味着默认情况下,所有内容都将在同一个线程(和当前线程)中执行。您可以在另一个线程感谢observeOn/subscribeOn方法执行任务,或使用一些运营商,在另一个作业执行任务(因为它使用其他调度,像delayinterval ...)

在你的榜样,你有以明确设置预订将在哪个调度程序中执行。 (这里,在哪个线程Observable.from会散发出你的列表)

test.getFromDb() 
    .subscribeOn(Schedulers.io()) 
    .subscribe(); 

然后你可以使用flatMap运营商,并要求您publishToQueue方法。此方法将在前一个调度程序中执行,但您可以强制它使用另一个调度程序,这要归功于observeOn方法。 observeOn方法之后的所有内容都将在另一个线程中执行。

test.fromDb() 
    .subscribeOn(Schedulers.io()) 
    .observeOn(Schedulers.computation()) 
    .flatMap(l -> test.publishToqueue(l)) 
    .subscribe();