I'm trying to count the number of elements in a PCollection and store the count in a variable that I can use in further calculations. Any guidance on how to do this?
import apache_beam as beam

pipeline = beam.Pipeline()
total_elements = (
    pipeline
    | 'Create elements' >> beam.Create(['one', 'two', 'three', 'four', 'five', 'six'])
    | 'Count all elements' >> beam.combiners.Count.Globally()
    | beam.Map(print))
print(total_elements + 10)
pipeline.run()
Printing total_elements works, but I need to add an integer to it. I tried int(total_elements), and that doesn't work either.
CodePudding user response:
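You can pass the count into a later step as a side input with beam.pvalue.AsSingleton. A PCollection is a deferred value rather than a Python int, so total_elements + 10 cannot be evaluated directly; the addition has to happen inside a transform.
import apache_beam as beam

pipeline = beam.Pipeline()
total_elements = (
    pipeline
    | 'Create elements' >> beam.Create(['one', 'two', 'three', 'four', 'five', 'six'])
    | 'Count all elements' >> beam.combiners.Count.Globally())
# This will print the total number of elements in the PCollection, plus 10.
# AsSingleton wraps the one-element count PCollection so it can be used as a side input.
(
    pipeline
    | 'Seed' >> beam.Create([None])
    | 'Add 10' >> beam.Map(
        lambda _, total: total + 10,
        total=beam.pvalue.AsSingleton(total_elements))
    | beam.Map(print))
pipeline.run().wait_until_finish()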
Note that beam.pvalue.AsSingleton only works if the PCollection it wraps contains exactly one value. If that PCollection is empty (and no default_value is supplied) or contains more than one value, the pipeline raises an error.
CodePudding user response:
import apache_beam as beam
with beam.Pipeline() as pipeline:
    total_elements = (
        pipeline
        | 'Create plants' >> beam.Create(
            ['