Home > Net >  Read CSV to a class Dataflow Java from GCS
Read CSV to a class Dataflow Java from GCS

Time:08-24

I'm a newby on Java Dataflow and I want to read a CSV file from GCS and have in output a PCollection.

How can I do it efficiently?

Thanks a lot for your help.

CodePudding user response:

Ideally you would have to convert that CSV file to something else. If you can share more details on what you are trying to do with that PCollection, we could provide more insight.

Here is a very generic example that might help. Suppose that you have a CSV gs://sample-bucket-name/teams.csv:

id,name,score
1,Team A,35
2,Team B,40

You could create a pipeline and a component to convert those lines to a POJO in which Beam knows the schema, for example:

  @DefaultSchema(JavaFieldSchema.class)
  public class Team implements Serializable {
    int id;
    String name;
    int score;

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }

      return o != null
          && o instanceof Team
          && id == ((Team) o).id;
    }
  }

A ParDo function could look like this (although you can use any tools you'd like to read and parse CSV, instead of using String.split):

public class TeamCSVtoBeanFunction extends DoFn<String, Team> {
    @ProcessElement
    public void processElement(@Element String line, OutputReceiver<Team> outputReceiver) {

      // ignore header or empty lines
      if (line.equals("id,name,score") || line.isEmpty()) {
        return;
      }
      String[] split = line.split(",");

      Team team = new Team();
      team.id = Integer.parseInt(split[0]);
      team.name = split[1];
      team.score = Integer.parseInt(split[2]);
      outputReceiver.output(team);
    }
  }

Finally, the pipeline, which uses TextIO to read the text file into a PCollection of Team. Ideally, you would want to use other IOs to fulfill your needs, and write your results somewhere.

    PCollection<Team> teams = pipeline
        .apply("ReadCSV", TextIO.read().from("gs://sample-bucket-name/teams.csv"))
        .apply("ConvertToBean", ParDo.of(new TeamCSVtoBeanFunction()));
  • Related