2017-10-10 69 views
0

我已经与下列mapPartition功能的程序异步Web服务调用:阿帕奇弗林克:做正确的MapReduce内()

public void mapPartition(Iterable<Tuple> values, Collector<Tuple2<Integer, String>> out) 

我收集的100从输入values &分批送他们到一个web服务进行转换。结果我加回到out集合。

为了加快这个过程,我通过使用Executors做了网络服务调用async。这造成了问题,要么我得到taskManager released exceptionAskTimeoutException。我增加了内存&超时,但它没有帮助。有相当多的输入数据。我相信这导致了很多工作排在了ExecutorService &之间,因此占用了大量内存。

什么是最好的方法呢?

我也在看taskManager vs taskSlot的配置,但对这两者之间的区别有些困惑(我猜他们与进程vs线程相似?)。不知道我在什么时候增加taskManagers vs taskSlots?例如如果我有三台每台机器有4cpus的机器,那么我的taskManager=3应该是我的taskSlot=4

我还在考虑仅增加mapPartition的并行度来说10以获得更多的线程来访问web服务。意见或建议?

回答

1

您应该查看Flink Asyncio,这将使您能够在流式应用程序中以异步方式查询您的web服务。

需要注意的一件事是,Asyncio函数不被称为多线程,并且每个分区每个分区被连续调用一次,因此您的Web应用程序需要确定性地返回并且可能快速返回以便不被占用。

而且,潜在的更高的分区数量,将有助于你的情况,但再次你的web服务需要满足足够快

示例代码块的请求从Flinks网站:

// This example implements the asynchronous request and callback with Futures that have the 
// interface of Java 8's futures (which is the same one followed by Flink's Future) 

/** 
* An implementation of the 'AsyncFunction' that sends requests and sets the callback. 
*/ 
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> { 

/** The database specific client that can issue concurrent requests with callbacks */ 
private transient DatabaseClient client; 

@Override 
public void open(Configuration parameters) throws Exception { 
    client = new DatabaseClient(host, post, credentials); 
} 

@Override 
public void close() throws Exception { 
    client.close(); 
} 

@Override 
public void asyncInvoke(final String str, final AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception { 

    // issue the asynchronous request, receive a future for result 
    Future<String> resultFuture = client.query(str); 

    // set the callback to be executed once the request by the client is complete 
    // the callback simply forwards the result to the collector 
    resultFuture.thenAccept((String result) -> { 

     asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result))); 

    }); 
    } 
} 

// create the original stream (In your case the stream you are mappartitioning) 
DataStream<String> stream = ...; 

// apply the async I/O transformation 
DataStream<Tuple2<String, String>> resultStream = 
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100); 

编辑:

由于用户想要创建大小为100的批次,并且asyncio特定于Streaming API,因此最好的方法是创建大小为100的countwindows。

此外,要清除最后一个窗口,可能没有100个事件,自定义Triggers可与计数触发器和基于时间的触发器组合使用,以便在元素数或每隔几分钟后启动触发器。

一个很好的跟进,请在这里Flink Mailing List,其中用户“克斯特亚”创建一个自定义的触发器,它可here

+0

不幸的是这是在流API。我正在使用批处理API。我需要将这些项目收集到100个批次中 - 没有看到在Streaming API中进行这种转换的方法。 – Vineet

+0

为什么不创建一个计数窗口,你的计数大小是100,并在那工作?更多信息在这里:https://ci.apache.org/projects/flink/flink-docs-release-1。3/api/java/org/apache/flink/streaming/api/datastream/KeyedStream.html#countWindow-long- –

+0

最后一个问题 - 当我收集100个批次时,最后一批会有剩余的元素小于100)。我怎么能为此开启窗户? – Vineet