How to connect to an HBase table using Spark and get specific known row keys without scanning the entire table


I have a massive number of row keys and need to get the data for those row keys without scanning the entire table or loading the entire table into Spark, as the table is very big.

CodePudding user response:

You may try the gimel library; according to its md file, filters are pushed down to HBase.

What is gimel?

    Gimel is a Big Data Abstraction framework built on Apache Spark & other open source connectors in the industry.
    Gimel provides unified Data API to read & write data to various stores.
    Alongside, a unified SQL access pattern for all stores alike.
    The APIs are available in both scala & python (pyspark).

HBase md file

In this md file you will find some examples; they may be similar to your use case:

    gsql("set gimel.hbase.columns.mapping=personal:name,personal:age")
val df = gsql("select * from udc.Hbase.ClusterName.default.test_emp where rowkey='1-MAC'")
df.show

df.explain
== Physical Plan ==
*(1) Filter isnotnull(rowkey#181)
+- *(1) Scan HBaseRelation(Map(catalog -> {"table":{"namespace":"default", "name":"test_emp", "tableCoder":"PrimitiveType"},
"rowkey":"rowkey",
"columns":{
"rowkey":{"cf":"rowkey", "col":"rowkey", "type":"string", "length":"50"},
"name":{"cf":"personal", "col":"name", "type":"string"},
"age":{"cf":"personal", "col":"age", "type":"string"}
}
}
    ),None) [rowkey#181,name#182,age#183] PushedFilters: [IsNotNull(rowkey), *EqualTo(rowkey,1-MAC)], ReadSchema: struct<rowkey:string,name:string,age:string>
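
Note the PushedFilters entry: *EqualTo(rowkey,1-MAC) shows the row-key predicate is evaluated in HBase rather than by a full scan in Spark. Since you have many known row keys rather than one, a natural extension is to pass them as an IN list through the same gsql API. This is only a sketch under the assumption that gimel pushes down IN predicates on the row key the same way it pushes down the equality above; check the physical plan with df.explain before relying on it for a large key set.

// Hypothetical sketch: query several known row keys in one gimel SQL call.
// The IN-list push-down is an assumption -- verify with df.explain that
// PushedFilters contains the row-key predicate.
gsql("set gimel.hbase.columns.mapping=personal:name,personal:age")
val keys = Seq("1-MAC", "2-MAC", "3-MAC")   // placeholder row keys
val inList = keys.map(k => s"'$k'").mkString(", ")
val df = gsql(s"select * from udc.Hbase.ClusterName.default.test_emp where rowkey in ($inList)")
df.explain   // confirm the filter is pushed down to HBase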

CodePudding user response:

There are many HBase-Spark libraries, but many of them are old and no longer maintained, so I prefer old, reliable methods.

Try the following.

I have a 'word_counts' HBase table with a 'counts' column family. In my case there is a 'counts:count' column, which is a Long.

I will use the newAPIHadoopRDD method and access HBase via the hbase-mapreduce library.

Let's start (I am using Apache Spark 3.1.3, but it should work for any Spark version):

spark-shell --master yarn --packages org.apache.hbase:hbase-client:2.2.7,org.apache.hbase:hbase-common:2.2.7,org.apache.hbase:hbase-mapreduce:2.2.7

My code is as follows:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.client.Scan


// This function converts a Scan object to a string representation 
// that can be passed to HBase as a configuration value
def convertScanToString(scan: Scan) = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBase64String(proto.toByteArray)
}

// Create a new Scan object and set a PrefixFilter 
// to only return rows with keys that start with "s"
val scan = new Scan()
val filter = new PrefixFilter(Bytes.toBytes("s"))
scan.setFilter(filter)

// Create a new HBase configuration and set the input table, 
// zookeeper quorum, and scan
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "word_counts")
conf.set("hbase.zookeeper.quorum", "zookeeper-host")
conf.set(TableInputFormat.SCAN, convertScanToString(scan))

// Create an RDD using the HBase configuration and the TableInputFormat
val rdd = spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

// Convert the RDD to a DataFrame and set the column names
val df = rdd.map(x => {
  val rowKey = Bytes.toString(x._2.getRow)
  val columnValue = Option(x._2.getValue(Bytes.toBytes("counts"), 
       Bytes.toBytes("count"))).map(value => 
       Bytes.toLong(value).toString).getOrElse("")
  (rowKey, columnValue)
}).toDF("row_key", "column_value")