使用Spring Cloud DataFlow 1.3.0.M2与Spring Cloud Stream Starters Celsius.M1。Spring Cloud DataFlow:获取有效载荷作为列表<Map>
我有两个处理器。首先产生一个List<Map>
,它应该被另一个消耗掉。这里是简化的代码。
// Processor 1
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
// Note: had Object instead of List<> as the return, hoped perhaps using a
// specific type would help, but no difference.
public List<Map<String, Object>> process(final @Payload MyPojo payload) {
final List<Map<String, Object>> results = worker.doWork(payload);
LOG.debug("Returning " + results.size() + " objects");
return results;
}
// Processor 2
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object process(final @Payload List<Map<String, Object>> payload) {
LOG.debug("Received " + payload.size() + " objects");
final List<Map<String, Object>> results = worker.moreWork(payload);
return results;
}
我部署这两款处理器在使用新加坡民防部队壳管道:
<source> | otherProcessors | processor1 | processor2 | log
调试消息处理器1表示,它已在列表2级的对象。处理器2的调试消息表明它收到了40个对象(每个映射有20个键=值对) - 看起来这两个映射变成了一个key = value对的列表。
我已启用对org.spring.integration
调试日志记录和显示的消息具有的地图格式的列表(这是从处理器2):
preSend on channel 'input', message: GenericMessage
[payload=[{"m1key1":"val1","m1key2":"val2",...,"m1key20":"val20"},
{"m2key1":"val1","m2key2":"val2",...,"m2key20":"val20"}], headers={..}]
我想处理器2接收所产生的2个地图处理器1.我不知道这是否与泛型类型有关。有人可以指向我的配置,以实现这一目标吗?
----更新阿尔乔姆的评论----
处理器1具有这种在其application.properties
文件:
spring.cloud.stream.bindings.output.content-type=application/json
我还曾试图改变这样的流定义,但它没”不像是会有所作为:
<source> | otherProcessors | processor1 --outputType=application/json | processor2 --inputType=application/json | log
你知道这看起来不像JSON,但的确如'GenericMessage.toString()'。你会介意在这个问题上分享你的头像吗?至少'contentType' –
是的,你在这些处理器上有什么“输入/输出”配置?我的意思是'绑定'设置 –
更新后回答问题,阿尔乔姆,谢谢。 – user944849