Home > OS >  Convert Nested Json String into Spark Dataframe
Convert Nested Json String into Spark Dataframe

Time:01-03

I am trying to load a JSON response into a PySpark DataFrame. When I run this:

# Parse the raw JSON response body into a DataFrame. `response.text` is presumably the HTTP body as a single string (verify), so the one-element RDD yields a single row with top-level columns `data` and `status`.
# NOTE(review): "inferSchema" is documented as a CSV reader option; the JSON reader infers its schema on its own, so this option is likely a no-op here — confirm against the Spark JSON data source docs.
crypto_df = spark.read.option("inferSchema", "true").json(sc.parallelize([response.text]))

I get a DataFrame with two columns: `data` and `status`.

|data|status|
|----|------|
|{"1": {"circulating_supply": 18918568, "cmc_rank": 1, "date_added": "2013-04-28T00:00:00.000Z", "id": 1, "is_active": 1, "is_fiat": 0, "last_updated": "2022-01-03T07:49:00.000Z", "max_supply": 21000000, "name": "Bitcoin", "num_market_pairs": 8970, "platform": null, "quote": {"USD": {"fully_diluted_market_cap": 987234126025.89, "last_updated": "2022-01-03T07:49:00.000Z", "market_cap": 889383616435.3029, "market_cap_dominance": 39.675, "percent_change_1h": -0.05951781, "percent_change_24h": -0.3442455, "percent_change_30d": -0.1390473, "percent_change_60d": -24.29566082, "percent_change_7d": -7.36951384, "percent_change_90d": -4.92043025, "price": 47011.14885837569, "volume_24h": 28717292214.63145, "volume_change_24h": 17.9261}}, "slug": "bitcoin", "symbol": "BTC", "tags": ["mineable", "pow", "sha-256", "store-of-value", "state-channel", "coinbase-ventures-portfolio", "three-arrows-capital-portfolio", "polychain-capital-portfolio", "binance-labs-portfolio", "blockchain-capital-portfolio", "boostvc-portfolio", "cms-holdings-portfolio", "dcg-portfolio", "dragonfly-capital-portfolio", "electric-capital-portfolio", "fabric-ventures-portfolio", "framework-ventures-portfolio", "galaxy-digital-portfolio", "huobi-capital-portfolio", "alameda-research-portfolio", "a16z-portfolio", "1confirmation-portfolio", "winklevoss-capital-portfolio", "usv-portfolio", "placeholder-ventures-portfolio", "pantera-capital-portfolio", "multicoin-capital-portfolio", "paradigm-portfolio"], "total_supply": 18918568}, "52": {"circulating_supply": 47535964473, "cmc_rank": 8, "date_added": "2013-08-04T00:00:00.000Z", "id": 52, "is_active": 1, "is_fiat": 0, "last_updated": "2022-01-03T07:49:00.000Z", "max_supply": 100000000000, "name": "XRP", "num_market_pairs": 672, "platform": null, "quote": {"USD": {"fully_diluted_market_cap": 84323321434.55, "last_updated": "2022-01-03T07:49:00.000Z", "market_cap": 40083904119.58345, "market_cap_dominance": 1.7881, "percent_change_1h": -0.31709136, 
"percent_change_24h": 0.14142393, "percent_change_30d": 14.35083717, "percent_change_60d": -29.91965929, "percent_change_7d": -8.56249711, "percent_change_90d": -19.95921333, "price": 0.8432332143455455, "volume_24h": 1198632904.6630714, "volume_change_24h": 17.9728}}, "slug": "xrp", "symbol": "XRP", "tags": ["medium-of-exchange", "enterprise-solutions", "binance-chain", "arrington-xrp-capital-portfolio", "galaxy-digital-portfolio", "a16z-portfolio", "pantera-capital-portfolio"], "total_supply": 99989907034}|{"credit_count": 1, "elapsed": 28, "error_code": 0, "error_message": null, "notice": null, "timestamp": "2022-01-03T07:50:21.385Z"}|

I only need the information from the `data` column, split into one column per field, like this:

id circulating_supply cmc_rank date_added etc...
1 18918568 1 2013-04-28T00:00:00.000Z .....

I have tried `explode`, but that only works on arrays. I think I need something similar for a nested JSON object.

My Schema:

root
 |-- data: struct (nullable = true)
 |    |-- 1: struct (nullable = true)
 |    |    |-- circulating_supply: long (nullable = true)
 |    |    |-- cmc_rank: long (nullable = true)
 |    |    |-- date_added: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- is_active: long (nullable = true)
 |    |    |-- is_fiat: long (nullable = true)
 |    |    |-- last_updated: string (nullable = true)
 |    |    |-- max_supply: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_market_pairs: long (nullable = true)
 |    |    |-- platform: string (nullable = true)
 |    |    |-- quote: struct (nullable = true)
 |    |    |    |-- USD: struct (nullable = true)
 |    |    |    |    |-- fully_diluted_market_cap: double (nullable = true)
 |    |    |    |    |-- last_updated: string (nullable = true)
 |    |    |    |    |-- market_cap: double (nullable = true)
 |    |    |    |    |-- market_cap_dominance: double (nullable = true)
 |    |    |    |    |-- percent_change_1h: double (nullable = true)
 |    |    |    |    |-- percent_change_24h: double (nullable = true)
 |    |    |    |    |-- percent_change_30d: double (nullable = true)
 |    |    |    |    |-- percent_change_60d: double (nullable = true)
 |    |    |    |    |-- percent_change_7d: double (nullable = true)
 |    |    |    |    |-- percent_change_90d: double (nullable = true)
 |    |    |    |    |-- price: double (nullable = true)
 |    |    |    |    |-- volume_24h: double (nullable = true)
 |    |    |    |    |-- volume_change_24h: double (nullable = true)
 |    |    |-- slug: string (nullable = true)
 |    |    |-- symbol: string (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- total_supply: long (nullable = true)
 |    |-- 1027: struct (nullable = true)
 |    |    |-- circulating_supply: double (nullable = true)
 |    |    |-- cmc_rank: long (nullable = true)
 |    |    |-- date_added: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- is_active: long (nullable = true)
 |    |    |-- is_fiat: long (nullable = true)
 |    |    |-- last_updated: string (nullable = true)
 |    |    |-- max_supply: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_market_pairs: long (nullable = true)
 |    |    |-- platform: string (nullable = true)
 |    |    |-- quote: struct (nullable = true)
 |    |    |    |-- USD: struct (nullable = true)
 |    |    |    |    |-- fully_diluted_market_cap: double (nullable = true)
 |    |    |    |    |-- last_updated: string (nullable = true)
 |    |    |    |    |-- market_cap: double (nullable = true)
 |    |    |    |    |-- market_cap_dominance: double (nullable = true)
 |    |    |    |    |-- percent_change_1h: double (nullable = true)
 |    |    |    |    |-- percent_change_24h: double (nullable = true)
 |    |    |    |    |-- percent_change_30d: double (nullable = true)
 |    |    |    |    |-- percent_change_60d: double (nullable = true)
 |    |    |    |    |-- percent_change_7d: double (nullable = true)
 |    |    |    |    |-- percent_change_90d: double (nullable = true)
 |    |    |    |    |-- price: double (nullable = true)
 |    |    |    |    |-- volume_24h: double (nullable = true)
 |    |    |    |    |-- volume_change_24h: double (nullable = true)
 |    |    |-- slug: string (nullable = true)
 |    |    |-- symbol: string (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- total_supply: double (nullable = true)
 |    |-- 2010: struct (nullable = true)
 |    |    |-- circulating_supply: double (nullable = true)
 |    |    |-- cmc_rank: long (nullable = true)
 |    |    |-- date_added: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- is_active: long (nullable = true)
 |    |    |-- is_fiat: long (nullable = true)
 |    |    |-- last_updated: string (nullable = true)
 |    |    |-- max_supply: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_market_pairs: long (nullable = true)
 |    |    |-- platform: string (nullable = true)
 |    |    |-- quote: struct (nullable = true)
 |    |    |    |-- USD: struct (nullable = true)
 |    |    |    |    |-- fully_diluted_market_cap: double (nullable = true)
 |    |    |    |    |-- last_updated: string (nullable = true)
 |    |    |    |    |-- market_cap: double (nullable = true)
 |    |    |    |    |-- market_cap_dominance: double (nullable = true)
 |    |    |    |    |-- percent_change_1h: double (nullable = true)
 |    |    |    |    |-- percent_change_24h: double (nullable = true)
 |    |    |    |    |-- percent_change_30d: double (nullable = true)
 |    |    |    |    |-- percent_change_60d: double (nullable = true)
 |    |    |    |    |-- percent_change_7d: double (nullable = true)
 |    |    |    |    |-- percent_change_90d: double (nullable = true)
 |    |    |    |    |-- price: double (nullable = true)
 |    |    |    |    |-- volume_24h: double (nullable = true)
 |    |    |    |    |-- volume_change_24h: double (nullable = true)
 |    |    |-- slug: string (nullable = true)
 |    |    |-- symbol: string (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- total_supply: double (nullable = true)
 |    |-- 52: struct (nullable = true)
 |    |    |-- circulating_supply: long (nullable = true)
 |    |    |-- cmc_rank: long (nullable = true)
 |    |    |-- date_added: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- is_active: long (nullable = true)
 |    |    |-- is_fiat: long (nullable = true)
 |    |    |-- last_updated: string (nullable = true)
 |    |    |-- max_supply: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_market_pairs: long (nullable = true)
 |    |    |-- platform: string (nullable = true)
 |    |    |-- quote: struct (nullable = true)
 |    |    |    |-- USD: struct (nullable = true)
 |    |    |    |    |-- fully_diluted_market_cap: double (nullable = true)
 |    |    |    |    |-- last_updated: string (nullable = true)
 |    |    |    |    |-- market_cap: double (nullable = true)
 |    |    |    |    |-- market_cap_dominance: double (nullable = true)
 |    |    |    |    |-- percent_change_1h: double (nullable = true)
 |    |    |    |    |-- percent_change_24h: double (nullable = true)
 |    |    |    |    |-- percent_change_30d: double (nullable = true)
 |    |    |    |    |-- percent_change_60d: double (nullable = true)
 |    |    |    |    |-- percent_change_7d: double (nullable = true)
 |    |    |    |    |-- percent_change_90d: double (nullable = true)
 |    |    |    |    |-- price: double (nullable = true)
 |    |    |    |    |-- volume_24h: double (nullable = true)
 |    |    |    |    |-- volume_change_24h: double (nullable = true)
 |    |    |-- slug: string (nullable = true)
 |    |    |-- symbol: string (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- total_supply: long (nullable = true)
 |    |-- 5426: struct (nullable = true)
 |    |    |-- circulating_supply: double (nullable = true)
 |    |    |-- cmc_rank: long (nullable = true)
 |    |    |-- date_added: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- is_active: long (nullable = true)
 |    |    |-- is_fiat: long (nullable = true)
 |    |    |-- last_updated: string (nullable = true)
 |    |    |-- max_supply: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_market_pairs: long (nullable = true)
 |    |    |-- platform: string (nullable = true)
 |    |    |-- quote: struct (nullable = true)
 |    |    |    |-- USD: struct (nullable = true)
 |    |    |    |    |-- fully_diluted_market_cap: double (nullable = true)
 |    |    |    |    |-- last_updated: string (nullable = true)
 |    |    |    |    |-- market_cap: double (nullable = true)
 |    |    |    |    |-- market_cap_dominance: double (nullable = true)
 |    |    |    |    |-- percent_change_1h: double (nullable = true)
 |    |    |    |    |-- percent_change_24h: double (nullable = true)
 |    |    |    |    |-- percent_change_30d: double (nullable = true)
 |    |    |    |    |-- percent_change_60d: double (nullable = true)
 |    |    |    |    |-- percent_change_7d: double (nullable = true)
 |    |    |    |    |-- percent_change_90d: double (nullable = true)
 |    |    |    |    |-- price: double (nullable = true)
 |    |    |    |    |-- volume_24h: double (nullable = true)
 |    |    |    |    |-- volume_change_24h: double (nullable = true)
 |    |    |-- slug: string (nullable = true)
 |    |    |-- symbol: string (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- total_supply: double (nullable = true)
 |    |-- 825: struct (nullable = true)
 |    |    |-- circulating_supply: double (nullable = true)
 |    |    |-- cmc_rank: long (nullable = true)
 |    |    |-- date_added: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- is_active: long (nullable = true)
 |    |    |-- is_fiat: long (nullable = true)
 |    |    |-- last_updated: string (nullable = true)
 |    |    |-- max_supply: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_market_pairs: long (nullable = true)
 |    |    |-- platform: struct (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- slug: string (nullable = true)
 |    |    |    |-- symbol: string (nullable = true)
 |    |    |    |-- token_address: string (nullable = true)
 |    |    |-- quote: struct (nullable = true)
 |    |    |    |-- USD: struct (nullable = true)
 |    |    |    |    |-- fully_diluted_market_cap: double (nullable = true)
 |    |    |    |    |-- last_updated: string (nullable = true)
 |    |    |    |    |-- market_cap: double (nullable = true)
 |    |    |    |    |-- market_cap_dominance: double (nullable = true)
 |    |    |    |    |-- percent_change_1h: double (nullable = true)
 |    |    |    |    |-- percent_change_24h: double (nullable = true)
 |    |    |    |    |-- percent_change_30d: double (nullable = true)
 |    |    |    |    |-- percent_change_60d: double (nullable = true)
 |    |    |    |    |-- percent_change_7d: double (nullable = true)
 |    |    |    |    |-- percent_change_90d: double (nullable = true)
 |    |    |    |    |-- price: double (nullable = true)
 |    |    |    |    |-- volume_24h: double (nullable = true)
 |    |    |    |    |-- volume_change_24h: double (nullable = true)
 |    |    |-- slug: string (nullable = true)
 |    |    |-- symbol: string (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- total_supply: double (nullable = true)
 |-- status: struct (nullable = true)
 |    |-- credit_count: long (nullable = true)
 |    |-- elapsed: long (nullable = true)
 |    |-- error_code: long (nullable = true)
 |    |-- error_message: string (nullable = true)
 |    |-- notice: string (nullable = true)
 |    |-- timestamp: string (nullable = true)

Answer:

Because the id values are actually the field names of your `data` struct column, you can first build an array of structs from those ids (read them from the schema of `crypto_df.select("data.*")`). Then explode the resulting array and expand the inner structs, or simply use the `inline` function, which does both:

import pyspark.sql.functions as F

# The coin ids ("1", "52", ...) are field *names* of the `data` struct, not
# array elements, so `explode` cannot be applied to `data` directly.
# Read the ids off the schema, pack the per-id structs into one array column,
# and let `inline` emit one row per element with one column per struct field.
# NOTE: all per-id structs must have compatible field types to share an array;
# the edited version below casts `platform` for exactly that reason.
coin_ids = crypto_df.select("data.*").columns
coin_structs = [F.col("data." + coin_id) for coin_id in coin_ids]

crypto_df = crypto_df.withColumn("data", F.array(*coin_structs))
crypto_df = crypto_df.selectExpr("inline(data)")

crypto_df.show()

#+------------------+--------+--------------------+---+---------+-------+--------------------+------------+-------+----------------+--------+--------------------+-------+------+--------------------+------------+
#|circulating_supply|cmc_rank|          date_added| id|is_active|is_fiat|        last_updated|  max_supply|   name|num_market_pairs|platform|               quote|   slug|symbol|                tags|total_supply|
#+------------------+--------+--------------------+---+---------+-------+--------------------+------------+-------+----------------+--------+--------------------+-------+------+--------------------+------------+
#|          18918568|       1|2013-04-28T00:00:...|  1|        1|      0|2022-01-03T07:49:...|    21000000|Bitcoin|            8970|    null|{{9.8723412602589...|bitcoin|   BTC|[mineable, pow, s...|    18918568|
#|       47535964473|       8|2013-08-04T00:00:...| 52|        1|      0|2022-01-03T07:49:...|100000000000|    XRP|             672|    null|{{8.432332143455E...|    xrp|   XRP|[medium-of-exchan...| 99989907034|
#+------------------+--------+--------------------+---+---------+-------+--------------------+------------+-------+----------------+--------+--------------------+-------+------+--------------------+------------+
Edit:

From your comments: if the `platform` field can be either a string or a struct, you can cast it to a string using `withField` (available since Spark 3.1), like this:

# `platform` is a string for most ids but a struct for id 825 (see the schema),
# so the per-id structs have incompatible types. Overwrite each struct's
# `platform` with its string cast before building the array; Column.withField
# requires Spark 3.1+. `F` is pyspark.sql.functions, imported earlier.
# `crypto_df` here must still have the `data` column, i.e. it must be the frame
# as originally read, not the one reassigned by the previous snippet.
normalized_coins = []
for coin_id in crypto_df.select("data.*").columns:
    platform_text = F.col(f"data.{coin_id}.platform").cast("string")
    normalized_coins.append(F.col(f"data.{coin_id}").withField("platform", platform_text))

(
    crypto_df
    .withColumn("data", F.array(*normalized_coins))
    .selectExpr("inline(data)")
    .show()
)

#+--------------------+--------+--------------------+----+---------+-------+--------------------+------------+--------+----------------+--------------------+--------------------+--------+------+--------------------+--------------------+
#|  circulating_supply|cmc_rank|          date_added|  id|is_active|is_fiat|        last_updated|  max_supply|    name|num_market_pairs|            platform|               quote|    slug|symbol|                tags|        total_supply|
#+--------------------+--------+--------------------+----+---------+-------+--------------------+------------+--------+----------------+--------------------+--------------------+--------+------+--------------------+--------------------+
#|         1.8918687E7|       1|2013-04-28T00:00:...|   1|        1|      0|2022-01-03T12:05:...|    21000000| Bitcoin|            8972|                null|{{9.9427790254864...| bitcoin|   BTC|[mineable, pow, s...|         1.8918687E7|
#|    1.190054996865E8|       2|2015-08-07T00:00:...|1027|        1|      0|2022-01-03T12:06:...|        null|Ethereum|            5377|                null|{{4.5577849537238...|ethereum|   ETH|[mineable, pow, s...|    1.190054996865E8|
#|  3.3485475547114E10|       6|2017-10-01T00:00:...|2010|        1|      0|2022-01-03T12:05:...| 45000000000| Cardano|             348|                null|{{6.161144465755E...| cardano|   ADA|[mineable, dpos, ...|  3.3927753982173E10|
#|     4.7535964473E10|       8|2013-08-04T00:00:...|  52|        1|      0|2022-01-03T12:05:...|100000000000|     XRP|             672|                null|{{8.448884837941E...|     xrp|   XRP|[medium-of-exchan...|     9.9989907034E10|
#| 3.092842565984485E8|       5|2020-04-10T00:00:...|5426|        1|      0|2022-01-03T12:05:...|        null|  Solana|             217|                null|{{8.937421274506E...|  solana|   SOL|[pos, platform, s...|  5.11616946142289E8|
#|7.833788250671762E10|       4|2015-02-25T00:00:...| 825|        1|      0|2022-01-03T12:06:...|        null|  Tether|           25203|{1027, Ethereum, ...|{{8.276917824101E...|  tether|  USDT|[payments, stable...|8.275899187310669E10|
#+--------------------+--------+--------------------+----+---------+-------+--------------------+------------+--------+----------------+--------------------+--------------------+--------+------+--------------------+--------------------+

  • Related