Does the Spark config spark.streaming.receiver.maxRate have any effect in a Kafka Beam pipeline?

Time:03-16

Does anybody have experience with rate limiting in Beam's KafkaIO component when the runner is the SparkRunner? The versions I am using are Beam 2.29, Spark 3.2.0 and Kafka client 2.5.0.

I have the Beam parameter maxRecordsPerBatch set to a large number, 100000000. Even when the pipeline stops for 45 minutes, this value is never hit. But when there is a burst of data above the normal volume, the Kafka lag increases until the pipeline eventually catches up. In the Spark UI I see that batchIntervalMillis=300000 (5 min) is never reached; batches take at most 3 min. It looks like KafkaIO stops reading at some point, even when the lag is very large. My Kafka parameters --fetchMaxWaitMs=1000 --maxPollRecords=5000 should be able to bring in plenty of data, especially because KafkaIO creates one consumer per partition. In my system there are multiple topics with a total of 992 partitions, and my spark.default.parallelism=600. Some partitions have very little data, while others have a large amount. Topics are per region, and when a region goes down its data is sent through another region/topic. That is when the lag happens.

Do the configuration values spark.streaming.receiver.maxRate and spark.streaming.receiver.maxRatePerPartition, plus spark.streaming.backpressure.enabled, play any role at all? From what I have seen, Beam controls all reading from Kafka through the KafkaIO operator. This component creates its own consumers, so the consumer rate can only be set through consumer configs such as fetchMaxWaitMs and maxPollRecords. The only place those Spark parameters could have any effect is in the rest of the pipeline, after the IO source. But I am not sure.

CodePudding user response:

So I finally figured out how it all works. First, the Spark configuration values spark.streaming.receiver.maxRate, spark.streaming.receiver.maxRatePerPartition and spark.streaming.backpressure.enabled are not a factor in Beam, because they only apply when you use Spark's own source operators to read from Kafka. Since Beam has its own operator, KafkaIO, they do not play a role.

Beam has a set of parameters, defined in the class SparkPipelineOptions, that the SparkRunner uses to set up reading from Kafka. Those parameters are:

  @Description("Minimum time to spend on read, for each micro-batch.")
  @Default.Long(200)
  Long getMinReadTimeMillis();

  @Description(
  "A value between 0-1 to describe the percentage of a micro-batch dedicated to reading from UnboundedSource.")
  @Default.Double(0.1)
  Double getReadTimePercentage();
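
Since these are regular pipeline options, they can presumably be passed on the command line in the same way as the options in the question. A sketch of what that might look like; the values here are only illustrative and not a recommendation:

```
--runner=SparkRunner
--batchIntervalMillis=300000
--readTimePercentage=0.5
--minReadTimeMillis=1000
--maxRecordsPerBatch=100000000
```

With these illustrative values, up to half of each batch interval would be available for reading from the source, instead of the default 10%.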

Beam creates a SourceDStream object that it passes to Spark to use as the source for reading from Kafka. In this class, the method boundReadDuration returns the larger of two durations: proportionalDuration and lowerBoundDuration. The first is calculated by multiplying batchIntervalMillis by readTimePercentage. The second is just minReadTimeMillis, in milliseconds. The code from SourceDStream is shown below. The duration returned by this method is the time spent reading from Kafka in each batch; the rest of the batch time is allocated to the other tasks in the pipeline.

Last but not least, the parameter maxRecordsPerBatch also controls how many records are processed during a batch. The pipeline will not process more than that many records in a single batch.

  private Duration boundReadDuration(double readTimePercentage, long minReadTimeMillis) {
    long batchDurationMillis = ssc().graph().batchDuration().milliseconds();
    // Share of the batch interval reserved for reading from the source.
    Duration proportionalDuration =
        new Duration(Math.round(batchDurationMillis * readTimePercentage));
    // Floor on the read time, independent of the batch interval.
    Duration lowerBoundDuration = new Duration(minReadTimeMillis);
    Duration readDuration =
        proportionalDuration.isLongerThan(lowerBoundDuration)
            ? proportionalDuration
            : lowerBoundDuration;
    LOG.info("Read duration set to: " + readDuration);
    return readDuration;
  }
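
To put numbers on this, here is a small standalone sketch of the same max(proportional, lower bound) rule. It is not the Beam code: the class name ReadBudget is made up for illustration, and it uses java.time.Duration instead of Joda-Time so that it runs without any dependencies. It assumes the defaults readTimePercentage=0.1 and minReadTimeMillis=200 were in effect in the question's pipeline.

```java
import java.time.Duration;

// Hypothetical helper, not part of Beam: reproduces the read-duration rule in isolation.
public class ReadBudget {

    // Returns the larger of (batch interval * read percentage) and the minimum read time.
    static Duration boundReadDuration(
            long batchIntervalMillis, double readTimePercentage, long minReadTimeMillis) {
        Duration proportional =
            Duration.ofMillis(Math.round(batchIntervalMillis * readTimePercentage));
        Duration lowerBound = Duration.ofMillis(minReadTimeMillis);
        return proportional.compareTo(lowerBound) > 0 ? proportional : lowerBound;
    }

    public static void main(String[] args) {
        // Batch interval from the question (5 min) with the default options.
        System.out.println(boundReadDuration(300_000L, 0.1, 200L).toMillis()); // prints 30000

        // A short 1 s batch: 10% would be 100 ms, so the 200 ms lower bound wins.
        System.out.println(boundReadDuration(1_000L, 0.1, 200L).toMillis()); // prints 200
    }
}
```

Under those assumptions, each 5 minute batch spends only about 30 seconds reading from Kafka, which would explain why the lag grows during a burst even though maxRecordsPerBatch is never reached.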
