2016-08-11 57 views
7

我正在尝试创建一个系统,通过它我的应用程序可以从Redis PubSub渠道接收流式数据并对其进行处理。我使用,与其他所有的Redis驱动铁锈,我已经看到沿Redis driver,使用阻塞操作从只有在接收到的数据返回一个值的通道获得数据:如何使用futures.rs和Redis PubSub实现阻塞呼叫的期货流?

let msg = match pubsub.get_message() { 
     Ok(m) => m, 
     Err(_) => panic!("Could not get message from pubsub!") 
}; 
let payload: String = match msg.get_payload() { 
    Ok(s) => s, 
    Err(_) => panic!("Could not convert redis message to string!") 
}; 

我我想在将来使用futures-rs库来封装此阻塞函数调用,以便在等待输入时我可以在我的应用程序中执行其他任务。

我读了tutorial期货,并试图创建一个Stream,这将显示PubSub收到数据时的信号,但我无法弄清楚如何操作。

如何创建schedulepoll函数用于阻止pubsub.get_message()函数?

+5

使用图书馆的当天它有大的公告;多么雄心勃勃!^_^ – Shepmaster

回答

10

沉重的警告我从来没有使用过这个库,而且我对一些概念的底层知识有点......缺乏。大多数情况下,我通过the tutorial阅读。我很确定,任何做过异步工作的人都会阅读并大笑,但这对其他人来说可能是一个有用的起点。买者自负!


让我们用一些简单一点开始,演示如何Stream的作品。我们可以的Result的Iterator转换成流:

extern crate futures; 

use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let payloads: Vec<Result<String,()>> = vec![Ok("a".into()), Ok("b".into())]; 
    let payloads = stream::iter(payloads.into_iter()); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
} 

这说明我们消费流的一种方式。我们使用and_then对每个有效负载(这里只是将其打印出来)做一些事情,然后使用for_eachStream转换回Future。然后我们可以通过调用奇怪的名称forget method来运行未来。


接下来是将Redis库绑定到混合中,只处理一条消息。由于get_message()方法处于阻塞状态,因此我们需要在混合中引入一些线程。在这种类型的异步系统中执行大量工作并不是一个好主意,因为其他一切都将被阻止。 For example

除另有安排是这样的,应当确保该功能完成的实现非常快

在一个理想的世界里,redis板条箱将建立在像期货这样的图书馆之上,并将所有这些本地化。

extern crate redis; 
extern crate futures; 

use std::thread; 
use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis"); 

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle"); 
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel"); 

    let (tx, payloads) = stream::channel(); 

    let redis_thread = thread::spawn(move || { 
     let msg = pubsub.get_message().expect("Unable to get message"); 
     let payload: Result<String, _> = msg.get_payload(); 
     tx.send(payload).forget(); 
    }); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
    redis_thread.join().expect("unable to join to thread"); 
} 

我的理解在这里变得更模糊。在单独的线程中,我们阻止该消息,并在获取消息时将其推送到频道中。我不明白的是为什么我们需要抓住线程的手柄。我期望foo.forget会阻止自己,等待流是空的。

在telnet连接到Redis的服务器,发送此:

publish rust awesome 

你会看到它的工作原理。添加打印语句表明(对我来说)foo.forget语句是在线程产生之前运行的。


多条消息比较棘手。 Sender消耗自身,以防止发电方过于耗费方太多。这是通过从send返回另一个未来!我们需要穿梭回离开那里重用它循环的下一次迭代:

extern crate redis; 
extern crate futures; 

use std::thread; 
use std::sync::mpsc; 

use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis"); 

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle"); 
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel"); 

    let (tx, payloads) = stream::channel(); 

    let redis_thread = thread::spawn(move || { 
     let mut tx = tx; 

     while let Ok(msg) = pubsub.get_message() { 
      let payload: Result<String, _> = msg.get_payload(); 

      let (next_tx_tx, next_tx_rx) = mpsc::channel(); 

      tx.send(payload).and_then(move |new_tx| { 
       next_tx_tx.send(new_tx).expect("Unable to send successor channel tx"); 
       futures::finished(()) 
      }).forget(); 

      tx = next_tx_rx.recv().expect("Unable to receive successor channel tx"); 
     } 
    }); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
    redis_thread.join().expect("unable to join to thread"); 
} 

我相信会有这种类型的互操作的多个生态系统随着时间的推移。例如,futures-cpupool箱子可以可能进行扩展以支持类似的用例来此。

+0

感谢您的惊人答案!只有一个问题:不加入'redis_thread'否定了使得结果读取过程非阻塞的全部努力?也许有一些我不了解的东西。 – Ameo

+1

“我期望foo.forget会阻止自己,等到流为空”实际上,期货没有义务提供“阻止直到就绪”的方法。 '忘记()',就其描述而言,需要在未来丢失时防止自动取消,但与等待无关。例如,在Scala中,没有关于Future的方法,但是有一个独立的'Await.ready' /'Await.result'方法,在一定的时间内等待未来准备就绪。 –

+1

据我所知,未来-RS可以通过['Future :: select']实现类似的功能(http://alexcrichton.com/futures-rs/futures/trait.Future.html#method .select),第二个未来在固定超时后完成。 –