How to remove values from file if N/A in beam

Time:04-19

Just looking for advice. If I have a file like the one below, what function do I use in Apache Beam to remove any N/A values? I tried Filter, but it removes the whole row, and I only want to remove the individual 'cell' if it is N/A. I first read the file in and then split the rows using a template split-row class, which is called using ParDo. Now I want to remove any N/A values.

Example file

Start_loc, Loc_2, loc_3, loc_4, end_loc
Loc 1, loc 2, N/A, loc 3, loc 4
Loc 1, N/A, N/A, N/A, loc 2

Any suggestions?

CodePudding user response:

I tried it out using the Create transform to build a PCollection and then applied a ParDo to it to get the desired result. I have assumed that you want to replace the N/A string with an empty string. The DoFn reads the string, splits it on the delimiter ",", and replaces each "N/A" with "" before adding the values to a list. It then yields the list values joined by the delimiter.

import apache_beam as beam

class FilterFn(beam.DoFn):
  def __init__(self, delimiter, filter_item ):
    self.delimiter = delimiter
    self.filter_item = filter_item
  def process(self, text):
    a_list = []
    for word in text.split(self.delimiter):
      if word.strip() == self.filter_item:  # Adjust the condition and output to your requirements
        a_list.append("")
      else:
        a_list.append(word)
    # print(",".join(a_list))
    yield self.delimiter.join(a_list)

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Create Dummy Input' >> beam.Create([ "Loc 1,loc 2,N/A,loc 3,loc 4"])
      | 'Split words and Remove N/A' >> beam.ParDo(FilterFn(',', 'N/A'))
      | beam.Map(print)  # Do further processing here
      )

The output string I get after this ParDo is:

Loc 1,loc 2,,loc 3,loc 4
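
If the cell should disappear entirely rather than be left blank, a variant could be sketched as below. This is only an illustration under assumptions: the helper names `drop_cells` and `run_demo` are made up for this example, and it assumes that rows of different lengths are acceptable downstream, since dropping cells means the remaining values no longer line up with the header columns.

```python
def drop_cells(line, delimiter=",", missing="N/A"):
    """Return the cells of `line`, skipping any cell equal to `missing`."""
    # Strip whitespace first so ' N/A' in the sample file matches 'N/A'.
    cells = (cell.strip() for cell in line.split(delimiter))
    return [cell for cell in cells if cell != missing]


def run_demo():
    """Apply drop_cells to the two sample rows in a small Beam pipeline."""
    # Imported here so that drop_cells itself can be used without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as pipeline:
        (
            pipeline
            | "Create sample rows" >> beam.Create([
                "Loc 1, loc 2, N/A, loc 3, loc 4",
                "Loc 1, N/A, N/A, N/A, loc 2",
            ])
            | "Drop N/A cells" >> beam.Map(drop_cells)
            | "Print" >> beam.Map(print)
        )
```

Calling `run_demo()` runs the pipeline locally. Because each input line produces exactly one output element, `beam.Map` is enough here and a full `DoFn` is not strictly needed.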

CodePudding user response:

I had tried the code below before seeing these answers. It works, but I am aware it might not be the 'right' way of doing it. Any feedback on it?

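from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class Split(beam.DoFn):
    def process(self, element):
        """
        Splits each row on commas and returns a dictionary representing the
        row
        """
        # Strip surrounding whitespace so a cell such as ' N/A' compares equal to 'N/A'
        Start_loc, loc_1, loc_2, loc_3, loc_4, End_loc = [
            cell.strip() for cell in element.split(",")
        ]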

        return [{
            'Start_loc': Start_loc,
            'Loc_1': loc_1,
            'Loc_2': loc_2,
            'Loc_3': loc_3,
            'Loc_4': loc_4,
            'End_loc': End_loc
        }]

class CleanFile(beam.DoFn):
    def process(self, element):
        """
        Replaces 'N/A' values with None. A new dictionary is built because a
        DoFn should not modify its input element.
        """
        return [{
            k: (None if v == 'N/A' else v)
            for k, v in element.items()
        }]
        
class CombineColumns(beam.DoFn):
    def process(self, element):
        """
        Prepares each row to be written in the csv
        """
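        other_loc = ''
        for k, v in list(element.items()):
            if v is not None and k != 'Start_loc' and k != 'End_loc' and other_loc == '':
                other_loc = '"' + v
            elif v is not None and k != 'Start_loc' and k != 'End_loc':
                other_loc = other_loc + ',' + v
        # Close the quote only if at least one value was added; otherwise
        # a lone '"' would end up in the output row
        if other_loc != '':
            other_loc = other_loc + '"'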
        return [{
            'Start_loc': element['Start_loc'],
            'Other_loc': other_loc,
            'End_loc': element['End_loc']
        }]


class WriteToCSV(beam.DoFn):
    def process(self, element):
        """
        Prepares each row to be written in the csv
        """
        result = [
            "{},{},{}".format(
                element['Start_loc'],
                element['Other_loc'],
                element['End_loc']
            )
        ]
        return result

def process_file():
    # UserOptions is assumed to be a custom PipelineOptions subclass defined elsewhere
    pipeline_options = PipelineOptions()
    user_options = pipeline_options.view_as(UserOptions)
    tstmp = datetime.now().strftime("%Y%m%d%H")
    input = user_options.input
    output = user_options.output

    with beam.Pipeline(options=pipeline_options) as p:

        ROWS = (p
                | 'Read from a File' >> beam.io.ReadFromText(input, skip_header_lines=1)
                | beam.ParDo(Split()))

        CLEAN = ROWS | beam.ParDo(CleanFile())

        FORMAT = CLEAN | beam.ParDo(CombineColumns())

        (FORMAT
         | beam.ParDo(WriteToCSV())
         | beam.io.WriteToText(output, file_name_suffix=".csv",
                               header='Start_loc,Other_loc,End_loc'))
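
As possible feedback on the approach above: the four DoFns each emit exactly one output per input, so they could probably be collapsed into a single function. The sketch below is one way to do that, not the definitive one. The name `format_row` is hypothetical, and it assumes every row has at least two columns (the first is the start location, the last is the end location). It uses Python's standard `csv` module so that quoting of the combined middle column is handled by the writer rather than by hand.

```python
import csv
import io


def format_row(line, missing="N/A"):
    """Turn one input line into a 'start,"middle locations",end' CSV line."""
    # Strip whitespace so ' N/A' in the input matches 'N/A'.
    cells = [cell.strip() for cell in line.split(",")]
    start, middle, end = cells[0], cells[1:-1], cells[-1]

    # Keep only the intermediate locations that are actually present.
    kept = [cell for cell in middle if cell != missing]

    # csv.writer quotes the middle field only when it contains a comma.
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="").writerow([start, ",".join(kept), end])
    return buffer.getvalue()
```

In the pipeline, this function would sit between the read and write steps as `beam.Map(format_row)`, replacing the `Split`, `CleanFile`, `CombineColumns`, and `WriteToCSV` steps. For the two sample rows it gives `Loc 1,"loc 2,loc 3",loc 4` and `Loc 1,,loc 2`.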