2016-10-02 46 views
3

我正在构建一个从后端读取数据的网站。该数据即时计算并以缓冲方式发送回客户端。即一旦第一个块被计算出来,它被发送到客户端,然后计算下一个块并将其发送到客户端。整个过程发生在同一个HTTP请求中。客户端不应该等待完成的响应完成,而应该尽快处理每个块。通常可以使用XHR进度处理程序消耗这些响应(例如How to get progress from XMLHttpRequest)。使用Angular2/RxJS读取缓存的响应

如何使用RxJS和Observables在Angular2中使用HttpModule来响应此类响应?


编辑:peeskillet下面给出一个很好的和详细的解答。另外,我做了一些进一步的挖掘,发现了feature request for the HttpModule of AngularStackOverflow question with another approach on how to solve it

回答

4

注意:以下答案只是一个POC。它旨在教育Http的体系结构,并提供简单的工作POC实现。一个人应该看看XHRConnection的来源,看看在实现这个时你应该考虑什么。

当试图实现这一点时,我没有看到任何直接进入XHR的方法。看起来也许我们需要提供一些与使用Http有关的组件的自定义实现。我们应该考虑的三个主要组成部分是

  • Connection
  • ConnectionBackend
  • Http

Http需要ConnectionBackend作为参数传递给它的构造。当发出请求时,使用getHttpConnectionBackend.createConnection创建连接,并返回ObservableConnection(即从createConnection返回)的属性。在最精简(简化)视图,它看起来像这样

class XHRConnection implements Connection { 
    response: Observable<Response>; 
    constructor(request, browserXhr) { 
    this.response = new Observable((observer: Observer<Response>) => { 
     let xhr = browserXhr.create(); 
     let onLoad = (..) => { 
     observer.next(new Response(...)); 
     }; 
     xhr.addEventListener('load', onLoad); 
    }) 
    } 
} 

class XHRBackend implements ConnectionBackend { 
    constructor(private browserXhr) {} 
    createConnection(request): XHRConnection { 
    return new XHRConnection(request, this.broswerXhr).response; 
    } 
} 

class Http { 
    constructor(private backend: ConnectionBackend) {} 

    get(url, options): Observable<Response> { 
    return this.backend.createConnection(createRequest(url, options)).response; 
    } 
} 

因此,了解这个架构,我们可以尝试实行类似的东西。

对于Connection,这里是POC。为简洁起见,省略了进口货物,但大部分货物可以从@angular/http导入,Observable/Observer可以从rxjs/{Type}导入。

export class Chunk { 
    data: string; 
} 

export class ChunkedXHRConnection implements Connection { 
    request: Request; 
    response: Observable<Response>; 
    readyState: ReadyState; 

    chunks: Observable<Chunk>; 

    constructor(req: Request, browserXHR: BrowserXhr, baseResponseOptions?: ResponseOptions) { 
    this.request = req; 
    this.chunks = new Observable<Chunk>((chunkObserver: Observer<Chunk>) => { 
     let _xhr: XMLHttpRequest = browserXHR.build(); 
     let previousLen = 0; 
     let onProgress = (progress: ProgressEvent) => { 
     let text = _xhr.responseText; 
     text = text.substring(previousLen); 
     chunkObserver.next({ data: text }); 
     previousLen += text.length; 

     console.log(`chunk data: ${text}`); 
     }; 
     _xhr.addEventListener('progress', onProgress); 
     _xhr.open(RequestMethod[req.method].toUpperCase(), req.url); 
     _xhr.send(this.request.getBody()); 
     return() => { 
     _xhr.removeEventListener('progress', onProgress); 
     _xhr.abort(); 
     }; 
    }); 
    } 
} 

这里我们只是订阅XHR progress事件。由于XHR.responseText会将所有连接文字分开,因此我们只需要substring即可获得块,并通过Observer发出每个卡盘。

对于XHRBackend,我们有以下(没有什么特别的)。再次,一切都可以从@angular/http导入;

@Injectable() 
export class ChunkedXHRBackend implements ConnectionBackend { 
    constructor(
     private _browserXHR: BrowserXhr, private _baseResponseOptions: ResponseOptions, 
     private _xsrfStrategy: XSRFStrategy) {} 

    createConnection(request: Request): ChunkedXHRConnection { 
    this._xsrfStrategy.configureRequest(request); 
    return new ChunkedXHRConnection(request, this._browserXHR, this._baseResponseOptions); 
    } 
} 

对于Http,我们将其扩展,加入getChunks方法。如果你愿意,你可以添加更多的方法。

@Injectable() 
export class ChunkedHttp extends Http { 
    constructor(protected backend: ChunkedXHRBackend, protected defaultOptions: RequestOptions) { 
    super(backend, defaultOptions); 
    } 

    getChunks(url, options?: RequestOptionsArgs): Observable<Chunk> { 
    return this.backend.createConnection(
     new Request(mergeOptions(this.defaultOptions, options, RequestMethod.Get, url))).chunks; 
    } 
} 

mergeOptions方法可以在Http source找到。

现在我们可以为它创建一个模块。用户应该直接使用ChunkedHttp而不是Http。但是因为不要试图覆盖Http令牌,所以如果需要,仍然可以使用Http

@NgModule({ 
    imports: [ HttpModule ], 
    providers: [ 
    { 
     provide: ChunkedHttp, 
     useFactory: (backend: ChunkedXHRBackend, options: RequestOptions) => { 
     return new ChunkedHttp(backend, options); 
     }, 
     deps: [ ChunkedXHRBackend, RequestOptions ] 
    }, 
    ChunkedXHRBackend 
    ] 
}) 
export class ChunkedHttpModule { 
} 

我们进口的HttpModule因为它提供了我们需要注入其他服务,但我们不希望有重新实现这些,如果我们不需要。

要测试只需将ChunkedHttpModule导入AppModule。此外,测试我用下面的成分

@Component({ 
    selector: 'app', 
    encapsulation: ViewEncapsulation.None, 
    template: ` 
    <button (click)="onClick()">Click Me!</button> 
    <h4 *ngFor="let chunk of chunks">{{ chunk }}</h4> 
    `, 
    styleUrls: ['./app.style.css'] 
}) 
export class App { 
    chunks: string[] = []; 

    constructor(private http: ChunkedHttp) {} 

    onClick() { 
    this.http.getChunks('http://localhost:8080/api/resource') 
     .subscribe(chunk => this.chunks.push(chunk.data)); 
    } 
} 

我有一个后端端点设置它只是吐出每半秒"Message #x" 10块。这是结果

enter image description here

似乎是一个错误的地方。只有九个 :-)。我认为这是服务器端相关的。

+0

谢谢你,优秀的答案:)如果你有兴趣,你也可以看看我添加到问题的链接,因为我做了一些进一步的挖掘。 – str

+0

你介意把它作为一个可重用的模块吗?它可能是一个必须具备的重型数据角应用程序。 – CptEric