沉重的警告我从来没有使用过这个库,而且我对一些概念的底层知识有点......缺乏。大多数情况下,我通过the tutorial阅读。我很确定,任何做过异步工作的人都会阅读并大笑,但这对其他人来说可能是一个有用的起点。买者自负!
让我们用一些简单一点开始,演示如何Stream
的作品。我们可以的Result
的Iterator转换成流:
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String,()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
这说明我们消费流的一种方式。我们使用and_then
对每个有效负载(这里只是将其打印出来)做一些事情,然后使用for_each
将Stream
转换回Future
。然后我们可以通过调用奇怪的名称forget
method来运行未来。
接下来是将Redis库绑定到混合中,只处理一条消息。由于get_message()
方法处于阻塞状态,因此我们需要在混合中引入一些线程。在这种类型的异步系统中执行大量工作并不是一个好主意,因为其他一切都将被阻止。 For example:
除另有安排是这样的,应当确保该功能完成的实现非常快。
在一个理想的世界里,redis板条箱将建立在像期货这样的图书馆之上,并将所有这些本地化。
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
我的理解在这里变得更模糊。在单独的线程中,我们阻止该消息,并在获取消息时将其推送到频道中。我不明白的是为什么我们需要抓住线程的手柄。我期望foo.forget
会阻止自己,等待流是空的。
在telnet连接到Redis的服务器,发送此:
publish rust awesome
你会看到它的工作原理。添加打印语句表明(对我来说)foo.forget
语句是在线程产生之前运行的。
多条消息比较棘手。 Sender
消耗自身,以防止发电方过于耗费方太多。这是通过从send
返回另一个未来!我们需要穿梭回离开那里重用它循环的下一次迭代:
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
我相信会有这种类型的互操作的多个生态系统随着时间的推移。例如,futures-cpupool箱子可以可能进行扩展以支持类似的用例来此。
使用图书馆的当天它有大的公告;多么雄心勃勃!^_^ – Shepmaster