2017-01-23 74 views
1

我正在尝试使用Future.rs来管理一个单独的过程中的一些任务。我看到了如何等待每个创建的未来以及如何一个接一个地处理它们,但我无法在执行过程中调查未来以了解其状态。我总是有错误:如何在未等待的情况下轮询未来状态?

thread 'main' panicked at 'no Task is currently running'

我想在将来的处理过程中做某件事,直到它完成。也许我没有正确地使用它?我设法通过使用频道使其工作,但我认为应该有可能轮询未来,以及何时可以得到结果。 我用它来测试它的代码是:

fn main() { 
    println!("test future"); 
    let thread_pool = CpuPool::new(4); 
    let mut future_execution_list = vec![]; 
    let mutex = Arc::new(AtomicUsize::new(0)); 
    //create the future to process 
    for _ in 0..10 { 
     let send_mutex = mutex.clone(); 
     let future = thread_pool.spawn_fn(move || { 
      //Simulate long processing 
      thread::sleep(time::Duration::from_millis(10)); 
      let num = send_mutex.load(Ordering::Relaxed); 
      send_mutex.store(num + 1, Ordering::Relaxed); 
      let res: Result<usize,()> = Ok(num); 
      res 

     }); 
     future_execution_list.push(future); 
    } 
    // do the job 
    loop { 
     for future in &mut future_execution_list { 
      match future.poll() { 
       Ok(Async::NotReady) =>(), //do nothing 
       Ok(Async::Ready(num)) => { 
        //update task status 
        println!(" future {:?}", num); 
       } 
       Err(_) => { 
        //log error and set task status to err 
        () 
       } 
      }; 
     } 
     //do something else 
    } 
} 

所以我完成Shepmaster答案后,我的问题。您的评论非常有趣,但我仍无法找到解决我的问题的方案。我会添加一些关于我的问题的信息。我想在自动化计划任务上同时管理多个任务。有一个循环,其中管理事件并计算任务计划。当一个任务被安排时,它就产生了。当任务结束时,新的调度完成。在任务执行期间,管理事件。一个speudo代码可以是:

loop { 
    event.try_recv() { ...} //manage user command for exemple 
    if (schedule) { 
     let tasks_to_spawn = schedule_task(); 
     let futures = tasks_to_spawn.map(|task| { 
      thread_pool.spawn_fn(....)}); 
     let mut one = future::select_all(futures); 
     while let Ok((value, _idx, remaining)) = one.wait() {..} //wait here 
    } 
    //depend on task end state and event set schedule to true or false. 

} 

我可以联合调度,并像在未来的任务:

let future = schedule.and_them(|task| execute_task); 

但我仍然需要等待第一任务执行结束。 我可以把所有事情放在未来(事件管理,时间表,任务),并等待第一个像你提议的结束。我尝试了,但是我没有看到如何用不同的Item和Error类型来制作未来的vec。有了这个概念,我必须在线程之间管理更多的数据。事件管理和调度不必在不同的线程中执行。

我看到另一个问题,select_all采取vec的所有权。如果在执行另一个任务期间需要新的任务,我该如何改变vec并添加新的未来?

不知道你是否有一个简单的解决方案。我在想,使用isDone()等方法在执行过程中未来的状态很容易,而不用等待。也许是有计划的,我没有看到有关这方面的公关。 如果你有一个简单的解决方案,它会很好,否则我会重新思考我的概念。

+0

有一个99.9%的机会,你做**不**要以这种方式使用原子变量。相反,你希望'fetch_add'和绝大多数人不需要'Relaxed'排序。 – Shepmaster

+0

您的权利,我只是复制/粘贴一些代码,以表明我想从未来的执行中得到一个结果,这取决于其他期货的执行情况。 –

+0

这是[*非常糟糕的形式*在您收到答案后更改您的问题**,特别是当更改使这些答案无效时](http://meta.stackoverflow.com/q/309237/155423)。从一开始就提出一个包含任何相关细节的好问题是提问者的问题。 – Shepmaster

回答

2

要投票Future您必须有Task。要获得Task,您可以轮询Future传递给futures::executor::spawn()。如果你重写你的例子的loop像这样:

futures::executor::spawn(futures::lazy(|| { 
    // existing loop goes here 
})).wait_future(); 

它运行。

至于为什么 a Future只能在任务中进行轮询,我相信这样可以让轮询可以拨打Task::Unpark

0

I want to do something during the future processing

据我了解,这就是期货 - 东西,可以并行发生。如果你想做其他事情,那么再创造一个未来并投入其中!

你基本上已经在做这个 - 你的每个线程都是“做别的事情”。

poll the future and when it's ready get the result

使用future::select_all,您可以合并多个期货并获得取其完成第一。然后由你决定等待下一个。

一种可能实现:

extern crate rand; 
extern crate futures; 
extern crate futures_cpupool; 

use rand::Rng; 
use futures::{future, Future}; 
use futures_cpupool::CpuPool; 

use std::{thread, time}; 

fn main() { 
    let thread_pool = CpuPool::new(4); 

    let futures = (0..10).map(|i| { 
     thread_pool.spawn_fn(move || -> Result<usize,()> { 
      let mut rng = rand::thread_rng(); 
      // Simulate long processing 
      let sleep_time = rng.gen_range(10, 100); 
      let sleep_time = time::Duration::from_millis(sleep_time); 
      for _ in 0..10 { 
       println!("Thread {} sleeping", i); 
       thread::sleep(sleep_time); 
      } 
      Ok(i) 
     }) 
    }); 

    let mut one = future::select_all(futures); 
    while let Ok((value, _idx, remaining)) = one.wait() { 
     println!("Future #{} finished", value); 
     if remaining.is_empty() { 
      break; 
     } 
     one = future::select_all(remaining); 
    } 
} 

在通话过程中,以wait,多事情发生的同时!这可以通过对交织的输出中可以看出:

Thread 2 sleeping 
Thread 0 sleeping 
Thread 3 sleeping 
Thread 1 sleeping 
Thread 3 sleeping 
Thread 0 sleeping 
Thread 1 sleeping 
Thread 2 sleeping 
Thread 3 sleeping 

可以验证的东西被并行由睡眠时间设定为1秒为每个线程和定时的整体方案发生。由于有10个期货,需要1秒,并行度为4,整个程序需要3秒才能运行。


红利代码审查:

  1. 不拆装载并设置一个原子变量来实现递增 - 存储的值可能已经由两个电话之间另一个线程改变。使用fetch_add
  2. 您真的在使用它们之前应该知道what the orderings mean。我不知道他们,所以我总是使用SeqCst
  3. 由于这个例子并不重要,我完全删除了原子变量。
  4. 总是喜欢收集到Vec而不是在循环内推入。这允许更优化的分配。
  5. 在这种情况下,根本不需要Vec,因为select_all接受任何可以变成迭代器的东西。
相关问题