2017-08-16 72 views
0

我试图通过使用静态响应实现我自己的hyper::client::Connect来测试一些使用hyper::Client的代码。我已经找到了类型,但无法找出运行时问题,tokio-proto抱怨说request/response mismatch。下面是我的代码的简化版本,展示了失败:实现hyper :: client :: Connect进行测试

extern crate futures; 
extern crate hyper; 
extern crate tokio_core; 
extern crate tokio_io; 

use futures::{future, Future, Stream}; 
use std::str::from_utf8; 
use std::io::Cursor; 

struct Client<'a, C: 'a> { 
    client: &'a hyper::Client<C>, 
    url: &'a str, 
} 

impl<'a, C: hyper::client::Connect> Client<'a, C> { 
    fn get(&self) -> Box<Future<Item = String, Error = hyper::Error>> { 
     Box::new(self.client.get(self.url.parse().unwrap()).and_then(|res| { 
      let body = Vec::new(); 
      res.body() 
       .fold(body, |mut acc, chunk| { 
        acc.extend_from_slice(chunk.as_ref()); 
        Ok::<_, hyper::Error>(acc) 
       }) 
       .and_then(move |value| Ok(String::from(from_utf8(&value).unwrap()))) 
     })) 
    } 
} 

struct StaticConnector<'a> { 
    body: &'a [u8], 
} 

impl<'a> StaticConnector<'a> { 
    fn new(body: &'a [u8]) -> StaticConnector { 
     StaticConnector { body: body } 
    } 
} 

impl<'a> hyper::server::Service for StaticConnector<'a> { 
    type Request = hyper::Uri; 
    type Response = Cursor<Vec<u8>>; 
    type Error = std::io::Error; 
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>; 

    fn call(&self, _: Self::Request) -> Self::Future { 
     Box::new(future::ok(Cursor::new(self.body.to_vec()))) 
    } 
} 

fn main() { 
    let mut core = tokio_core::reactor::Core::new().unwrap(); 
    let handle = core.handle(); 

    // My StaticConnector for testing 
    let hyper_client = hyper::Client::configure() 
     .connector(StaticConnector::new(
      b"\ 
       HTTP/1.1 200 OK\r\n\ 
       Content-Length: 8\r\n\ 
       \r\n\ 
       Maldives\ 
       ", 
     )) 
     .build(&handle); 

    // Real Connector 
    /* 
    let hyper_client = hyper::Client::configure().build(&handle); 
    */ 

    let client = Client { 
     client: &hyper_client, 
     url: "http://ifconfig.co/country", 
    }; 
    let result = core.run(client.get()).unwrap(); 
    println!("{}", result); 
} 

Playground

我猜这是我使用CursorIo这是不完整的在某些方面,但我没能调试并取得进展。一个想法是写入这Cursorhyper::Client大概使得不能按预期工作。也许我需要写入的sink和读取的静态内容的组合?所有想法我都未能取得进展!

回答

0

原始代码不起作用的原因是因为读者方在客户端发送请求之前提供了响应,因此tokio-protorequest/response mismatch错误。修复结果不是微不足道的,首先我们需要安排读者阻止,或者更具体地说,用std::io::ErrorKind::WouldBlock向事件循环指出还没有任何东西,但不认为它是EOF 。另外,一旦我们得到表明请求已被发送的写入,并且tokio-proto机器正在等待响应,我们使用futures::task::current.notify来解锁读取。这里有一个更新的实现,按预期工作:

extern crate futures; 
extern crate hyper; 
extern crate tokio_core; 
extern crate tokio_io; 

use futures::{future, Future, Stream, task, Poll}; 
use std::str::from_utf8; 
use std::io::{self, Cursor, Read, Write}; 
use tokio_io::{AsyncRead, AsyncWrite}; 

struct Client<'a, C: 'a> { 
    client: &'a hyper::Client<C>, 
    url: &'a str, 
} 

impl<'a, C: hyper::client::Connect> Client<'a, C> { 
    fn get(&self) -> Box<Future<Item = String, Error = hyper::Error>> { 
     Box::new(self.client.get(self.url.parse().unwrap()).and_then(|res| { 
      let body = Vec::new(); 
      res.body() 
       .fold(body, |mut acc, chunk| { 
        acc.extend_from_slice(chunk.as_ref()); 
        Ok::<_, hyper::Error>(acc) 
       }) 
       .and_then(move |value| Ok(String::from(from_utf8(&value).unwrap()))) 
     })) 
    } 
} 

struct StaticStream { 
    wrote: bool, 
    body: Cursor<Vec<u8>>, 
} 

impl Read for StaticStream { 
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 
     if self.wrote { 
      self.body.read(buf) 
     } else { 
      Err(io::ErrorKind::WouldBlock.into()) 
     } 
    } 
} 

impl Write for StaticStream { 
    fn write<'a>(&mut self, buf: &'a [u8]) -> io::Result<usize> { 
     self.wrote = true; 
     task::current().notify(); 
     Ok(buf.len()) 
    } 

    fn flush(&mut self) -> io::Result<()> { 
     Ok(()) 
    } 
} 

impl AsyncRead for StaticStream {} 

impl AsyncWrite for StaticStream { 
    fn shutdown(&mut self) -> Poll<(), io::Error> { 
     Ok(().into()) 
    } 
} 

struct StaticConnector<'a> { 
    body: &'a [u8], 
} 

impl<'a> StaticConnector<'a> { 
    fn new(body: &'a [u8]) -> StaticConnector { 
     StaticConnector { body: body } 
    } 
} 

impl<'a> hyper::server::Service for StaticConnector<'a> { 
    type Request = hyper::Uri; 
    type Response = StaticStream; 
    type Error = std::io::Error; 
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>; 

    fn call(&self, _: Self::Request) -> Self::Future { 
     Box::new(future::ok(StaticStream { 
      wrote: false, 
      body: Cursor::new(self.body.to_vec()), 
     })) 
    } 
} 

fn main() { 
    let mut core = tokio_core::reactor::Core::new().unwrap(); 
    let handle = core.handle(); 

    // My StaticConnector for testing 
    let hyper_client = hyper::Client::configure() 
     .connector(StaticConnector::new(
      b"\ 
       HTTP/1.1 200 OK\r\n\ 
       Content-Length: 8\r\n\ 
       \r\n\ 
       Maldives\ 
       ", 
     )) 
     .build(&handle); 

    // Real Connector 
    /* 
    let hyper_client = hyper::Client::configure().build(&handle); 
    */ 

    let client = Client { 
     client: &hyper_client, 
     url: "http://ifconfig.co/country", 
    }; 
    let result = core.run(client.get()).unwrap(); 
    println!("{}", result); 
} 

Playground

注:此实现适用于简单的情况,但我还没有测试更复杂的情况。例如,我不确定的是有多大的请求/响应行为,因为它们可能涉及多于一个读/写呼叫。