2016-09-22 84 views
0

任何回应,我有这样一个简单的查询:西提我没有从汇总查询

define stream myEventStream (userId string, price int); 
define stream outputStream (avgPrice double, userId string); 

@info(name = 'aQuery') 
from myEventStream#window.time(5000) 
select avg(price) as avgPrice, userId group by userId 
insert into outputStream; 

当我添加查询回调,我不从它那里得到任何回应。

runtime.addCallback("aQuery", new QueryCallback() { 
     @Override 
     public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { 
      EventPrinter.print(timeStamp, inEvents, removeEvents); 
     } 
    }); 

我在另一个线程产生消息:

final AtomicInteger counter = new AtomicInteger(0); 
    final Random rnd = new Random(System.currentTimeMillis()); 
    ExecutorService executor = Executors.newSingleThreadExecutor(); 
    executor.submit(() -> { 
     while (counter.getAndIncrement() < 100) { 
      try { 
       handler.send(new Object[]{"user1", rnd.nextInt(100)}); 
       handler.send(new Object[]{"user2", rnd.nextInt(100)}); 
       handler.send(new Object[]{"user3", rnd.nextInt(100)}); 
       System.out.println("Sent: " + counter.get()); 
       Thread.sleep(1000); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
       throw new RuntimeException(e); 
      } 
     } 
    }); 

我期待导致每5秒。我在这里错过了什么?请帮忙。 谢谢。

回答

0

那么问题是,我已经关闭了运行时后,产生的事件流的代码。尽管如此,我仍然可以向运行时发送消息。没有例外。