Home > front end >  BigQueryIO.write() step in the Dataflow job runs before TextIO.Read() causing Null pointer exception
BigQueryIO.write() step in the Dataflow job runs before TextIO.Read() causing Null pointer exception

Time:12-17

I am trying to read the contents of .tsv file stored in GCS bucket and write each line to respective BigQuery table. While doing this I am getting Null pointer exception , this could be because of BigQueryIO.write() step in the dataflow job is starting before Reading the contents of the file using TextIO.read().

stacktrace

Error message from worker: java.lang.NullPointerException: Cannot invoke    gradle_inital_load.TableAndRow.getTab_name()" because "row" is null
    gradle_inital_load.ReadTarDir.getTableName(ReadTarDir.java:318)
    gradle_inital_load.ReadTarDir.lambda$0(ReadTarDir.java:287)
    org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$TableFunctionDestinations.getDestination(DynamicDestinationsHelpers.java:128)
    org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$TableFunctionDestinations.getDestination(DynamicDestinationsHelpers.java:114)
    org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:71)

Dataflow job graph

Below is my code


package gradle_inital_load;

    import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.DirectRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.api.services.bigquery.model.TableRow;
import com.google.common.io.Files;

import org.apache.beam.sdk.annotations.*;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;

  public class ReadTarDir {

     private static final Logger LOG = LoggerFactory.getLogger(ReadTarDir.class);
    static String outputTableProject = "gpc-d-disc";
    static String outputTableDataset = "user_events";

    public static void main(String[] args) {
        // TODO Auto-generated method stub

          DataflowPipelineOptions dfOptions = PipelineOptionsFactory.as(MyOptions.class);


          dfOptions.setProject("gpc-d-disc");
        dfOptions.setStagingLocation("gs://crs_user_events/staging");
           dfOptions.setRegion("us-east4");
        dfOptions.setTempLocation("gs://crs_user_events/temp");
        dfOptions.setServiceAccount("[email protected]");
          dfOptions.setSubnetwork(
                "https://www.googleapis.com/compute/v1/projects/gpc-net-n-spoke-prj/regions/us-east4/subnetworks/n-subnet006-disc-use4");
         dfOptions.setUsePublicIps(false);
        

        

         dfOptions.setRunner(DataflowRunner.class);


         DataflowRunner.fromOptions(dfOptions);


         Pipeline p = Pipeline.create(dfOptions);


        

        
        PCollectionView<String[]> filecontents = (PCollectionView<String[]>) p.apply("Read column headers", TextIO.read()
                .from("gs://crs_user_events/initial_load/column_headers.tsv").withCompression(Compression.AUTO))
                .apply("Create column header Array", ParDo.of(new DoFn<String, String[]>(){
                @ProcessElement
                public void processElement(ProcessContext c)
                {
                String fileLines = c.element().toString();
                String[] Data = fileLines.split("\t");
                c.output(Data);
                }
                })).apply(View.asSingleton());

        PCollection<String> lines = p.apply("Read Files",
                TextIO.read().from("gs://crs_user_events/initial_load/test.tsv.gz").withCompression(Compression.GZIP)).setCoder(NullableCoder.of(StringUtf8Coder.of()));

        p.getCoderRegistry().registerCoderForClass(ReadTarDir.class, TableAndRowCoder.of());

        PCollection<TableAndRow> tablerows = lines
                .apply("Transform File lines into TableAndRow", ParDo.of(new DoFn<String, TableAndRow>() {

                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        int tabnam_idx, indx;
                        TableAndRow tbObj = null;
                        String tabName = null;
                        TableRow row =new TableRow();
                        //TableRow row = new TableRow();
                        String[] columns = c.sideInput(filecontents);
                        String[] arr = c.element().split("\t");

                        if (arr.length > 0) {

                            tabnam_idx = getIndex(columns, "channel");
                            indx = getIndex(columns, "page_event");

                            // ProductDetails
                            if ((arr[tabnam_idx].toString()).contains("productdetails")) {
                                tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
                                        .concat("detail_page_view_events_idl");
                                // tabName = String.format("%s:%s.%s", outputTableProject,
                                // outputTableDataset,"Detail_Page_View_Events");
                                row.set("eventType", "detail-page-view");
                                int index = getIndex(columns, "evar6");
                                if (arr[getIndex(columns, "evar6")] != "") {
                                    row.set("visitorId", arr[getIndex(columns, "evar6")]);
                                } else {
                                    row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
                                }
                                row.set("eventTime", arr[getIndex(columns, "date_time")]);
                                row.set("experimentIds", arr[getIndex(columns, "evar104")]);
                                row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
                                row.set("userInfo.userId", "1");
                                row.set("userInfo.ipAddress", arr[getIndex(columns, "ip")]);
                                row.set("userInfo.userAgent", arr[getIndex(columns, "user_agent")]);
                                row.set("userInfo.directUserRequest", "1");
                                row.set("uri", arr[getIndex(columns, "page_url")]);
                                if (arr[getIndex(columns, "visit_referrer")] == "") {
                                    row.set("referrerUri", "1");
                                } else {
                                    row.set("referrerUri", arr[getIndex(columns, "visit_referrer")]);
                                }
                            }

                            // Homepage
                            if ((arr[tabnam_idx].toString()).contains("homepage1")) {
                                tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
                                        .concat("home_page_view_events_idl");
                                // tabName = String.format("%s:%s.%s", outputTableProject,
                                // outputTableDataset,"Home_Page_View_Events");
                                row.set("eventType", "home-page-view");
                                if (arr[getIndex(columns, "evar6")] != " ") {
                                    row.set("visitorId", arr[getIndex(columns, "evar6")]);
                                } else {
                                    row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
                                }

                            }

                            // Search
                            indx = getIndex(columns, "page_event");
                            if ((arr[tabnam_idx].toString()).contains("search") && arr[indx] == "0") {
                                tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
                                        .concat("search_events_idl");
                                // tabName = String.format("%s:%s.%s", outputTableProject,
                                // outputTableDataset,"Pass Table Name here");
                                /* create row here */
                                row.set("eventType", "search");
                                if (arr[getIndex(columns, "evar6")] != " ") {
                                    row.set("visitorId", arr[getIndex(columns, "evar6")]);
                                } else {
                                    row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
                                }
                                if (arr[getIndex(columns, "evar6")] != " ") {
                                    row.set("searchQuery", arr[getIndex(columns, "evar1")]);
                                } else {
                                    row.set("searchQuery", arr[getIndex(columns, "evar2")]);
                                }
                                row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);

                            }

                            // Browse
                            indx = getIndex(columns, "page_event");
                            if ((arr[tabnam_idx].toString()).contains("category-landing") && arr[indx] == "0") {
                                tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
                                        .concat("category_page_view_events_idl");
                                /* create row here */
                                row.set("eventType", "category-page-view");
                                if (arr[getIndex(columns, "evar6")] != " ") {
                                    row.set("visitorId", arr[getIndex(columns, "evar6")]);
                                } else {
                                    row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
                                }
                                row.set("pageCategories", arr[getIndex(columns, "evar104")]);
                            }

                            // add-to-cart
                            if (arr[getIndex(columns, "product_list")] != null && arr[indx] == "12") {
                                tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
                                        .concat("add_to_cart_events_idl");
                                /* create row here */
                                row.set("eventType", "add-to-cart");
                                if (arr[getIndex(columns, "evar6")] != " ") {
                                    row.set("visitorId", arr[getIndex(columns, "evar6")]);
                                } else {
                                    row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
                                }
                                row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
                            }

                            // purchase complete
                            indx = getIndex(columns, "page_event");
                            if (arr[getIndex(columns, "product_list")] != null && arr[indx] == "1") {
                                tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
                                        .concat("purchase_complete_events_idl");

                                /* create row here */
                                row.set("eventType", "home-page-view");
                                if (arr[getIndex(columns, "evar6")] != " ") {
                                    row.set("visitorId", arr[getIndex(columns, "evar6")]);
                                } else {
                                    row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
                                }
                                row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
                                row.set("productDetails.product.quantity", arr[getIndex(columns, "product_list")]);
                                row.set("purchaseTransaction.revenue", arr[getIndex(columns, "product_list")]);
                                row.set("purchaseTransaction.currencyCode", arr[getIndex(columns, "product_list")]);
                            }

                        }

                        LOG.info("Row:"   row.toString());
                        if(row!=null && tabName!=null)
                        {
                            tbObj = new TableAndRow(row, tabName);
                        }
                        
                        c.output(tbObj);

                    }
                }).withSideInputs(filecontents)).setCoder(NullableCoder.of(TableAndRowCoder.of()));

        tablerows.apply("Write to BigQuery",
                BigQueryIO.<TableAndRow>write().to(line -> getTableName(line))
                        .withFormatFunction((TableAndRow line) -> convertToTableRow(line))
                        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

        p.run().waitUntilFinish();
        System.out.println("Pipeline Executed");
    }

    private static TableRow convertToTableRow(TableAndRow line) {
        // TODO Auto-generated method stub
        TableRow row = line.getRow();
        return row;
    }

    public static int getIndex(String[] Data, String str) {
        int index = -1;
        for (int j = 0; j < Data.length; j  ) {
            if (Data[j].contains(str)) {
                index = j;
                break;
            }
        }

        return index;
    }

    public static TableDestination getTableName(ValueInSingleWindow<TableAndRow> line) {

        TableDestination destination = null;
        TableAndRow row = line.getValue();
       if(row.getTab_name()!=null)
       {
           destination = new TableDestination(row.getTab_name(), null);  
       }
        

        return destination;
    }
}

`


Can somebody please help as i am new to Dataflow Apache Beam programming.

The file contents should be read first and each line from the file must be converted into table row and return to the BigQuery table. Table name is also determined from the contents of each line from the file.

CodePudding user response:

Before to be executed your Dataflow job needs to be instantiated.

I think you have a problem in the code part where you calculating the TableDestination. Your row object seems to be null.

In this case your Dataflow job cannot be instantiated and will throw and display the NullPointerException in the log proposed in Dataflow GUI and in Cloud Logging.

Please try to understand why you have a NullPointerException in this code part.

You can also launch your job with Direct runner to debug your code locally.

CodePudding user response:

You are getting a NullPointerException because your Transform File lines into TableAndRow DoFn is emitting null objects.

You should probably change the lines at

if (row!=null && tabName!=null) {
    tbObj = new TableAndRow(row, tabName);
}
                        
c.output(tbObj);

to

if (row!=null && tabName!=null) {
    c.output(new TableAndRow(row, tabName));
}

If you do that you won't have to setCoder(NullableCoder.of(TableAndRowCoder.of())) either.

  • Related