2016-11-29 63 views
2

我打电话火花提交经过MAXRATE,我有一个室壁运动接收器,以及1秒星火流+室壁运动:接收机MAXRATE违反

spark-submit --conf spark.streaming.receiver.maxRate=10 ....

批次但是一个批次可以大大超过的配合建立MAXRATE。即:我得到300条记录。

我是否缺少任何设置?

回答

2

这看起来像一个错误给我。从代码中的角度来看,它看起来像Kinesis完全忽略了spark.streaming.receiver.maxRate配置。

如果你看里面0​​,你会看到:

val kinesisClientLibConfiguration = 
    new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId) 
    .withKinesisEndpoint(endpointUrl) 
    .withInitialPositionInStream(initialPositionInStream) 
    .withTaskBackoffTimeMillis(500) 
    .withRegionName(regionName) 

此构造结束调用其对于配置了很多默认值的另一个构造:

public KinesisClientLibConfiguration(String applicationName, 
     String streamName, 
     AWSCredentialsProvider kinesisCredentialsProvider, 
     AWSCredentialsProvider dynamoDBCredentialsProvider, 
     AWSCredentialsProvider cloudWatchCredentialsProvider, 
     String workerId) { 
    this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, 
      dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, 
      DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, 
      DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, 
      DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, 
      new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), 
      DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, 
      DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null); 
} 

您所关心的一个是DEFAULT_MAX_RECORDS,它不断设置为10,000条记录。您调用KinesisClientLibConfiguration时调用withMaxRecords的方法来设置实际的记录数。这应该是一个简单的修复。

但现在看来,Kinesis接收器似乎并不尊重该参数。

+0

肯定,这是对问题的原因!谢谢 –

2

供将来参考。

这是一个已知bug固定在Spark 2.2.0发布