2016-10-04 75 views
1

现在我正在使用Apache Kafka并且有任务: 我们在目录中有一些csv文件,它是一个小批量文件,每个文件大约为25-30 mb。所有我需要的 - 解析文件并将其放到kafka。我如何关闭Apache Kafka连接器任务?

正如我所看到的,卡夫卡有一些有趣的事情,如连接器。

我可以创建Source-Connector和SourceTask,但我不明白一件事: 当我处理文件时,我如何停止或删除我的任务?

,比如我有虚拟连接:

public class DummySourceConnector extends SourceConnector { 
private static final Logger logger = LogManager.getLogger(); 

@Override 
public String version() { 
    logger.info("version"); 

    return "1"; 
} 

@Override 
public ConfigDef config() { 
    logger.info("config"); 

    return null; 
} 

@Override 
public Class<? extends Task> taskClass() { 
    return DummySourceTask.class; 
} 

@Override 
public void start(Map<String, String> props) { 
    logger.info("start {}", props); 
} 

@Override 
public void stop() { 
    logger.info("stop"); 
} 

@Override 
public List<Map<String, String>> taskConfigs(int maxTasks) { 
    logger.info("taskConfigs {}", maxTasks); 

    return ImmutableList.of(ImmutableMap.of("key", "value")); 
} 

和任务:

public class DummySourceTask extends SourceTask { 
private static final Logger logger = LogManager.getLogger(); 

private long offset = 0; 

@Override 
public String version() { 
    logger.info("version"); 

    return "1"; 
} 

@Override 
public void start(Map<String, String> props) { 
    logger.info("start {}", props); 
} 


@Override 
public List<SourceRecord> poll() throws InterruptedException { 
    Thread.sleep(3000); 

    final String value = "Offset " + offset++ + " Timestamp " + Instant.now().toString(); 

    logger.info("poll value {}", value); 

    return ImmutableList.of(new SourceRecord(
      ImmutableMap.of("partition", 0), 
      ImmutableMap.of("offset", offset), 
      "topic-dummy", 
      SchemaBuilder.STRING_SCHEMA, 
      value 
    )); 
} 

public void stop() { 
    logger.info("stop"); 
} 

但我怎么能收我的任务时,这一切都做了什么? 或者,也许你可以帮助我完成此任务的另一个想法。

谢谢你的帮助!

回答

1

首先,我鼓励你看看现有的连接器here。我觉得spooldir连接器会对你有所帮助。甚至可能只是下载并安装它,而不必编写任何代码。

其次,如果我理解正确,你想停止一项任务。我相信this discussion是你想要的。

+0

您好! Thanx为您提供帮助! 这不完全是我想要的,但spooldir连接器很有趣。 不,我想在我想要的时候停止我的任务,让我们来想象一下情况 - 我的任务一行一行地读取文件,当我们在文件的末尾 - 我们不能停止任务。 'stop'方法只能被连接器调用(例如当它重新平衡时)。 – aarexer

+0

啊是的,你想根据某个事件从任务本身中停止任务。我不太确定,因为我们通常不希望任务能够停止或启动自己,因为根据定义,任务协调是连接器的工作。也许你可以将任务提供给下一个文件? – dawsaw

+0

是的,我知道,任务协调是连接器的工作。 可能是喂新文件是好决定... Thanx,这是很好的答案! – aarexer