When using a for loop to write looping PTransforms in Apache Beam with the Python SDK, all loops use the las

Time:10-01

The behavior described in the title occurred. The example code is below. Is this a bug, or is my code written incorrectly?

import apache_beam as beam


def main():
    with beam.Pipeline() as p:
        p_f = (
                p
                | beam.Create([
            1, 2, 3, 4, 5
        ]))
        for i in range(3):
            p_f = (p_f
                   | f"{i}" >> beam.Map(lambda x: x + i)
                   )

        p_f | beam.Map(print)


main()

In the example code, I expect [4, 5, 6, 7, 8], because the loop adds 0 + 1 + 2 = 3 to each element.

But I got [7, 8, 9, 10]. Every element has 6 added to it. It looks like only the last value of the for loop is used (2 is the last value, and the loop runs 3 times, so 2 * 3 = 6).

Thanks.

CodePudding user response:

Sorry, I made a small mistake in the question. I wrote:

But I got [7, 8, 9, 10].

The output I actually got is [7, 8, 9, 10, 11].

CodePudding user response:

Here is your code with some updates:

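import apache_beam as beam


def add_ranges_to_value(value: int) -> int:
    # Add 0, 1 and 2 to the value inside a single transform.
    for i in range(3):
        value = value + i

    return value


def main():
    with beam.Pipeline() as p:
        p_f = (
            p
            | beam.Create([1, 2, 3, 4, 5]))

        # One Map step replaces the three Map steps built in the loop.
        p_f = (p_f
               | "Map" >> beam.Map(add_ranges_to_value))

        p_f | beam.Map(print)


main()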

I moved the for loop and the increment logic into a single pure function that is passed to the Beam Map operation.

The result matches the expected output: [4, 5, 6, 7, 8]
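As for why the original loop adds 6, this looks like Python's late-binding closure behavior rather than a Beam bug: each lambda looks up i when it is called, not when it is created, and by then the loop has finished with i = 2. Below is a minimal plain-Python sketch of that behavior. It does not use Beam at all, and apply_all is a hypothetical helper that only stands in for chaining several Map steps.

```python
# Plain-Python sketch (no Beam) of the closure behavior seen in the question.
# apply_all is a hypothetical stand-in for chaining several Map steps.

def apply_all(funcs, values):
    # Apply each function in order to every value, like chained Map steps.
    for f in funcs:
        values = [f(v) for v in values]
    return values


# Late binding: each lambda reads i when it is called, after the loop has ended.
late = []
for i in range(3):
    late.append(lambda x: x + i)

# A default argument is evaluated when the lambda is created,
# so each lambda keeps its own value of i.
bound = []
for i in range(3):
    bound.append(lambda x, i=i: x + i)

late_result = apply_all(late, [1, 2, 3, 4, 5])
bound_result = apply_all(bound, [1, 2, 3, 4, 5])

print(late_result)   # [7, 8, 9, 10, 11]
print(bound_result)  # [4, 5, 6, 7, 8]
```

If you want to keep the loop in the pipeline, the same default-argument idiom should carry over, i.e. writing beam.Map(lambda x, i=i: x + i) in the question's loop. I have only checked the plain-Python version above.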
