Good news, I'm using batch, not streaming. I still don't understand what is going on. Apparently `pvalue.AsDict()` doesn't actually give you a dict, just a wrapper around a PCollection? How the heck do I turn it into a dict so I can use it?
This code fails in the `__init__` method, right when I try to access `operations` as if it were... a dictionary.
The error is:

```
File "[path]", line 40, in process
    location_indexes = [x[0] for x in self.operations["lookup_location_id"]]
TypeError: 'AsDict' object is not subscriptable [while running 'ParDo(Locations)']
```
Here's the culprit:

```python
class Locations(beam.DoFn):  # Location_ID
    def __init__(self, operations: dict):
        self.locations: list = operations["lookup_location_id"]

    def process(self, element: str):
        # I have code here, not relevant
        ...
```
And this is where I apply `Locations`:
```python
locations = (
    csv_data
    | beam.ParDo(Locations(operations=beam.pvalue.AsDict(operations)))
    | "Dedup locations" >> beam.Distinct()
)
```
`operations` is a PCollection of tuples. Here's the pipeline that builds it:
```python
operations = (
    transforms
    | ParDo(Semantics(headers))
    | GroupByKey()
)
```
`headers` is, in fact, a plain ol' list, so passing it straight into the constructor works great. `Semantics` yields me some (key, value) tuples:
```python
class Semantics(beam.DoFn):
    def __init__(self, headers: list):
        self.headers = headers

    def process(self, element: list):
        key = element[0]
        value: list = [self.headers.index(element[2]), element[3]]
        yield key, value
```
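To make the shapes concrete, here is a plain-Python sketch (no Beam involved) that mimics `Semantics` followed by `GroupByKey` and builds the kind of dict that `AsDict(operations)` stands for. The header names, sample rows, and the helper names `semantics` and `group_by_key` are hypothetical, and the row layout is assumed from how `Semantics.process` indexes `element`.

```python
from collections import defaultdict

# Hypothetical headers and rows; the row layout [key, <unused>, header_name, value]
# is an assumption inferred from how Semantics.process indexes `element`.
headers = ["location_id", "name", "city"]
sample_rows = [
    ["lookup_location_id", None, "location_id", "L1"],
    ["lookup_location_id", None, "city", "Paris"],
    ["other_op", None, "name", "Depot"],
]


def semantics(element, headers):
    """Same logic as Semantics.process: yield (key, [header_index, value])."""
    key = element[0]
    value = [headers.index(element[2]), element[3]]
    yield key, value


def group_by_key(pairs):
    """Rough stand-in for GroupByKey: each key once, mapped to all its values.

    Real Beam gives an iterable per key with no guaranteed order.
    """
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


pairs = [kv for row in sample_rows for kv in semantics(row, headers)]
# A plain dict like this is what process() receives for AsDict(operations).
operations = group_by_key(pairs)
location_indexes = [x[0] for x in operations["lookup_location_id"]]

print(operations)        # {'lookup_location_id': [[0, 'L1'], [2, 'Paris']], 'other_op': [[1, 'Depot']]}
print(location_indexes)  # [0, 2]
```

Because `GroupByKey` runs first, each key appears once and maps to the full collection of `[header_index, value]` pairs, which is why indexing `operations["lookup_location_id"]` makes sense once you hold the real dict.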
I also peeked at the `AsDict(operations)` object in the debugger. It was a mess, and I have no idea how I'm supposed to pull real values out of it. Can anyone help?
Answer:
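The `__init__` method of a DoFn runs when you construct the DoFn, that is, while the pipeline is being built and before any data has been computed, so you cannot read side-input data there. `beam.pvalue.AsDict(...)` only describes the side input; the runner materializes it and hands the actual dict to your DoFn at processing time.

What you need to do is pass the side input view as an extra argument to `ParDo`, which then delivers it to the `process` method. Using my own example: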
```python
import apache_beam as beam
from apache_beam import ParDo


class ChangeCurrency(beam.DoFn):
    # `ratios` is the materialized side input: a real dict at this point
    def process(self, value, ratios):
        current = value["currency"]
        exchanged = {"Original": current}
        for key in ratios[current]:
            exchanged[key] = value["amount"] * ratios[current][key]
        return [exchanged]

# ...
pipeline | ParDo(ChangeCurrency(), ratios=beam.pvalue.AsDict(rates_pc))
```
Just FYI, you don't need a class for this; a simple function works too:
```python
import apache_beam as beam
from apache_beam import ParDo


def change_currency(value, ratios):
    current = value["currency"]
    exchanged = {"Original": current}
    for key in ratios[current]:
        exchanged[key] = value["amount"] * ratios[current][key]
    return [exchanged]

# ...
pipeline | ParDo(change_currency, ratios=beam.pvalue.AsDict(rates_pc))
```