2011-09-26 130 views
10

我刚刚发现只有一个NIO工具,即Java NIO Pipe,用于在线程之间传递数据。使用这种机制比通过队列传递更传统的消息(如ArrayBlockingQueue)有什么优势吗?Java NIO Pipe vs BlockingQueue

+0

管道经过的内核,因为选择具有唤醒很少有用...这是通过在Linux上实现管... – bestsss

+0

@bestsss护理阐述?您可以通过选择器注册管道来接收通知,这有什么问题? – raffian

+0

@raffian,简单地说 - 你不能真正使用IPC来管理IPC,并且在这个过程中有很多更有效的方式来传递信息。 – bestsss

回答

6

通常,将数据传递给另一个线程进行处理的最简单方法是使用ExecutorService。这种包装了两个队列和线程池(可以有一个线程)

您可以使用一个管道时,你有支持NIO通道库。如果你想在线程之间传递数据的ByteBuffers,这也很有用。

否则其通常使用ArrayBlockingQueue更简单/更快。

如果你想要一个更快的方式在线程之间交换数据,我建议你看看Exchanger,但它不像ArrayBlockingQueue那样具有通用性。

The Exchanger and GC-less Java

+0

谢谢,我从来没有考虑过使用交换器使GC开销最小化的事实。然而,换热器的缺点是它是同步的。通常你只是想将数据抽入另一个线程而不必等待它被拾取。 – Maxaon3000

+0

管道是固定的大小。如果制片人制作速度不得不停下来,问题是一样的。如果生产者从未在消费者完成之前填充缓冲区,则不必停止(无论是哪种情况) –

+1

管道用于实施Selector.wakeup,除此之外它们并不是非常有用,因为仅限于内存的解决方案更有效,不要通过内核。 – bestsss

1

我想管将有更好的延迟,因为它可以很可能与幕后协同程序来实现。因此,当数据可用时,生产者立即向消费者屈服,而不是在线程调度器决定时。

管道通常占一个消费者 - 生产者的问题,很可能使两个线程合作来实现这种方式,而不是外部抢占。

2

我相信NIO管的设计,让您可以将数据发送到在一个线程安全的方式选择内循环的通道,换句话说,任何线程可以写信给管和数据将在其他处理管道的极端,在选择器回路内部。当你写入管道时,你可以使另一侧的通道可读。

+0

我想知道在简单队列轮询上使用选择器循环的线程之间传递数据的性能特征。另外,通过管道传递数据似乎带来了不必将字节而不是对象传递给其他线程的不便。换句话说,它迫使你为线程间数据交换开发一个有线协议。 – Maxaon3000

+0

你的意思是在ConcurrentLinkedQueue上,对吧?这是一个很好的问题。我在ConcurrentLinkedQueue上下了我的筹码。 :)但是我看到管道的一个优点是:你发送一条消息,就像其他人正在做的一样,换句话说,你从一个通道读取消息,而不是从队列中读取一个对象。 – chrisapotek

3

所以有很多的麻烦管(check here)之后,我决定赞成非阻塞并发队列超过NIO管道。所以我在Java的ConcurrentLinkedQueue上做了一些基准测试。请看下图:

public static void main(String[] args) throws Exception { 

    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>(); 

    // first test nothing: 

    for (int j = 0; j < 20; j++) { 

     Benchmarker bench = new Benchmarker(); 

     String s = "asd"; 

     for (int i = 0; i < 1000000; i++) { 
      bench.mark(); 
      // s = queue.poll(); 
      bench.measure(); 
     } 

     System.out.println(bench.results()); 

     Thread.sleep(100); 
    } 

    System.out.println(); 

    // first test empty queue: 

    for (int j = 0; j < 20; j++) { 

     Benchmarker bench = new Benchmarker(); 

     String s = "asd"; 

     for (int i = 0; i < 1000000; i++) { 
      bench.mark(); 
      s = queue.poll(); 
      bench.measure(); 
     } 

     System.out.println(bench.results()); 

     Thread.sleep(100); 
    } 

    System.out.println(); 

    // now test polling one element on a queue with size one 

    for (int j = 0; j < 20; j++) { 

     Benchmarker bench = new Benchmarker(); 

     String s = "asd"; 
     String x = "pela"; 

     for (int i = 0; i < 1000000; i++) { 
      queue.offer(x); 
      bench.mark(); 
      s = queue.poll(); 
      bench.measure(); 
      if (s != x) throw new Exception("bad!"); 
     } 

     System.out.println(bench.results()); 

     Thread.sleep(100); 
    } 

    System.out.println(); 

    // now test polling one element on a queue with size two 

    for (int j = 0; j < 20; j++) { 

     Benchmarker bench = new Benchmarker(); 

     String s = "asd"; 
     String x = "pela"; 

     for (int i = 0; i < 1000000; i++) { 
      queue.offer(x); 
      queue.offer(x); 
      bench.mark(); 
      s = queue.poll(); 
      bench.measure(); 
      if (s != x) throw new Exception("bad!"); 
      queue.poll(); 
     } 

     System.out.println(bench.results()); 

     Thread.sleep(100); 
    } 
} 

结果:

totalLogs=1000000, minTime=0, maxTime=85000, avgTime=58.61 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=5281000, avgTime=63.35 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=725000, avgTime=59.71 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=25000, avgTime=58.13 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=378000, avgTime=58.45 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=15000, avgTime=57.71 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=170000, avgTime=58.11 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=1495000, avgTime=59.87 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=232000, avgTime=63.0 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=184000, avgTime=57.89 (times in nanos) 

totalLogs=1000000, minTime=0, maxTime=2600000, avgTime=65.22 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=850000, avgTime=60.5 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=150000, avgTime=63.83 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=43000, avgTime=59.75 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=276000, avgTime=60.02 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=457000, avgTime=61.69 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=204000, avgTime=60.44 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=154000, avgTime=63.67 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=355000, avgTime=60.75 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=338000, avgTime=60.44 (times in nanos) 

totalLogs=1000000, minTime=0, maxTime=345000, avgTime=110.93 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=396000, avgTime=100.32 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=298000, avgTime=98.93 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=1891000, avgTime=101.9 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=254000, avgTime=103.06 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=1894000, avgTime=100.97 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=230000, avgTime=99.21 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=348000, avgTime=99.63 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=922000, avgTime=99.53 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=168000, avgTime=99.12 (times in nanos) 

totalLogs=1000000, minTime=0, maxTime=686000, avgTime=107.41 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=320000, avgTime=95.58 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=248000, avgTime=94.94 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=217000, avgTime=95.01 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=159000, avgTime=93.62 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=155000, avgTime=95.28 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=106000, avgTime=98.57 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=370000, avgTime=95.01 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=1836000, avgTime=96.21 (times in nanos) 
totalLogs=1000000, minTime=0, maxTime=212000, avgTime=98.62 (times in nanos) 

结论:

的MAXTIME很可怕,但我认为它是安全的结论,我们是在50毫微秒范围轮询并发队列。