The behavior described in the title occurs. The example code is below. Is this a bug, or is my code written incorrectly?
import apache_beam as beam


def main():
    with beam.Pipeline() as p:
        p_f = (
            p
            | beam.Create([
                1, 2, 3, 4, 5
            ]))
        # Chain three Map steps; step i is meant to add i to each element.
        for i in range(3):
            p_f = (p_f
                   | f"{i}" >> beam.Map(lambda x: x + i)
                   )
        p_f | beam.Map(print)


main()
In the example code, I expect [4, 5, 6, 7, 8], because the three steps should add 0 + 1 + 2 = 3 in total.
But I got [7, 8, 9, 10]. Every element has 6 added to it. It looks like only the last value of the for loop is used (the last value is 2, and the loop runs 3 times, so 2 * 3 = 6).
Thanks.
CodePudding user response:
Sorry, I made a small mistake.
I wrote "But I got [7, 8, 9, 10]".
The correct output is [7, 8, 9, 10, 11].
CodePudding user response:
I propose your code with some updates:
import apache_beam as beam


def main():
    with beam.Pipeline() as p:
        p_f = (
            p
            | beam.Create([
                1, 2, 3, 4, 5
            ]))
        p_f = (p_f
               | "Map" >> beam.Map(add_ranges_to_value))
        p_f | beam.Map(print)


# Module-level function (no self), since main() is not a method of a class.
def add_ranges_to_value(value: int) -> int:
    for i in range(3):
        value = value + i
    return value


main()
I moved the for loop and the increment logic inside the pure transformation used by the Beam Map operation.
The result matches the expected output: [4, 5, 6, 7, 8]
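
As for why the original loop added 6: the output is consistent with Python's late-binding closures. A lambda looks up `i` when it is called, not when it is defined, so all three lambdas see the final value 2. Below is a minimal sketch in plain Python, without Beam, that reproduces both results. The helper names (`apply_all`, `build_late`, `build_bound`) are made up for illustration, and it assumes Beam treats the lambdas like ordinary Python closures, which the output in the question suggests.

```python
def apply_all(funcs, value):
    # Apply each function in order, like the chained Map steps.
    for f in funcs:
        value = f(value)
    return value


def build_late():
    # Late binding: each lambda reads `i` when it is called,
    # after the loop has finished, so every lambda sees i == 2.
    funcs = []
    for i in range(3):
        funcs.append(lambda x: x + i)
    return funcs


def build_bound():
    # Default argument: `i=i` captures the current value of `i`
    # at the moment each lambda is defined.
    funcs = []
    for i in range(3):
        funcs.append(lambda x, i=i: x + i)
    return funcs


data = [1, 2, 3, 4, 5]
late_result = [apply_all(build_late(), v) for v in data]
bound_result = [apply_all(build_bound(), v) for v in data]

print(late_result)   # [7, 8, 9, 10, 11]
print(bound_result)  # [4, 5, 6, 7, 8]
```

If that assumption holds, writing `beam.Map(lambda x, i=i: x + i)` in the original loop should give the expected [4, 5, 6, 7, 8] while keeping the three separate Map steps.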