2016-09-18 482 views
1

Spring Cloud Stream application: Dispatcher has no subscribers for channel 'unknown.channel.name'

I am testing spring-cloud-starter-stream-kafka and get the error below.

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers 
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) 
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) 
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) 
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69) 
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) 
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:43) 
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171) 
at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50) 
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4$1.doWithRetry(KafkaMessageChannelBinder.java:607) 
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263) 
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:154) 
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.onMessage(KafkaMessageChannelBinder.java:604) 
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221) 
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209) 
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67) 
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers 
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153) 
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) 
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 
... 32 common frames omitted 

My StreamApplication.java

package de.codecentric; 

import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.cloud.stream.annotation.EnableBinding; 
import org.springframework.cloud.stream.annotation.StreamListener; 
import org.springframework.context.annotation.Bean; 
import org.springframework.integration.annotation.InboundChannelAdapter; 
import org.springframework.integration.annotation.Poller; 
import org.springframework.integration.core.MessageSource; 
import org.springframework.integration.support.MessageBuilder; 

@SpringBootApplication 
@EnableBinding({PersonProcessor.class, LogProcessor.class}) 
public class StreamApplication { 

    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }

    @StreamListener(LogProcessor.CHANNEL)
    public void logEvent(EventLog el) {
        System.out.println("Received event log: " + el.id);
    }

    @StreamListener(PersonProcessor.CHANNEL)
    public void logPerson(Person p) {
        System.out.println("Received person: " + p.name);
    }

    @Bean
    @InboundChannelAdapter(value = PersonProcessor.CHANNEL, poller = @Poller(fixedDelay = "3000", maxMessagesPerPoll = "1"))
    public MessageSource<Person> timerMessageSource() {
        return () -> MessageBuilder.withPayload(new Person()).build();
    }

    @Bean
    @InboundChannelAdapter(value = LogProcessor.CHANNEL, poller = @Poller(fixedDelay = "3000", maxMessagesPerPoll = "1"))
    public MessageSource<EventLog> logMessageSource() {
        return () -> MessageBuilder.withPayload(new EventLog()).build();
    }

    public static class EventLog {
        private static int seq = 0;
        public String id = seq++ + "";
    }

    public static class Person {
        private static int seq = 0;
        public String name = "hi " + seq++;
    }
} 

LogProcessor.java

package de.codecentric; 

import org.springframework.cloud.stream.annotation.Input; 
import org.springframework.cloud.stream.annotation.Output; 
import org.springframework.messaging.MessageChannel; 
import org.springframework.messaging.SubscribableChannel; 


public interface LogProcessor { 
    String CHANNEL = "logs"; 

    @Output(LogProcessor.CHANNEL) 
    MessageChannel output(); 

    @Input(LogProcessor.CHANNEL) 
    SubscribableChannel input(); 
} 

PersonProcessor.java

package de.codecentric; 

import org.springframework.cloud.stream.annotation.Input; 
import org.springframework.cloud.stream.annotation.Output; 
import org.springframework.messaging.MessageChannel; 
import org.springframework.messaging.SubscribableChannel; 


public interface PersonProcessor { 
    String CHANNEL = "person"; 

    @Output(PersonProcessor.CHANNEL) 
    MessageChannel output(); 

    @Input(PersonProcessor.CHANNEL) 
    SubscribableChannel input(); 
} 

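I can also see the output:

Received person: hi 0
Received event log: 0
Received event log: 4
Received person: hi 4
Received event log: 9
Received person: hi 9

Thanks.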

+0

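Not sure whether this helps, but I had the same exception, and in my case the problem was a @RefreshScope annotation in my code. (I see you have not included one in your code, but I mention it in case you removed it from the posted snippet thinking it was not relevant.) See https://github.com/spring-cloud/spring-cloud-stream/issues/461 – chrx

Answers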

4

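I'm not sure whether this is the problem, but your input and output channels need different destination names, for example:

CHANNELIN = personIn, CHANNELOUT = personOut

A processor is not intended to send messages to itself; it is intended to receive messages, process them, and send the results to a different destination.

A processor does not originate messages itself; that is the purpose of a source.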
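
As a rough sketch of that suggestion, the PersonProcessor interface from the question could declare two differently named channels. The constant names CHANNEL_IN and CHANNEL_OUT are illustrative assumptions modelled on the CHANNELIN/CHANNELOUT names above; the annotations and channel types are the ones already used in the question, and the snippet needs spring-cloud-stream on the classpath to compile.

```java
package de.codecentric;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

// Sketch only: one channel name per direction instead of a shared "person".
public interface PersonProcessor {
    // Hypothetical names, following the answer's example values.
    String CHANNEL_IN = "personIn";
    String CHANNEL_OUT = "personOut";

    // Channel the application writes to (bound as a producer).
    @Output(CHANNEL_OUT)
    MessageChannel output();

    // Channel the application reads from (bound as a consumer).
    @Input(CHANNEL_IN)
    SubscribableChannel input();
}
```

With this shape, the @StreamListener in StreamApplication would reference PersonProcessor.CHANNEL_IN and the @InboundChannelAdapter would reference PersonProcessor.CHANNEL_OUT; LogProcessor would presumably need the same treatment. Note that a binding's destination defaults to its channel name, so these two channels use different destinations unless configured otherwise.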

+1

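Just to add to what Gary said above: each channel is a separate bean, and the bean name is the argument of the '@Input'/'@Output' annotation (defaulting to the method name), so your setup effectively has two beans clashing with each other. Separate channels must have separate names (we should do a better job of detecting an error case like yours instead of just failing to bind). They can, however, be configured with the same "destination". –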
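
To illustrate the last point, a configuration fragment along these lines would point two separately named channels at one destination. The channel names personIn and personOut are the hypothetical ones from the answer, and the destination value person is an assumption taken from the question's original channel name; treat this as a sketch rather than a verified setup.

```properties
# application.properties (sketch): two distinct channel beans, one shared destination
spring.cloud.stream.bindings.personIn.destination=person
spring.cloud.stream.bindings.personOut.destination=person
```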

0

I had the same problem and solved it by upgrading my Spring Cloud version (from Camden SR7 to Dalston SR4).
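
For reference, assuming a Maven build that manages Spring Cloud versions through the release-train BOM (the answer does not say which build tool was used), such an upgrade would typically be a one-line version change like this:

```xml
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <!-- previously Camden.SR7 -->
      <version>Dalston.SR4</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
```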