show the data collected in pyspark


The code below runs without any error; it reads data from a JSON file in my storage account. How can I see the output (the contents) in a Databricks notebook? The file is quite long and I only need to verify that the output is what I am looking for, so I would like to see the first 10 items. How do we do that?

import re
import json
%pip install azure
import azure
from azure.storage.blob import AppendBlobService

abs = AppendBlobService(account_name="azurestorage", account_key="mykey")
base_path = "resourceId=/SUBSCRIPTIONS/5315MyId/RESOURCEGROUPS/AZURE-DEV/PROVIDERS/MICROSOFT.CONTAINERSERVICE/MANAGEDCLUSTERS/AZURE-DEV/y=2022/m=05/d=23/h=13/m=00/PT1H.json"
pattern = base_path + "/*/*/*/*/m=00/*.json"
filter = glob2re(pattern)  # glob2re: glob-to-regex helper defined elsewhere in the notebook
df1 = (
    spark.sparkContext.parallelize(
        [
            blob.name
            for blob in abs.list_blobs("insights-logs-kube-audit", prefix=base_path)
            if re.match(filter, blob.name)
        ]
    )
    .map(
        lambda blob_name: abs.get_blob_to_bytes("insights-logs-kube-audit", blob_name)
        .content.decode("utf-8")
        .splitlines()
    )
    .flatMap(lambda lines: [json.loads(l) for l in lines])
    .collect()
)

CodePudding user response:

collect() :- PySpark RDD/DataFrame collect() is an action operation that retrieves all the elements of the dataset (from all nodes) to the driver node. It should only be used on a small dataset, usually after filter(), group(), etc.

take(num) :- It returns the first num rows as a list of Row objects.

DataFrame.take(num)
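
For example, a minimal sketch on a small, made-up DataFrame (the names sample_df, id and message are purely illustrative) showing the difference between take() and collect():

# A tiny DataFrame created purely for illustration
sample_df = spark.createDataFrame(
    [(i, "event-" + str(i)) for i in range(100)],
    ["id", "message"],
)

first_ten = sample_df.take(10)    # list with only the first 10 Row objects - safe on large data
everything = sample_df.collect()  # list with ALL rows - only safe when the dataset is small

print(first_ten[0])               # Row(id=0, message='event-0')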

import re
import json
%pip install azure
import azure
from azure.storage.blob import AppendBlobService

abs = AppendBlobService(account_name="azurestorage", account_key="mykey")
base_path = "resourceId=/SUBSCRIPTIONS/5315MyId/RESOURCEGROUPS/AZURE-DEV/PROVIDERS/MICROSOFT.CONTAINERSERVICE/MANAGEDCLUSTERS/AZURE-DEV/y=2022/m=05/d=23/h=13/m=00/PT1H.json"
pattern = base_path + "/*/*/*/*/m=00/*.json"
filter = glob2re(pattern)
df1 = (
    spark.sparkContext.parallelize(
        [
            blob.name
            for blob in abs.list_blobs("insights-logs-kube-audit", prefix=base_path)
            if re.match(filter, blob.name)
        ]
    )
    .map(
        lambda blob_name: abs.get_blob_to_bytes("insights-logs-kube-audit", blob_name)
        .content.decode("utf-8")
        .splitlines()
    )
    .flatMap(lambda lines: [json.loads(l) for l in lines])
    .take(10)  # return only the first 10 parsed records to the driver
)
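
With take(10) the result assigned to df1 is just an ordinary Python list holding the first ten parsed JSON records, so it can be inspected directly in the notebook, for example:

for record in df1:
    print(json.dumps(record, indent=2))  # pretty-print each of the 10 records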

Refer - https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.take.html
