2015-10-16 85 views
0

我很好奇,我想检查如果快速压缩与java卡夫卡客户端很好地工作。卡夫卡不压缩在快活

为了解决这个问题,我建立了一个小程序。该程序生成1024条消息可读数据。它们的大小是1024字节。我将这些消息发送给树新的主题,并且直接在代理文件系统上检查这些主题的大小后。

您可以通过下面的代码中找到这个程序:

package unit_test.testCompress; 

import java.util.HashMap; 
import java.util.Map; 
import java.util.Random; 
import java.util.concurrent.Future; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.clients.producer.RecordMetadata; 


/** 
* Can be use in order to execute some unit test on compression 
*/ 
public class TestCompress { 

    public static void compress(String type, String version){ 
     Map<String,Object> configs = new HashMap<String,Object>(); 
     configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     configs.put("producer.type", "async"); 
     configs.put("compression.type", type); 
     configs.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 
     configs.put("partitioner.class", "com.kafkaproducer.RecordPartitioner"); 
     configs.put("bootstrap.servers", "kafka:9092"); 


     KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(configs); 

     Random r = new Random(15415485); 
     int size = 1024; //1 Ko 
     byte[] buffer = new byte[size]; 
     for(int i = 0; i < size; i++){ 
      buffer[i] = (byte) ('A' + (r.nextInt() % 26)); 
     } 
     buffer[size-1] = 0; 
     //System.out.println(new String(buffer)); 
     for(int i = 0; i < size; i++){ 
      Future<RecordMetadata> result = producer.send(new ProducerRecord<String, byte[]>("unit_test_compress_"+version+ "_" + type , buffer)); 
     } 

     producer.close(); 
    } 

    public static void main(String[] args) { 

     String version = "v10"; 
     compress("snappy",version); 
     compress("gzip",version); 
     compress("none",version); 

    } 

} 

我编译此代码与本以下Maven POM文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>unit_test</groupId> 
    <artifactId>testCompress</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
    <packaging>jar</packaging> 

    <name>testCompress</name> 
    <url>http://maven.apache.org</url> 

    <properties> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    </properties> 

    <dependencies> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.10</artifactId> 
     <version>0.8.2.2</version>  
    </dependency> 
    </dependencies> 
</project> 

该程序执行得非常好我的电脑上。

但是,当我直接检查我的卡夫卡经纪人结果它给了以下的输出: enter image description here

我认为这是意味着有上活泼的话题没有压缩(但gzip压缩的作品非常好)。我虽然检查vi文件

我知道卡夫卡8.2.1上的这个问题:https://issues.apache.org/jira/browse/KAFKA-2189 但我在生产者上使用Kafka 8.2.2和在代理上使用卡夫卡8.2.2。我也检查了Snappy的依赖性。我正在使用1.1.1.7

您是否了解如何在Kafak上启用快速压缩?我忘记了一个参数,以便在kafka上实现快速压缩?

回答

0

在Kafka ML交换之后,问题是我的Kafka Broker必须升级到8.2.2版本。它解决了我的问题。