Access In-Memory Spark Dataframe from different nodes

I am prototyping a Spark-based data ingestion system. Essentially, I need Spark to watch a datalake directory and, as data comes in, add it to an in-memory dataframe. I understand that the memory sink is meant for debugging purposes, but since this is a prototype I am trying to get this working in memory first, before moving to the more standard Kafka setup.

Here is my first Python script, which is supposed to getOrCreate a SparkSession, read from the datalake, then write to a table kept in memory:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

APP_NAME = "spark.data_processing_engine"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName(APP_NAME) \
    .getOrCreate()
    
spark.sparkContext.broadcast([1])

schemaSnmp = StructType([
    StructField("node-hostname", StringType(), True),
    StructField("event-time", TimestampType(), True),
    StructField("oid", StringType(), True),
    StructField("value", StringType(), True)
])

df = spark.readStream.format("json") \
    .option("sourceArchiveDir", "/tmp/datalake-archive") \
    .option("cleanSource", "archive") \
    .schema(schemaSnmp) \
    .load("/var/datalake/snmp-get")

# Write the stream to an in-memory table named "snmpget" (memory sink).
result = df.writeStream.queryName("snmpget").format("memory").start()
result.awaitTermination()

This appears to be running just fine. I can see my data getting archived, and the logs look healthy. Here is a small sample:

$ spark-submit spark-start-stream-for-snmpget.py

22/08/18 11:18:33 WARN Utils: Your hostname, rhel8.localdomain resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
22/08/18 11:18:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/08/18 11:18:35 INFO SparkContext: Running Spark version 3.3.0
22/08/18 11:18:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/18 11:18:35 INFO ResourceUtils: ==============================================================
22/08/18 11:18:35 INFO ResourceUtils: No custom resources configured for spark.driver.
22/08/18 11:18:35 INFO ResourceUtils: ==============================================================
22/08/18 11:18:35 INFO SparkContext: Submitted application: spark.data_processing_engine
22/08/18 11:18:35 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
22/08/18 11:18:35 INFO ResourceProfile: Limiting resource is cpu
22/08/18 11:18:35 INFO ResourceProfileManager: Added ResourceProfile id: 0
22/08/18 11:18:35 INFO SecurityManager: Changing view acls to: root
22/08/18 11:18:35 INFO SecurityManager: Changing modify acls to: root
22/08/18 11:18:35 INFO SecurityManager: Changing view acls groups to:
22/08/18 11:18:35 INFO SecurityManager: Changing modify acls groups to:
22/08/18 11:18:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
22/08/18 11:18:36 INFO Utils: Successfully started service 'sparkDriver' on port 42657.
22/08/18 11:18:36 INFO SparkEnv: Registering MapOutputTracker
22/08/18 11:18:36 INFO SparkEnv: Registering BlockManagerMaster
22/08/18 11:18:36 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
22/08/18 11:18:36 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
22/08/18 11:18:36 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
22/08/18 11:18:36 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-364e57a6-0f38-43ea-abcc-428e0ca8684f
22/08/18 11:18:36 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB
22/08/18 11:18:36 INFO SparkEnv: Registering OutputCommitCoordinator

...

22/08/18 11:17:55 INFO InMemoryFileIndex: It took 5 ms to list leaf files for 1 paths.
22/08/18 11:17:55 INFO BlockManagerInfo: Removed broadcast_224_piece0 on 10.0.2.15:36859 in memory (size: 34.0 KiB, free: 434.3 MiB)

In a separate process, I fire up pyspark and try to query this data, but I cannot:

In [1]: APP_NAME = "spark.data_processing_engine"
    ...:
    ...: spark = SparkSession \
    ...:     .builder \
    ...:     .master("local[*]") \
    ...:     .appName(APP_NAME) \
    ...:     .getOrCreate()


In [2]: spark.sql("SELECT * FROM snmpget ORDER BY `event-time` DESC").count()
AnalysisException: Table or view not found: snmpget; line 1 pos 14;
'Sort ['event-time DESC NULLS LAST], true
+- 'Project [*]
   +- 'UnresolvedRelation [snmpget], [], false

I have already tried the following:

  • Using the same app name
  • Setting spark.sparkContext.broadcast([1])
  • Scoping variables in the pyspark instance
  • Instead of using pyspark, creating a script to run:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

APP_NAME = "spark.data_processing_engine"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName(APP_NAME) \
    .getOrCreate()
    
    
spark.sql("SELECT * FROM snmpget ORDER BY `event-time` DESC").count()

But I get the same error this way too. I was hoping the getOrCreate method would be able to reuse the SparkSession object and allow other processes to access the data. Obviously there is more to it than that.

Essentially, I need to fire up a Spark process that reads from the datalake, and then fire up other jobs that can read that data.

CodePudding user response:

Okay, I think I figured out a way to get this data coordinated. Maybe there is a better way of doing it, but for now it fits the bill.
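
The root of the problem, as far as I can tell, is that the memory sink registers the stream as a temp view inside the SparkSession of the driver that started the query, so it is only visible from that same process; a second pyspark process gets its own session and will never see the snmpget table. Within a single process the memory sink does work. Here is a minimal sketch of that pattern (same schema and paths as the question; the app name and polling loop are just illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
import time

spark = SparkSession.builder.master("local[*]").appName("snmpget-memory-demo").getOrCreate()

schema = StructType([
    StructField("node-hostname", StringType(), True),
    StructField("event-time", TimestampType(), True),
    StructField("oid", StringType(), True),
    StructField("value", StringType(), True)
])

# The memory sink registers "snmpget" as a temp view in *this* session only.
query = spark.readStream.schema(schema).json("/var/datalake/snmp-get") \
    .writeStream.queryName("snmpget").format("memory").start()

for _ in range(10):
    time.sleep(5)  # give a micro-batch or two time to complete
    spark.sql("SELECT COUNT(*) AS cnt FROM snmpget").show()

query.stop()

To share data between separate processes, it has to land somewhere both sessions can reach, which is what the Parquet step below does.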

First, I create a long-running spark-submit task that continually reads from the datalake and writes to a Parquet sink on disk:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

APP_NAME = "spark.data_processing_engine"
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName(APP_NAME) \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

schemaSnmp = StructType([
    StructField("node-hostname", StringType(), True),
    StructField("event-time", TimestampType(), True),
    StructField("oid", StringType(), True),
    StructField("value", StringType(), True)
])

# Stream JSON in from the datalake, archive processed source files, and write
# the results out as Parquet (plus its checkpoint) under /var/spark-map.
result = spark.readStream \
    .option("sourceArchiveDir", "/var/datalake-archive") \
    .option("cleanSource", "archive") \
    .schema(schemaSnmp) \
    .json("/var/datalake/snmp-get") \
    .writeStream \
    .queryName("snmpget") \
    .format("parquet") \
    .option("checkpointLocation", "/var/spark-map/snmp-get") \
    .option("path", "/var/spark-map/snmp-get") \
    .start()

result.awaitTermination()
result.stop()
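
As an aside, while that writer is running, one way to confirm it is actually committing micro-batches is to poll the StreamingQuery handle instead of blocking on awaitTermination(); status and lastProgress are standard StreamingQuery properties. A rough, demo-only sketch that would replace the last two lines of the script above:

import time

# Demo-only alternative to result.awaitTermination(): poll the query handle
# so the micro-batch progress is visible on the console.
for _ in range(30):
    print(result.status)        # e.g. a "Waiting for data to arrive" message
    print(result.lastProgress)  # None until the first micro-batch completes
    time.sleep(10)

result.stop()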

After this is running, I fire up a new spark job that looks like this:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time 
import datetime 
import logging


APP_NAME = "spark.data_processing_engine"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName(APP_NAME) \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

schema = StructType([
    StructField("node-hostname", StringType(), True),
    StructField("event-time", TimestampType(), True),
    StructField("oid", StringType(), True),
    StructField("value", StringType(), True)
])

# Stream the writer's Parquet output back in and expose it in this session
# as the in-memory table "snmpget" for quick ad-hoc queries.
results = spark.readStream \
        .schema(schema) \
        .parquet("/var/spark-map/snmp-get/") \
        .writeStream \
        .queryName("snmpget") \
        .format("memory") \
        .start()

i = 0
while i < 60:
    x = spark.table("snmpget").select("value", "oid", "`node-hostname`", "`event-time`").orderBy("`event-time`", ascending=False).head()
    if x is None:
        print("Data may still be loading, give this approx 30 seconds")
    else:
        print(f"x = {x}")
    i += 1
    time.sleep(1)

results.stop()

That second script can take a few moments to load (so at first it will report that data may still be loading), but after that the data comes in! This works across multiple Spark jobs, each reading from that same Parquet output.
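
A consumer does not even have to be a streaming job; a plain batch read against the same output directory also works once the writer has committed at least one batch. A minimal sketch of such a reader (paths as above, app name illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("snmpget-batch-reader").getOrCreate()

# Batch-read whatever the streaming writer has committed so far.
snmp = spark.read.parquet("/var/spark-map/snmp-get")
snmp.orderBy("`event-time`", ascending=False).show(10, truncate=False)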

Ideally, all of this would live purely in memory for demo purposes, but this works nonetheless. If I want more speed, I could always look into turning the referenced directories into RAM disks.

If anyone out there knows how to tie this together with only an in-memory datastore (i.e., no Parquet step), I'd be interested in hearing about it.
