I am trying to read the contents of a .tsv file stored in a GCS bucket and write each line to the respective BigQuery table. While doing this I am getting a NullPointerException; this could be because the BigQueryIO.write() step in the Dataflow job starts before the contents of the file are read with TextIO.read().
Stack trace:
Error message from worker: java.lang.NullPointerException: Cannot invoke "gradle_inital_load.TableAndRow.getTab_name()" because "row" is null
gradle_inital_load.ReadTarDir.getTableName(ReadTarDir.java:318)
gradle_inital_load.ReadTarDir.lambda$0(ReadTarDir.java:287)
org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$TableFunctionDestinations.getDestination(DynamicDestinationsHelpers.java:128)
org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$TableFunctionDestinations.getDestination(DynamicDestinationsHelpers.java:114)
org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:71)
Below is my code
package gradle_inital_load;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.DirectRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.io.Files;
import org.apache.beam.sdk.annotations.*;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
public class ReadTarDir {
private static final Logger LOG = LoggerFactory.getLogger(ReadTarDir.class);
static String outputTableProject = "gpc-d-disc";
static String outputTableDataset = "user_events";
public static void main(String[] args) {
// TODO Auto-generated method stub
DataflowPipelineOptions dfOptions = PipelineOptionsFactory.as(MyOptions.class);
dfOptions.setProject("gpc-d-disc");
dfOptions.setStagingLocation("gs://crs_user_events/staging");
dfOptions.setRegion("us-east4");
dfOptions.setTempLocation("gs://crs_user_events/temp");
dfOptions.setServiceAccount("[email protected]");
dfOptions.setSubnetwork(
"https://www.googleapis.com/compute/v1/projects/gpc-net-n-spoke-prj/regions/us-east4/subnetworks/n-subnet006-disc-use4");
dfOptions.setUsePublicIps(false);
dfOptions.setRunner(DataflowRunner.class);
DataflowRunner.fromOptions(dfOptions);
Pipeline p = Pipeline.create(dfOptions);
PCollectionView<String[]> filecontents = (PCollectionView<String[]>) p.apply("Read column headers", TextIO.read()
.from("gs://crs_user_events/initial_load/column_headers.tsv").withCompression(Compression.AUTO))
.apply("Create column header Array", ParDo.of(new DoFn<String, String[]>(){
@ProcessElement
public void processElement(ProcessContext c)
{
String fileLines = c.element().toString();
String[] Data = fileLines.split("\t");
c.output(Data);
}
})).apply(View.asSingleton());
PCollection<String> lines = p.apply("Read Files",
TextIO.read().from("gs://crs_user_events/initial_load/test.tsv.gz").withCompression(Compression.GZIP)).setCoder(NullableCoder.of(StringUtf8Coder.of()));
p.getCoderRegistry().registerCoderForClass(ReadTarDir.class, TableAndRowCoder.of());
PCollection<TableAndRow> tablerows = lines
.apply("Transform File lines into TableAndRow", ParDo.of(new DoFn<String, TableAndRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
int tabnam_idx, indx;
TableAndRow tbObj = null;
String tabName = null;
TableRow row =new TableRow();
//TableRow row = new TableRow();
String[] columns = c.sideInput(filecontents);
String[] arr = c.element().split("\t");
if (arr.length > 0) {
tabnam_idx = getIndex(columns, "channel");
indx = getIndex(columns, "page_event");
// ProductDetails
if ((arr[tabnam_idx].toString()).contains("productdetails")) {
tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
.concat("detail_page_view_events_idl");
// tabName = String.format("%s:%s.%s", outputTableProject,
// outputTableDataset,"Detail_Page_View_Events");
row.set("eventType", "detail-page-view");
int index = getIndex(columns, "evar6");
if (arr[getIndex(columns, "evar6")] != "") {
row.set("visitorId", arr[getIndex(columns, "evar6")]);
} else {
row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
}
row.set("eventTime", arr[getIndex(columns, "date_time")]);
row.set("experimentIds", arr[getIndex(columns, "evar104")]);
row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
row.set("userInfo.userId", "1");
row.set("userInfo.ipAddress", arr[getIndex(columns, "ip")]);
row.set("userInfo.userAgent", arr[getIndex(columns, "user_agent")]);
row.set("userInfo.directUserRequest", "1");
row.set("uri", arr[getIndex(columns, "page_url")]);
if (arr[getIndex(columns, "visit_referrer")] == "") {
row.set("referrerUri", "1");
} else {
row.set("referrerUri", arr[getIndex(columns, "visit_referrer")]);
}
}
// Homepage
if ((arr[tabnam_idx].toString()).contains("homepage1")) {
tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
.concat("home_page_view_events_idl");
// tabName = String.format("%s:%s.%s", outputTableProject,
// outputTableDataset,"Home_Page_View_Events");
row.set("eventType", "home-page-view");
if (arr[getIndex(columns, "evar6")] != " ") {
row.set("visitorId", arr[getIndex(columns, "evar6")]);
} else {
row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
}
}
// Search
indx = getIndex(columns, "page_event");
if ((arr[tabnam_idx].toString()).contains("search") && arr[indx] == "0") {
tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
.concat("search_events_idl");
// tabName = String.format("%s:%s.%s", outputTableProject,
// outputTableDataset,"Pass Table Name here");
/* create row here */
row.set("eventType", "search");
if (arr[getIndex(columns, "evar6")] != " ") {
row.set("visitorId", arr[getIndex(columns, "evar6")]);
} else {
row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
}
if (arr[getIndex(columns, "evar6")] != " ") {
row.set("searchQuery", arr[getIndex(columns, "evar1")]);
} else {
row.set("searchQuery", arr[getIndex(columns, "evar2")]);
}
row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
}
// Browse
indx = getIndex(columns, "page_event");
if ((arr[tabnam_idx].toString()).contains("category-landing") && arr[indx] == "0") {
tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
.concat("category_page_view_events_idl");
/* create row here */
row.set("eventType", "category-page-view");
if (arr[getIndex(columns, "evar6")] != " ") {
row.set("visitorId", arr[getIndex(columns, "evar6")]);
} else {
row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
}
row.set("pageCategories", arr[getIndex(columns, "evar104")]);
}
// add-to-cart
if (arr[getIndex(columns, "product_list")] != null && arr[indx] == "12") {
tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
.concat("add_to_cart_events_idl");
/* create row here */
row.set("eventType", "add-to-cart");
if (arr[getIndex(columns, "evar6")] != " ") {
row.set("visitorId", arr[getIndex(columns, "evar6")]);
} else {
row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
}
row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
}
// purchase complete
indx = getIndex(columns, "page_event");
if (arr[getIndex(columns, "product_list")] != null && arr[indx] == "1") {
tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
.concat("purchase_complete_events_idl");
/* create row here */
row.set("eventType", "home-page-view");
if (arr[getIndex(columns, "evar6")] != " ") {
row.set("visitorId", arr[getIndex(columns, "evar6")]);
} else {
row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
}
row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
row.set("productDetails.product.quantity", arr[getIndex(columns, "product_list")]);
row.set("purchaseTransaction.revenue", arr[getIndex(columns, "product_list")]);
row.set("purchaseTransaction.currencyCode", arr[getIndex(columns, "product_list")]);
}
}
LOG.info("Row:" row.toString());
if(row!=null && tabName!=null)
{
tbObj = new TableAndRow(row, tabName);
}
c.output(tbObj);
}
}).withSideInputs(filecontents)).setCoder(NullableCoder.of(TableAndRowCoder.of()));
tablerows.apply("Write to BigQuery",
BigQueryIO.<TableAndRow>write().to(line -> getTableName(line))
.withFormatFunction((TableAndRow line) -> convertToTableRow(line))
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
p.run().waitUntilFinish();
System.out.println("Pipeline Executed");
}
private static TableRow convertToTableRow(TableAndRow line) {
// TODO Auto-generated method stub
TableRow row = line.getRow();
return row;
}
public static int getIndex(String[] Data, String str) {
int index = -1;
for (int j = 0; j < Data.length; j++) {
if (Data[j].contains(str)) {
index = j;
break;
}
}
return index;
}
public static TableDestination getTableName(ValueInSingleWindow<TableAndRow> line) {
TableDestination destination = null;
TableAndRow row = line.getValue();
if(row.getTab_name()!=null)
{
destination = new TableDestination(row.getTab_name(), null);
}
return destination;
}
}
Can somebody please help, as I am new to Dataflow / Apache Beam programming?
The file contents should be read first, and each line from the file must be converted into a table row and written to the respective BigQuery table. The table name is also determined from the contents of each line of the file.
CodePudding user response:
Before being executed, your Dataflow job needs to be instantiated. I think you have a problem in the part of the code where you calculate the TableDestination: your row object seems to be null.
In this case your Dataflow job cannot be instantiated and throws the NullPointerException that is displayed in the logs shown in the Dataflow GUI and in Cloud Logging.
Please try to understand why you get a NullPointerException in this part of the code.
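As an illustration (a sketch, not the original method), getTableName could fail fast with a clearer message instead of silently returning null; it reuses the TableAndRow class and its getTab_name() accessor from the question:

public static TableDestination getTableName(ValueInSingleWindow<TableAndRow> line) {
    TableAndRow row = line.getValue();
    // The element can arrive as null because the upstream ParDo outputs null objects;
    // failing here with an explicit message makes the root cause easier to spot than the NPE.
    if (row == null || row.getTab_name() == null) {
        throw new IllegalStateException("Element has no destination table; check the upstream ParDo.");
    }
    return new TableDestination(row.getTab_name(), null);
}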
You can also launch your job with the Direct runner to debug your code locally.
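A minimal sketch of such a local debugging run, assuming the beam-runners-direct-java dependency is on the classpath (the LocalDebugRun class name is only for illustration; the temp location is the bucket from the question):

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class LocalDebugRun {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        // Run on the local machine instead of Dataflow so logs and breakpoints are immediate.
        options.setRunner(DirectRunner.class);
        // BigQueryIO still needs a GCS temp location for its load jobs.
        options.setTempLocation("gs://crs_user_events/temp");
        Pipeline p = Pipeline.create(options);
        // ... apply the same transforms as in ReadTarDir ...
        p.run().waitUntilFinish();
    }
}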
CodePudding user response:
You are getting a NullPointerException because your Transform File lines into TableAndRow DoFn is emitting null objects. You should probably change the lines
if (row!=null && tabName!=null) {
tbObj = new TableAndRow(row, tabName);
}
c.output(tbObj);
to
if (row!=null && tabName!=null) {
c.output(new TableAndRow(row, tabName));
}
If you do that, you won't have to call setCoder(NullableCoder.of(TableAndRowCoder.of())) either.
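With that change, the coder call at the end of the ParDo could then be simplified to something like the following (a sketch; TableAndRowCoder is the custom coder from the question):

// No nulls reach the coder any more, so the NullableCoder wrapper is unnecessary:
}).withSideInputs(filecontents)).setCoder(TableAndRowCoder.of());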