How do I split a file of json elements in Apache Beam

Time:02-23

I'm using Apache Beam with Python, and I have a ppl file that looks something like this:
FileExample.ppl:

{"name":"Julio"} 
{"name":"Angel", "Age":35} 
{"name":"Maria","cellphone":NULL} 
{"name":NULL,"cellphone":"3451-14-12"} 

etc...

I need to split the file per JSON object rather than per line (in the real file, each JSON object can span an arbitrary number of lines, not just one).

Then I need to validate the content of each JSON object. The file contains 6 types of objects: those where every key has a value, those where some keys do not, and so on. After that, I need a separate PCollection for each type. I'm thinking about using beam.FlatMap() for this last step, but first I need something like this:

jsons = pipeline | "splitElements"  >> ReadFromText(file)

Thank you in advance. Keep in mind that I am new to this.

CodePudding user response:

ReadFromText always reads text files one line at a time; if your json objects are split across lines you'll have to do a different kind of read.

One option is to read each file in its entirety in a DoFn, e.g.

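import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline() as p:
    readable_files = (
        p
        | beam.Create([...set of files to read...])  # or use fileio.MatchAll
        | fileio.ReadMatches())
    # Each element of readable_files is a fileio.ReadableFile.
    file_contents = readable_files | beam.ParDo(ReadFileDoFn())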

where ReadFileDoFn could use the same underlying libraries that ReadFromText does, e.g.

class ReadFileDoFn(beam.DoFn):
  def process(self, readable_file):
    # open() returns a binary stream, so decode the bytes to text.
    with readable_file.open() as handle:
      yield handle.read().decode('utf-8')

This will result in a PCollection whose elements are the entire contents of each file. Now to split up your text file into individual json objects, you can do something like

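import json
import re

def text_blob_to_json_objects(text):
  # Turns a concatenated set of objects like '{...} {...}' into
  # a single json array '[{...}, {...}]'. \s already matches newlines,
  # so no flags are needed (the fourth positional argument of re.sub
  # is count, not flags).
  as_json_array = '[%s]' % re.sub(r'}\s*{', '},{', text)
  # Returns the parsed array.
  return json.loads(as_json_array)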

file_contents | beam.FlatMap(text_blob_to_json_objects)
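
A regex that joins objects on '}' followed by '{' can misfire if a string value happens to contain that sequence. As a hedged alternative, here is a minimal sketch that walks the text with the standard library's json.JSONDecoder.raw_decode instead. The helper name iter_json_objects is hypothetical, and the sketch assumes every object is valid JSON (for instance lowercase null; json rejects the uppercase NULL shown in the sample file).

```python
import json


def iter_json_objects(text):
    """Yield each top-level JSON value found in a concatenated text blob."""
    decoder = json.JSONDecoder()
    pos, end = 0, len(text)
    while pos < end:
        # raw_decode does not skip leading whitespace, so do it here.
        while pos < end and text[pos].isspace():
            pos += 1
        if pos >= end:
            break
        # Parses one value starting at pos and returns where it stopped.
        obj, pos = decoder.raw_decode(text, pos)
        yield obj
```

Because it is a generator, it could be dropped into the same place in the pipeline, e.g. file_contents | beam.FlatMap(iter_json_objects). A malformed object raises json.JSONDecodeError, which you may want to catch and route elsewhere.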

You can then follow this with a multi-output DoFn to separate out the various types.
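
To illustrate that last step, here is a rough sketch using Beam's tagged outputs. It uses beam.FlatMap(...).with_outputs(...), which wraps the function in a ParDo, rather than a hand-written DoFn class. The category names ("complete", "has_nulls", "invalid") and the helpers classify, tag_by_type and split_by_type are all made up for the example; the real file's 6 types would need their own rules.

```python
def classify(record):
    """Return a hypothetical category name for one parsed JSON object."""
    if not isinstance(record, dict):
        return "invalid"
    if any(value is None for value in record.values()):
        return "has_nulls"
    return "complete"


def tag_by_type(record):
    # Imported here so classify() can be tested without Beam installed.
    from apache_beam import pvalue
    # Route the record to the output whose tag matches its category.
    yield pvalue.TaggedOutput(classify(record), record)


def split_by_type(records):
    """Split a PCollection of parsed objects into one PCollection per tag."""
    import apache_beam as beam
    tagged = records | "SplitByType" >> beam.FlatMap(tag_by_type).with_outputs(
        "complete", "has_nulls", "invalid")
    return tagged.complete, tagged.has_nulls, tagged.invalid
```

Usage would look something like complete, has_nulls, invalid = split_by_type(file_contents | beam.FlatMap(text_blob_to_json_objects)), after which each returned PCollection can be processed independently.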
