Just looking for advice: if I have a file like the one below, what function do I use in Apache Beam to remove any N/A value? I tried Filter, but it removes the whole row; I only want to clear that 'cell' if it is N/A. I first read the file in, then split the rows using a template Split-row class called via ParDo; now I want to remove any N/A values.
Example file
Start_loc, Loc_2, loc_3, loc_4, end_loc
Loc 1, loc 2, N/A, loc 3, loc 4
Loc 1, N/A, N/A, N/A, loc 2
Any suggestions?
CodePudding user response:
I tried it out using the Create method to create a PCollection and then applied a ParDo to it to get the desired result. I have assumed that you want to replace the "N/A" string with an empty string.
The DoFn reads the string, splits it on the delimiter ",", and replaces each "N/A" with "" before adding the parts to a list. It then yields the list values joined back together with the delimiter.
import apache_beam as beam
class FilterFn(beam.DoFn):
    """Blank out every delimited field that matches ``filter_item``.

    Each incoming element is a single delimited line; the matching fields
    (compared after stripping surrounding whitespace) are replaced with an
    empty string and the line is re-joined with the same delimiter.
    """

    def __init__(self, delimiter, filter_item):
        self.delimiter = delimiter
        self.filter_item = filter_item

    def process(self, text):
        # Replace the condition and output to your requirement.
        cleaned = [
            "" if field.strip() == self.filter_item else field
            for field in text.split(self.delimiter)
        ]
        yield self.delimiter.join(cleaned)
# Demo pipeline: feed one hard-coded line through FilterFn and print it.
with beam.Pipeline() as pipeline:
    results = (
        pipeline
        | 'Create Dummy Input' >> beam.Create(["Loc 1,loc 2,N/A,loc 3,loc 4"])
        | 'Split words and Remove N/A' >> beam.ParDo(FilterFn(',', 'N/A'))
        | beam.Map(print)  # do further processing here
    )
The output string that I get after this ParDo is:
Loc 1,loc 2,,loc 3,loc 4
CodePudding user response:
I had tried the code below before seeing these answers. It works; I am just aware it might not be the 'right' way of doing it. Any feedback on it?
class Split(beam.DoFn):
    """Split each CSV row on commas and return a dictionary for the row.

    Fix: the example rows contain spaces after the commas ("Loc 1, loc 2,
    N/A, ..."), so each field is stripped here; otherwise a later exact
    comparison against 'N/A' would see ' N/A' and never match.
    """

    # Column order expected in the input file.
    _COLUMNS = ('Start_loc', 'Loc_1', 'Loc_2', 'Loc_3', 'Loc_4', 'End_loc')

    def process(self, element):
        fields = [part.strip() for part in element.split(",")]
        # The original tuple-unpack raised ValueError on a malformed row;
        # keep that contract but with a clearer message.
        if len(fields) != len(self._COLUMNS):
            raise ValueError(
                "expected %d columns, got %d in row: %r"
                % (len(self._COLUMNS), len(fields), element)
            )
        return [dict(zip(self._COLUMNS, fields))]
class CleanFile(beam.DoFn):
    """Replace every 'N/A' value in the row dict with None.

    Fixes two bugs in the original version:
      * it read element['End_Loc'], which raises KeyError because Split
        emits the key 'End_loc';
      * it renamed the 'Loc_n' keys to lowercase 'loc_n' while rebuilding
        the dict, silently changing the schema between steps.
    Mutating in place and returning the same dict keeps the keys exactly
    as Split produced them.
    """

    def process(self, element):
        # list() so we can assign into the dict while iterating.
        for key, value in list(element.items()):
            if value == 'N/A':
                element[key] = None
        return [element]
class CombineColumns(beam.DoFn):
    """Collapse the intermediate location columns into one quoted field.

    Prepares each row to be written to the CSV: all non-None values other
    than Start_loc/End_loc are joined with commas and wrapped in double
    quotes so the embedded commas survive the CSV output.

    Fixes in this version:
      * the posted code was missing the '+' operators ('"' v,
        other_loc ',' v, other_loc '"') and was a SyntaxError as written;
      * when every middle column was None the old logic produced a single
        unbalanced '"'; now it produces a balanced empty quoted field '""'.
    """

    def process(self, element):
        middle = [
            value
            for key, value in element.items()
            if value is not None and key not in ('Start_loc', 'End_loc')
        ]
        other_loc = '"' + ",".join(middle) + '"'
        return [{
            'Start_loc': element['Start_loc'],
            'Other_loc': other_loc,
            'End_loc': element['End_loc'],
        }]
class WriteToCSV(beam.DoFn):
    """Format each row dict as a single CSV output line.

    Prepares each row to be written in the csv: the three fields are
    emitted in a fixed order as one comma-separated string.
    """

    def process(self, element):
        line = ",".join(
            str(element[key]) for key in ('Start_loc', 'Other_loc', 'End_loc')
        )
        return [line]
def process_file():
    """Build and run the cleaning pipeline using the user-supplied options."""
    pipeline_options = PipelineOptions()
    user_options = pipeline_options.view_as(UserOptions)
    tstmp = datetime.now().strftime("%Y%m%d%H")  # NOTE(review): currently unused
    in_path = user_options.input
    out_path = user_options.output
    with beam.Pipeline(options=pipeline_options) as p:
        rows = (
            p
            | 'Read from a File' >> beam.io.ReadFromText(in_path, skip_header_lines=1)
            | beam.ParDo(Split())
        )
        cleaned = rows | beam.ParDo(CleanFile())
        formatted = cleaned | beam.ParDo(CombineColumns())
        (
            formatted
            | beam.ParDo(WriteToCSV())
            | beam.io.WriteToText(
                out_path,
                file_name_suffix=".csv",
                header='Start_loc,Other_loc,End_loc',
            )
        )