2016-02-29 327 views
1

我使用kafka perl api编写了kafka制作者,但是当我传递变量来发送消息,而如果我硬编码的消息数据它没有给予任何错误perl Kafka :: Producer,“Kafka :: Exception :: Producer”,“code”,-1000,“message”,“无效的参数

Perl程序,在这里我补充卡夫卡生产代码:

try { 
    $kafka_connection = Kafka::Connection->new(host => $hadoop_server, port => '6667'); 
    $producer = Kafka::Producer->new(Connection => $kafka_connection); 
    my $topic = 'test1'; 
    my $partition = 0; 
    my $message = $hadoop_str; 
    my $response = $producer->send(
     $topic,    # topic 
     $partition,     # partition 
     #"56b4b2b23c24c3608376d1ea,/obj/i386/ui/lib/access/daemon_map.So.gcda,1,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0"   # message 
     $hadoop_str 
     #"t1,f9,1,1,1" 
    ); 
} catch { 
    my $error = $_; 
    if (blessed($error) && $error->isa('Kafka::Exception')) { 
     warn 'Error: (', $error->code, ') ', $error->message, "\n"; 
     exit; 
    } else { 
     die $error; 
    } 
}; 
#    CCLib::run_system_cmd($cmd); 
} 

错误日志:

-bash-3.2$ ./stream_binary_hadoop.pl 
print (...) interpreted as function at ./stream_binary_hadoop.pl line 429. 
<UNKNOWN> <UNKNOWN> Invalid argument: message = 56b4b2b23c24c3608376d1ea,/obj/i386/junos/usr.sbin/lmpd/lmpd_repl_msg_idr.gcda,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 at /opt/adp/projects/code_coverage/perl//5.10/lib/site_perl/5.10.1/Exception/Class/Base.pm line 85. 
Exception::Class::Base::throw("Kafka::Exception::Producer", "code", -1000, "message", "Invalid argument: message = 56b4b2b23c24c3608376d1ea,/obj/i38"...) called at /opt/adp/projects/code_coverage/perl//5.10/lib/site_perl/5.10.1/Kafka/Producer.pm line 374 
Kafka::Producer::_error(Kafka::Producer=HASH(0x36955f8), -1000, "message = 56b4b2b23c24c3608376d1ea,/obj/i386/junos/usr.sbin/l"...) called at /opt/adp/projects/code_coverage/perl//5.10/lib/site_perl/5.10.1/Kafka/Producer.pm line 331 
Kafka::Producer::send(Kafka::Producer=HASH(0x36955f8), "test1", 0, "56b4b2b23c24c3608376d1ea,/obj/i386/junos/usr.sbin/lmpd/lmpd_r"...) called at ./stream_binary_hadoop.pl line 175 
main::try {...}() called at /opt/adp/projects/code_coverage/perl//5.10/lib/site_perl/5.10.1/Try/Tiny.pm line 81 
eval {...} called at /opt/adp/projects/code_coverage/perl//5.10/lib/site_perl/5.10.1/Try/Tiny.pm line 72 
Try::Tiny::try(CODE(0x3692888), Try::Tiny::Catch=REF(0x3692c78)) called at ./stream_binary_hadoop.pl line 190 
main::stream(HASH(0x3692708)) called at ./stream_binary_hadoop.pl line 354 
main::file_split(HASH(0x36927b0)) called at ./stream_binary_hadoop.pl line 413 

at ./stream_binary_hadoop.pl line 188. 
    main::catch {...} ("<UNKNOWN> Invalid argument: message = 56b4b2b23c24c3608376d1e"...) called at /opt/adp/projects/code_coverage/perl//5.10/lib/site_perl/5.10.1/Try/Tiny.pm line 104 
    Try::Tiny::try(CODE(0x3692888), Try::Tiny::Catch=REF(0x3692c78)) called at ./stream_binary_hadoop.pl line 190 
    main::stream(HASH(0x3692708)) called at ./stream_binary_hadoop.pl line 354 
    main::file_split(HASH(0x36927b0)) called at ./stream_binary_hadoop.pl line 413 

回答

1

我能够通过包装我的东西来解决用UTF8 ::降级。不知怎的,它把我的字符串作为utf-8,并且kafka生产者检查了需要非utf-8字符串。

我的工作代码:

my $arcs_val = join(',', @arc_a); 
     my $hadoop_str = $testid . ',' . $gcda_file_name . ',' . $arcs_val; 
     utf8::downgrade($hadoop_str); 
     try { 
       my $topic = $inputs[0]; 
       my $partition = $inputs[1]; 
       my $response = $producer->send(
        $topic,      # topic 
        $partition,     # partition 
        $hadoop_str 
       ); 
相关问题