2017-12-03 132 views
0

我有一个角度为5的应用程序,rxjs WebsocketSubject发送jsonrpc消息。rxjs首先完成整个流式链接

这是我sendRequest将功能

sendRequest(request: Request): Promise<Response> { 

    console.log(request); 

    this.socket.next(JSON.stringify(request)); 

    return this.onResponse().filter((response: Response) => { 
    return response.id === request.id; 
    }).first().toPromise().then((response) => { 

    console.log(response); 

    if (response.error) { 
     console.log('error'); 
     throw new RpcError(response.error); 
    } 

    return response; 

    }); 

} 

我使用的第一个()运算符来完成这个过滤规则订阅。但是,onResponse()直接来自我的WebSocketSubject,然后完成。

是否有解除原始主题的方法?

或者我应该创建一个新的Observale.create(...)?

写入的.filter函数会发生什么情况。它会持续到哪里?或者我必须在任何地方将其删除,以防止持续过滤呼叫?


编辑1

也是用这个没有帮助。

sendRequest(request: Request): Promise<Response> { 

    console.log(request); 

    this.socket.next(JSON.stringify(request)); 

    return new Promise<Response>((resolve, reject) => { 

    const responseSubscription = this.onResponse().filter((response: Response) => { 
     console.log('filter'); 
     return response.id === request.id; 
    }).subscribe((response: Response) => { 

     // responseSubscription.unsubscribe(); 
     resolve(response); 

    }); 

    }); 

} 

如果我执行取消订阅整个websocketSubject被关闭。不要这样做每个请求准时记录'过滤器'!


编辑2

这里是整个websocketService我写

import {Injectable} from "@angular/core"; 
import {WebSocketSubject, WebSocketSubjectConfig} from "rxjs/observable/dom/WebSocketSubject"; 
import {MessageFactory, Notification, Request, Response, RpcError} from "../misc/jsonrpc"; 
import {ReplaySubject} from "rxjs/ReplaySubject"; 
import {Observable} from "rxjs/Observable"; 
import 'rxjs/add/operator/toPromise'; 
import 'rxjs/add/operator/filter'; 
import 'rxjs/add/operator/first'; 
import 'rxjs/add/observable/from'; 

export enum ConnectionState { 
    CONNECTED = "Connected", 
    CONNECTING = "Connecting", 
    CLOSING = "Closing", 
    DISCONNECTED = "Disconnected" 
} 

@Injectable() 
export class WebsocketService { 

    private connectionState = new ReplaySubject<ConnectionState>(1); 
    private socket: WebSocketSubject<ArrayBuffer | Object>; 
    private config: WebSocketSubjectConfig; 

    constructor() { 

    console.log('ctor'); 

    const protocol = location.protocol === 'https' ? 'wss' : 'ws'; 
    const host = location.hostname; 
    const port = 3000; // location.port; 

    this.config = { 
     binaryType: "arraybuffer", 
     url: `${protocol}://${host}:${port}`, 
     openObserver: { 
     next:() => this.connectionState.next(ConnectionState.CONNECTED) 
     }, 
     closingObserver: { 
     next:() => this.connectionState.next(ConnectionState.CLOSING) 
     }, 
     closeObserver: { 
     next:() => this.connectionState.next(ConnectionState.DISCONNECTED) 
     }, 
     resultSelector: (e: MessageEvent) => { 

     try { 

      if (e.data instanceof ArrayBuffer) { 
      return e.data; 
      } else { 
      return JSON.parse(e.data); 
      } 

     } catch (e) { 

      console.error(e); 
      return null; 

     } 

     } 
    }; 

    this.connectionState.next(ConnectionState.CONNECTING); 
    this.socket = new WebSocketSubject(this.config); 

    this.connectionState.subscribe((state) => { 
     console.log(`WS state ${state}`); 
    }); 

    } 

    onBinaryData(): Observable<ArrayBuffer> { 
    return this.socket.filter((message: any) => { 
     return message instanceof ArrayBuffer; 
    }); 
    } 

    onMessageData(): Observable<Object> { 
    return this.socket.filter((message: any) => { 
     return !(message instanceof ArrayBuffer); 
    }); 
    } 

    onResponse(): Observable<Response> { 
    return this.onMessageData().filter((message) => { 
     return MessageFactory.from(message).isResponse(); 
    }).map((message): Response => { 
     return MessageFactory.from(message).toResponse(); 
    }); 
    } 

    sendRequest(request: Request): Promise<Response> { 

    console.log(request); 

    this.socket.next(JSON.stringify(request)); 

    return new Promise<Response>((resolve, reject) => { 

     const responseSubscription = this.onResponse().filter((response: Response) => { 
     console.log('filter'); 
     return response.id === request.id; 
     }).subscribe((response: Response) => { 

     responseSubscription.unsubscribe(); 
     resolve(response); 

     }); 

    }); 

    } 

    sendNotification(notification: Notification): void { 
    this.socket.next(JSON.stringify(notification)); 
    } 

} 

而结果在我的日志

Using Angular 5.0.2 
websocket.service.ts:27 ctor 
websocket.service.ts:69 WS state Connecting 
core.js:3565 Angular is running in the development mode. Call enableProdMode() to enable the production mode. 
websocket.service.ts:96 Request {jsonrpc: "2.0", id: "b042005c-5fbf-5ffc-fbd1-df68fae5882e", method: "appointment_list_get", params: undefined} 
websocket.service.ts:69 WS state Connected 
websocket.service.ts:103 filter 
websocket.service.ts:69 WS state Disconnected 

我需要找到一个方式decouplin g以某种方式从原始流中过滤掉我的过滤器。

+0

为什么你甚至需要使用'first()。toPromise()。然后'?只需订阅像this.onResponse()。filter(...)。subscribe(...);'? – martin

+0

是的,但在我的理解(也许我错了)我必须清理订阅,我期待一个单一的结论。所以使用Promise应该没问题。但是.toPromise只能在Observalbe完成时解决 – Pascal

+0

那么你现在的做法有什么问题? – martin

回答

1

这是行得通的。 关键是将消息处理与下层websocketSubject分离。

import {Injectable} from "@angular/core"; 
import {WebSocketSubject, WebSocketSubjectConfig} from "rxjs/observable/dom/WebSocketSubject"; 
import {MessageFactory, Notification, Request, Response, RpcError} from "../misc/jsonrpc"; 
import {ReplaySubject} from "rxjs/ReplaySubject"; 
import {Observable} from "rxjs/Observable"; 
import 'rxjs/add/operator/toPromise'; 
import 'rxjs/add/operator/filter'; 
import 'rxjs/add/operator/first'; 
import 'rxjs/add/observable/from'; 
import {Subject} from "rxjs/Subject"; 

export enum ConnectionState { 
    CONNECTED = "Connected", 
    CONNECTING = "Connecting", 
    CLOSING = "Closing", 
    DISCONNECTED = "Disconnected" 
} 

@Injectable() 
export class WebsocketService { 

    private connectionState = new ReplaySubject<ConnectionState>(1); 
    private socket: WebSocketSubject<ArrayBuffer | Object>; 
    private config: WebSocketSubjectConfig; 

    private messageObserver = new Subject<MessageFactory>(); 
    private binaryObserver = new Subject<ArrayBuffer>(); 

    constructor() { 

    const protocol = location.protocol === 'https' ? 'wss' : 'ws'; 
    const host = location.hostname; 
    const port = 3000; // location.port; 

    this.config = { 
     binaryType: "arraybuffer", 
     url: `${protocol}://${host}:${port}`, 
     openObserver: { 
     next:() => this.connectionState.next(ConnectionState.CONNECTED) 
     }, 
     closingObserver: { 
     next:() => this.connectionState.next(ConnectionState.CLOSING) 
     }, 
     closeObserver: { 
     next:() => this.connectionState.next(ConnectionState.DISCONNECTED) 
     }, 
     resultSelector: (e: MessageEvent) => { 

     try { 

      if (e.data instanceof ArrayBuffer) { 
      return e.data; 
      } else { 
      return JSON.parse(e.data); 
      } 

     } catch (e) { 

      console.error(e); 
      return null; 

     } 

     } 
    }; 

    this.connectionState.next(ConnectionState.CONNECTING); 
    this.socket = new WebSocketSubject(this.config); 

    this.socket.filter((message: any) => { 
     return message instanceof ArrayBuffer; 
    }).subscribe((message: ArrayBuffer) => { 
     this.binaryObserver.next(message); 
    }); 

    this.socket.filter((message: any) => { 
     return !(message instanceof ArrayBuffer); 
    }).subscribe((message: ArrayBuffer) => { 
     this.messageObserver.next(MessageFactory.from(message)); 
    }); 

    this.connectionState.subscribe((state) => { 
     console.log(`WS state ${state}`); 
    }); 

    } 

    onResponse(): Observable<Response> { 
    return this.messageObserver.filter((message: MessageFactory) => { 
     return message.isResponse(); 
    }).map((message: MessageFactory): Response => { 
     return message.toResponse(); 
    }); 
    } 

    sendRequest(request: Request): Promise<Response> { 

    console.log(request); 

    this.socket.next(JSON.stringify(request)); 

    return this.onResponse().filter((response: Response) => { 
     return request.id === response.id; 
    }).first().toPromise().then((response) => { 

     console.log(response); 

     if (response.error) { 
     console.log('error'); 
     throw new RpcError(response.error); 
     } 

     return response; 

    }); 

    } 

    sendNotification(notification: Notification): void { 
    this.socket.next(JSON.stringify(notification)); 
    } 

}