2016-07-07 76 views
3

我试图通过rxJava向设备发送命令列表。这是我的代码:使用RxAndroidBle发送命令列表(rxJava)

public void startWriteCommucation(final ArrayList<byte[]> b) { 
    if (isConnected()){ 
      connectionObservable 
        .flatMap(new Func1<RxBleConnection, Observable<Observable<byte[]>>>() { 
         @Override 
         public Observable<Observable<byte[]>> call(final RxBleConnection rxBleConnection) { 
          final List<Observable<byte[]>> list = new ArrayList<>(); 
          for (byte[] bytes: b){ 
           Log.e("Observer", Arrays.toString(bytes)); 
           list.add(rxBleConnection 
             .writeCharacteristic(BleDevice.characteristicWrite, bytes)); 
          } 
          return Observable.from(list); 
         } 
        }) 
        .concatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() { 
         @Override 
         public Observable<byte[]> call(Observable<byte[]> observable) { 
          return observable; 
         } 
        }) 
        .observeOn(AndroidSchedulers.mainThread()) 
        .subscribe(new Action1<byte[]>() { 
         @Override 
         public void call(byte[] bytes) { 
          view.setTextStatus("Write success"); 
          Log.e("Subscriber", Arrays.toString(bytes)); 
         } 
        }); 
     } 
} 

它的工作原理,然后我点击按钮一次。例如,我的方法clikc:

public void onClick(){ 
     ArrayList<byte[]> listCmd = new ArrayList<>(); 
     listCmd.add(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); 
     listCmd.add(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}); 
     startWriteCommucation(listCmd); 
} 

而且myLogs在logcat中:

E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 
E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 

E/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 
E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 

但是当我使用快速双击按钮时发生问题。然后,第一次点击observable仍然有效,我再次点击以再次调用startWriteCommunication方法。在此之后,我的日志看起来如此:

E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 
E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 
E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 
E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 

E/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 
E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 
E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 
E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 

主要问题,他们不顺序,我的设备工作不正确。你能帮忙找到一个问题吗?

回答

2

问题是一个RxAndroidBle库错误(它使得响应不匹配请求)并且在有状态的两个通信流之间共享一个连接(要求在没有任何通信的情况下进行两次写入)。

错误:应该写入BluetoothGattCharacteristic的值(byte [])设置得太早。如果有两个并行写入器用于相同的特性 - 其中一个可能会覆盖另一个由于竞争条件而设置的字节[]。我已经修复了目前正在进行代码审查的库,并且应该在不久的将来应用到SNAPSHOT版本。

随着变化的输出将是这样的:

D/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 
D/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 
D/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 
D/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 

D/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 
D/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 
D/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 
D/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 

可能的解决方法

如果你不感兴趣的射击流的两倍,如果用户点击按钮快速两次 - 你可以创建一个可共享的流程:

Observable<byte[]> theSharedFlow = rxBleConnection 
    .writeCharacteristic(uuid, data1) 
    .flatMap(writtenBytes -> rxBleConnection.writeCharacteristic(uuid, data2)) 
    .share() 

当订阅多次只会执行一次,直到它将完成。在上面的代码片段中,第二个writeCharacteristic()将在第一个发出写入字节之后被订阅(并且排队等待通信)。

如果应用程序的目的是在共享连接的任意时间发送任意命令集,则应用程序确定前一集已完成。

我希望我已经回答了你的问题。如果您想提供更多关于用例的信息,我会尽力改进我的答案。

问候

编辑:

替代解决方案:

要让被保存下来,所有的观测量需要,以便他们应该到达订阅的顺序。 Observable的合同是Observable(如果它很冷)在订阅之前不会执行。并且当使用flatMap()时,第二个Observable在第一个已经发射时被订阅。

中都有顺序写入传输它们以相同的顺序进行认购,因此流动可能看起来像这样:

connectionObservable 
      .flatMap(rxBleConnection -> { 
       Observable<byte[]> mergedObservable = null; 
       for (byte[] bytes : b) { 
        Log.d("Observer", Arrays.toString(bytes)); 
        final Observable<byte[]> writeObservable = rxBleConnection 
          .writeCharacteristic(uuid, bytes); 

        if (mergedObservable == null) { 
         mergedObservable = writeObservable; 
        } else { 
         // merging two Observables to be subscribed at the same time when subscribed 
         mergedObservable = mergedObservable.mergeWith(writeObservable); 
        } 
       } 
       return mergedObservable; 
      }) 
      // removed .concatMap() 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(
        bytes -> Log.d("Subscriber", Arrays.toString(bytes)), 
        throwable -> Log.e("Subscriber", "error", throwable) 
      ); 

RxJava具有明显的更多的方式来达到同样的行为,但它不是这个问题的一部分。

+0

谢谢,我会明天再试 – KolinLoures

+0

但是我想用户的输出和我编辑回复的观察者 – KolinLoures

+0

的顺序是一样的。替代解决方案修复了行为,尽管在我看来,这只是一个错误流的表现(两个并行执行的状态流)。 –