How do I make a useful side input I can access for Apache Beam in Python? AsDict object not subscriptable

Time:11-24

Good news: I'm using batch, not streaming. I still don't understand what is going on, though. Apparently pvalue.AsDict() doesn't actually give you a dict, just a wrapper around a PCollection? How the heck do I turn it into a dict I can actually use?

This code fails in the __init__ method, right when I try to access operations as if it were... a dictionary.

The error is:

File "[path]", line 40, in process
    location_indexes = [x[0] for x in self.operations["lookup_location_id"]]
TypeError: 'AsDict' object is not subscriptable [while running 'ParDo(Locations)']

Here's the culprit:

import apache_beam as beam


class Locations(beam.DoFn):  # Location_ID
    def __init__(self, operations: dict):
        self.locations: list = operations["lookup_location_id"]

    def process(self, element: str):
        # I have code here, not relevant
        ...

And this is where I apply Locations:

locations = (
    csv_data
    | beam.ParDo(Locations(operations=beam.pvalue.AsDict(operations)))
    | "Dedup locations" >> beam.Distinct()
)

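operations is a PCollection of tuples. Here's the pipeline that builds it:

operations = (
    transforms
    | ParDo(Semantics(headers))
    | GroupByKey()
)

headers is, in fact, a plain ol' list, so passing it to the Semantics constructor works great (strictly speaking, it is pickled along with the DoFn rather than being a Beam side input). Semantics yields (key, value) tuples: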

class Semantics(beam.DoFn):
    def __init__(self, headers: list):
        self.headers = headers

    def process(self, element: list):
        key = element[0]
        value: list = [self.headers.index(element[2]), element[3]]
        yield key, value

I also peeked at the AsDict operations object in the debugger. It was a mess and I have no idea how I'm supposed to pull real values from it. Can anyone help?

Answer:

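The __init__ method of a DoFn runs when the DoFn object is created, during pipeline construction and before any data has been computed, so it cannot read the contents of a side input. beam.pvalue.AsDict(...) only returns a view: a marker telling Beam how to materialize the PCollection as a dict once the pipeline runs.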

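What you need to do instead is pass the side-input view as an extra argument to ParDo. At execution time Beam materializes it and passes the resulting dict to process under the same argument name. Using my own example:

import apache_beam as beam


class ChangeCurrency(beam.DoFn):
    # `ratios` is a real dict here: Beam fills it in from the side input.
    def process(self, value, ratios):
        current = value["currency"]
        exchanged = {"Original": current}
        for key in ratios[current]:
            exchanged[key] = value["amount"] * ratios[current][key]
        return [exchanged]


{..}
pipeline | beam.ParDo(ChangeCurrency(), ratios=beam.pvalue.AsDict(rates_pc))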

Just FYI, you don't need a class for this; a simple function works with beam.FlatMap, which accepts side inputs the same way (ParDo itself requires a DoFn instance):

import apache_beam as beam


def change_currency(value, ratios):
    current = value["currency"]
    exchanged = {"Original": current}
    for key in ratios[current]:
        exchanged[key] = value["amount"] * ratios[current][key]
    return [exchanged]  # FlatMap emits each item of the returned list

{..}

pipeline | beam.FlatMap(change_currency, ratios=beam.pvalue.AsDict(rates_pc))