我打电话火花提交经过MAXRATE,我有一个室壁运动接收器,以及1秒星火流+室壁运动:接收机MAXRATE违反
spark-submit --conf spark.streaming.receiver.maxRate=10 ....
批次但是一个批次可以大大超过的配合建立MAXRATE。即:我得到300条记录。
我是否缺少任何设置?
我打电话火花提交经过MAXRATE,我有一个室壁运动接收器,以及1秒星火流+室壁运动:接收机MAXRATE违反
spark-submit --conf spark.streaming.receiver.maxRate=10 ....
批次但是一个批次可以大大超过的配合建立MAXRATE。即:我得到300条记录。
我是否缺少任何设置?
这看起来像一个错误给我。从代码中的角度来看,它看起来像Kinesis完全忽略了spark.streaming.receiver.maxRate
配置。
如果你看里面0,你会看到:
val kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)
.withTaskBackoffTimeMillis(500)
.withRegionName(regionName)
此构造结束调用其对于配置了很多默认值的另一个构造:
public KinesisClientLibConfiguration(String applicationName,
String streamName,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) {
this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
}
您所关心的一个是DEFAULT_MAX_RECORDS
,它不断设置为10,000条记录。您调用KinesisClientLibConfiguration
时调用withMaxRecords
的方法来设置实际的记录数。这应该是一个简单的修复。
但现在看来,Kinesis接收器似乎并不尊重该参数。
供将来参考。
这是一个已知bug固定在Spark 2.2.0
发布
肯定,这是对问题的原因!谢谢 –