2010-02-01 93 views
40

类似的问题:包装纸异步计算到同步(阻塞)计算

我有一个方法的对象我想暴露到库客户机(尤其是脚本客户端)如下所示:

interface MyNiceInterface 
{ 
    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg); 
    public Future<Baz> doSomething(Foo fooArg, Bar barArg); 
    // doSomethingAndBlock is the straightforward way; 
    // doSomething has more control but deals with 
    // a Future and that might be too much hassle for 
    // scripting clients 
} 

但原始的“东西”我可以是一组事件驱动类:

interface BazComputationSink 
{ 
    public void onBazResult(Baz result); 
} 

class ImplementingThing 
{ 
    public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink); 
} 

其中ImplementingThing需要投入,做一些神秘的东西,就像一个任务队列进行排队的东西,再后来当结果出现,sink.onBazResult()被调用的线程可能与调用ImplementingThing.doSomethingAsync()的线程相同或不同。

有没有一种方法可以使用事件驱动的函数以及并发基元来实现MyNiceInterface,以便脚本客户端可以愉快地等待阻塞线程?

编辑:我可以使用FutureTask吗?

回答

41

使用自己的未来实行:

public class BazComputationFuture implements Future<Baz>, BazComputationSink { 

    private volatile Baz result = null; 
    private volatile boolean cancelled = false; 
    private final CountDownLatch countDownLatch; 

    public BazComputationFuture() { 
     countDownLatch = new CountDownLatch(1); 
    } 

    @Override 
    public boolean cancel(final boolean mayInterruptIfRunning) { 
     if (isDone()) { 
      return false; 
     } else { 
      countDownLatch.countDown(); 
      cancelled = true; 
      return !isDone(); 
     } 
    } 

    @Override 
    public Baz get() throws InterruptedException, ExecutionException { 
     countDownLatch.await(); 
     return result; 
    } 

    @Override 
    public Baz get(final long timeout, final TimeUnit unit) 
      throws InterruptedException, ExecutionException, TimeoutException { 
     countDownLatch.await(timeout, unit); 
     return result; 
    } 

    @Override 
    public boolean isCancelled() { 
     return cancelled; 
    } 

    @Override 
    public boolean isDone() { 
     return countDownLatch.getCount() == 0; 
    } 

    public void onBazResult(final Baz result) { 
     this.result = result; 
     countDownLatch.countDown(); 
    } 

} 

public Future<Baz> doSomething(Foo fooArg, Bar barArg) { 
    BazComputationFuture future = new BazComputationFuture(); 
    doSomethingAsync(fooArg, barArg, future); 
    return future; 
} 

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { 
    return doSomething(fooArg, barArg).get(); 
} 

该解决方案在内部创建一个CountDownLatch,一旦收到回调就会清除它。如果用户调用get,则CountDownLatch用于阻止调用线程,直到计算完成并调用onBazResult回调。 CountDownLatch将确保如果在调用get()之前发生回调,get()方法将立即返回结果。

+0

(+1)很好的解决方案 – skaffman 2010-02-01 22:27:34

+0

+1,因为我认为我理解它......但是您能解释并发性方面吗? (使用倒计时锁存器) – 2010-02-01 22:58:10

+0

你也在onBazResult()中的某个地方缺少“done = true”...... – 2010-02-01 23:01:15

14

那么,有做类似的简单的解决方案:

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { 
    final AtomicReference<Baz> notifier = new AtomicReference(); 
    doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
    public void onBazResult(Baz result) { 
     synchronized (notifier) { 
     notifier.set(result); 
     notifier.notify(); 
     } 
    } 
    }); 
    synchronized (notifier) { 
    while (notifier.get() == null) 
     notifier.wait(); 
    } 
    return notifier.get(); 
} 

当然,这里假设你的Baz结果永远不会为空...

+0

这个工程,是一个不错的优雅模式。谢谢。 – 2013-12-11 00:23:44

+0

内部回调(BazComputationSink(){}),它会抱怨 - 初始化通知程序。如果初始化为null,则会为第一次命中引发NullPointerException,导致我的结果还没有准备好回到异步任务中。 – 2014-06-19 01:46:06

+1

这是不推荐的:如果你的回调立即返回,在等待之前会通知通知,并且你会遇到死锁(如果你有一个缓存逻辑,这可能会发生) – for3st 2015-09-14 14:05:48

11

谷歌guava library有一个易于使用的SettableFuture,使这个问题很简单(约10行代码)。

public class ImplementingThing { 

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { 
    try { 
     return doSomething(fooArg, barArg).get(); 
    } catch (Exception e) { 
     throw new RuntimeException("Oh dear"); 
    } 
}; 

public Future<Baz> doSomething(Foo fooArg, Bar barArg) { 
    final SettableFuture<Baz> future = new SettableFuture<Baz>(); 
    doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
     @Override 
     public void onBazResult(Baz result) { 
      future.set(result); 
     } 
    }); 
    return future; 
}; 

// Everything below here is just mock stuff to make the example work, 
// so you can copy it into your IDE and see it run. 

public static class Baz {} 
public static class Foo {} 
public static class Bar {} 

public static interface BazComputationSink { 
    public void onBazResult(Baz result); 
} 

public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) { 
    new Thread(new Runnable() { 
     @Override 
     public void run() { 
      try { 
       Thread.sleep(4000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      Baz baz = new Baz(); 
      sink.onBazResult(baz); 
     } 
    }).start(); 
}; 

public static void main(String[] args) { 
    System.err.println("Starting Main"); 
    System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null)); 
    System.err.println("Ending Main"); 
} 
3

一个很简单的例子,只是为了了解CountDownLatch没有任何 额外的代码。

A java.util.concurrent.CountDownLatch是一个并发结构,它允许一个或多个线程等待给定的一组操作来完成。

A CountDownLatch用给定的计数初始化。该计数通过调用countDown()方法递减。等待此计数达到零的线程可以调用await()方法之一。调用await()会阻塞该线程,直到计数达到零。

下面是一个简单的例子。在减价者在CountDownLatch上呼叫countDown()三次后,等待的服务员从await()呼叫中释放。

你也可以提一些TimeOut等待。

CountDownLatch latch = new CountDownLatch(3); 

Waiter  waiter  = new Waiter(latch); 
Decrementer decrementer = new Decrementer(latch); 

new Thread(waiter)  .start(); 
new Thread(decrementer).start(); 

Thread.sleep(4000); 
public class Waiter implements Runnable{ 

    CountDownLatch latch = null; 

    public Waiter(CountDownLatch latch) { 
     this.latch = latch; 
    } 

    public void run() { 
     try { 
      latch.await(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     System.out.println("Waiter Released"); 
    } 
} 

// --------------

public class Decrementer implements Runnable { 

    CountDownLatch latch = null; 

    public Decrementer(CountDownLatch latch) { 
     this.latch = latch; 
    } 

    public void run() { 

     try { 
      Thread.sleep(1000); 
      this.latch.countDown(); 

      Thread.sleep(1000); 
      this.latch.countDown(); 

      Thread.sleep(1000); 
      this.latch.countDown(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

Reference

如果你不想使用CountDownLatch或你的要求是什么与Facebook相似并且与功能不同。意思是如果一个方法被调用,那么不要调用其他方法。

在这种情况下,你可以声明

private volatile Boolean isInprocessOfLikeOrUnLike = false; 

然后你就可以在你的方法调用的开头,如果它是false然后调用方法,否则返回..取决于你的实施检查。

+0

volatile布尔值?我想你想要一个易变的布尔值(lowecase b),或者更好:AtomicReference – 2016-06-02 15:36:58

2

下面是基于保罗Wagland的回答更通用的解决方案:

public abstract class AsyncRunnable<T> { 
    protected abstract void run(AtomicReference<T> notifier); 

    protected final void finish(AtomicReference<T> notifier, T result) { 
     synchronized (notifier) { 
      notifier.set(result); 
      notifier.notify(); 
     } 
    } 

    public static <T> T wait(AsyncRunnable<T> runnable) { 
     final AtomicReference<T> notifier = new AtomicReference<>(); 

     // run the asynchronous code 
     runnable.run(notifier); 

     // wait for the asynchronous code to finish 
     synchronized (notifier) { 
      while (notifier.get() == null) { 
       try { 
        notifier.wait(); 
       } catch (InterruptedException ignore) {} 
      } 
     } 

     // return the result of the asynchronous code 
     return notifier.get(); 
    } 
} 

下面是一个例子,如何使用它::

String result = AsyncRunnable.wait(new AsyncRunnable<String>() { 
     @Override 
     public void run(final AtomicReference<String> notifier) { 
      // here goes your async code, e.g.: 
      new Thread(new Runnable() { 
       @Override 
       public void run() { 
        finish(notifier, "This was a asynchronous call!"); 
       } 
      }).start(); 
     } 
    }); 

代码的更详细的版本可以在这里找到: http://pastebin.com/hKHJUBqE

编辑: 与问题相关的示例将是:

public Baz doSomethingAndBlock(final Foo fooArg, final Bar barArg) { 
    return AsyncRunnable.wait(new AsyncRunnable<Baz>() { 
     @Override 
     protected void run(final AtomicReference<Baz> notifier) { 
      doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
       public void onBazResult(Baz result) { 
        synchronized (notifier) { 
         notifier.set(result); 
         notifier.notify(); 
        } 
       } 
      }); 
     } 
    }); 
} 
+0

我理解你的答案,但我不知道如何将它映射到我六年前的问题。 – 2016-03-15 03:12:11

+0

我更新了答案,所以你可以将它映射到问题 – 2016-03-15 03:31:18

+0

顺便说一句,如果你必须在可能的地方这样做,我会考虑使用这个:https://github.com/ReactiveX/RxJava。尽管它的学习曲线陡峭(至少对我而言),但ReactiveX仍然是您的问题的完美选择。这是一个很好的介绍:https://www.youtube.com/watch?v = _t06LRX0DV0。 – 2016-03-15 03:41:04

2

这是死的简单与RxJava 2.X:

try { 
    Baz baz = Single.create((SingleEmitter<Baz> emitter) -> 
      doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result))) 
      .toFuture().get(); 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} catch (ExecutionException e) { 
    e.printStackTrace(); 
} 

或者不LAMBDA符号:

Baz baz = Single.create(new SingleOnSubscribe<Baz>() { 
       @Override 
       public void subscribe(SingleEmitter<Baz> emitter) { 
        doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
         @Override 
         public void onBazResult(Baz result) { 
          emitter.onSuccess(result); 
         } 
        }); 
       } 
      }).toFuture().get(); 

更简单:

Baz baz = Single.create((SingleEmitter<Baz> emitter) -> 
       doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result))) 
       .blockingGet();