2016-02-26 51 views
0

下面是我的确切代码。注意foo(o)在订阅lambda函数内被调用。这给Android提供了严格的模式错误。现在我当然可以在这里分解一个线程,但这是Rxjava做事的方式?我现在不得不再次考虑线程吗?想在这里知道我最好的选择。我怎么能这样做RxJava的方式,使其他部分的代码可以订阅新的事件,这将返回网络呼叫响应?所以基本上,我通过异步响应将一种类型的事件MyObj转换为NetObj响应事件。RxJava - 如何启动订阅的lambda函数内的异步网络调用? Observable

public static final PublishSubject<MyObj> myObjSubject = PublishSubject.create(); 
    public static final Observable<MyObj> observable = myObjSubject.asObservable(); 


protected CompositeSubscription myCompositeSubs = new CompositeSubscription(); 

myCompositeSubs.add(
      observable.subscribe((MyObj o) -> { 
       // `. Update UI with MyObject and 
       // 2. Kick off network call which will return JsonResponse type object which I would like to possibly process here and also publish to let others respond to it. 
       foo(o); // $$$$$ million dollar question what if I am kicking off something long running here? 
      })); 


private void foo(MyObj o){ 

// long running request. How to kick off long running from here? 
    JsonRespons resp = RestAdapter.makeQuest(o.url); 
    // I want this JsonResponse object to be an even others can listen for. I don't want to just process it right here. 

} 

更新:只是为了阐明我正在尝试做的是侦听类(Android Fragment)中的数据事件MyObject。一旦我得到了这个,我想做两件事情,用MyObject同步更新UI,这是我想要消费的东西,而不仅仅是转换。但我也想开始一个长期的请求。一个返回JsonObject响应的网络调用。这可能会在它返回时处理相同的片段,或者我可能希望其他类能够监听此事件。所以我并没有单独观察MyObject来进行网络调用并获得响应。我需要做两个操作,一个同步另一个异步。

+0

看起来像这种类型的问题可能已经在这里讨论:https://github.com/ReactiveX/RxJava/issues/1574 – FunctionallyReactive

回答

3

这是使用RxJava运算符(如flatMap)的绝佳机会。如果您需要更多文档,请参阅here,但flatMap允许您带一种类型的Observable(您的Observable<MyObj>)并返回另一种类型的Observable(在这种情况下,它可以是Observable<Void>Observable<Boolean>,如果您想知道您的长时间运行请求是否完成)。

看看这个代码:

Subscription sub = observable.flatMap((MyObj o) -> { 
     // this will be run on your subscribeOn thread -- in background 
     return Observable.just(foo(o)); 
    }).subscribe(aVoid -> { 
     // we have completed both operations here, emissions will be 
     // on our observeOn() thread 
    }); 

如果您foo(MyObj)功能现在结构为:

private Void foo(MyObj o) { 
    // long running stuff 
    return null; 
} 

它使返回类型Void,而不是void,使我们能够很重要使用flatMap运算符并获取返回的Observable<Void>

您所有的长期运行foo()操作将在您的subscribeOn()线程中运行,与您最初的观察到一起,并在操作完成后,它会发出Void项目到您subscribe()observeOn()线程(这可能是你的主要如果你想通知UI,则在Android中进行线程)。

更新,以允许在flatMap调用UI工作:

注:这是假定第一observable定义subscribeOn()在后台线程和observeOn()AndroidSchedulers.mainThread()

Subscription sub = observable.flatMap((MyObj o) -> { 
     // << do stuff with MyObj to UI HERE >> 
     return Observable.defer(() -> Observable.just(foo(o))) 
      .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); 
    }).subscribe(aVoid -> { 
     // foo() has completed, we're back on the UI thread 
    }); 

如果你想用JsonResponse做一些事情,你可以改变foo()函数返回该函数而不是Void,那么你就可以使用的结果在UI线程上。

更新2:更改Observable.create()Observable.defer()使其更清洁。

+0

只是为了澄清我在一个类(Android Fragment),我想以这种方式处理此问题,以便我可以在接收MyObject时采取一些同步操作,但也希望进行异步调用,并在完成时或出错时执行其他操作。 – FunctionallyReactive

+0

我有单身人士与各种观察员和主题,但让我们看看是否我返回在这一类内工作的Objservable.just(foo),但如果我想让响应在片段外部可见,该怎么办?例如,我想要映射到JSonResponse。所以外面的课我希望他们能够消费JSonResponse听。外来者听众如何利用JSonResponse对象? – FunctionallyReactive

+0

所以我希望Fragment能够对MyObj采取一些操作,并启动这个异步操作,它将返回一个JsonResponse。它是我想要能够倾听或观察的那种反应。我可能想要在将它踢出或者其他地方的片段中处理它。所以我说的是单个类将使用MyObj类来更新UI,并且它将启动异步。一旦返回,我希望它可以发布,或者有一种方法来监听流的JsonResponse类型。 – FunctionallyReactive