Home > Mobile >  Change the datatype for two columns of a pyspark dataframe after receiving json from Kafka server bu
Change the datatype for two columns of a pyspark dataframe after receiving json from Kafka server bu

Time:07-08

I am looking to change the "tweet_id" and "userID" of the pyspark dataframe I am constructed. I want the datatype for both to be of type Integer.

My code is below...

import findspark
from pyspark import SparkConf, SparkContext
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import from_json, col
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'
spark = SparkSession.builder.master("local[2]").appName('LearningDataframeWork').getOrCreate()

schema = StructType([ 
        StructField("tweet_id", StringType(), True),
        StructField("tweet_text" , StringType(), True),
        StructField("userID" , StringType(), True),
        StructField("username" , StringType(), True),
        ])


df_tweets = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "TwitterTweets") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_json(col("value").cast("string"),schema).alias("converted")) \
    .select(col("converted.*"))

df = df_tweets.select('*')


query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

I get the output...

|           tweet_id|          tweet_text|             userID|      username|
 ------------------- -------------------- ------------------- -------------- 
|1545020704969695236|RT @dme_363: Wait...|1341441279905902593|      dme__363|
|1545020704927625216|RT @DeadlineDayLi...|          634939603|    mattlobooo|
|1545020703350685698|@10Junioor Ronald...|          126401539|       IGAOC12|
|1545020702000128003|RT @ManUtdMEN: Po...|1247589115010351109|       D_Nyeko|
|1545020700334981121|RT @AgoluaVictor:...| 757194951985856512| Iam_Bwoiralph|
|1545020699764576256|Pré-saison avec M...|         2255497760|     sunusport|
|1545020696283209728|RT @ManuelMenacho...|         2305097051|SoetanAdebare1|
|1545020695477997570|@Ayden__x Ronaldo...|         4121380654|      blaq_gem|
|1545020691220766720|RT @JamesSunday_:...| 757194951985856512| Iam_Bwoiralph|
|1545020689522073601|RT @CFCBlues_com:...|1542264394675109888|      CBrayzay|

the json value I extracted the data from is presented like below...

{"tweet_id": 15450003263664388, "tweet_text": "RT @Lordd: Who wears the Number 7 better??\\nLike for Kante, Retweet for Ronaldo\\n\\n||Ronaldo to Chelsea", "userID": 1196913590, "username": "davo_matsa"}'

I have tried using .withColumn() and change the the datatype of the column, the column's data type for tweet_id and userID does change to Integer but I get null fields.

df= df.withColumn("tweet_id_coverted", col("tweet_id").cast("Integer"))

It yielded me ...

 ------------------- -------------------- ------------------- -------------- ----------------- 
|           tweet_id|          tweet_text|             userID|      username|tweet_id_coverted|
 ------------------- -------------------- ------------------- -------------- ----------------- 
|1545020704969695236|RT @dme_363: Wait...|1341441279905902593|      dme__363|             null|
|1545020704927625216|RT @DeadlineDayLi...|          634939603|    mattlobooo|             null|
|1545020703350685698|@10Junioor Ronald...|          126401539|       IGAOC12|             null|
|1545020702000128003|RT @ManUtdMEN: Po...|1247589115010351109|       D_Nyeko|             null|
|1545020700334981121|RT @AgoluaVictor:...| 757194951985856512| Iam_Bwoiralph|             null|
|1545020699764576256|Pré-saison avec M...|         2255497760|     sunusport|             null|
|1545020696283209728|RT @ManuelMenacho...|         2305097051|SoetanAdebare1|             null|
|1545020695477997570|@Ayden__x Ronaldo...|         4121380654|      blaq_gem|             null|
|1545020691220766720|RT @JamesSunday_:...| 757194951985856512| Iam_Bwoiralph|             null|
|1545020689522073601|RT @CFCBlues_com:...|1542264394675109888|      CBrayzay|             null|

The main issue is the null fields I get when changing the datatype to Integer. As you can see from new column I tried creating. Any help with a solution would be appreciated!

CodePudding user response:

Try the following

kafkaDf = spark.read.format("kafka")\
  .option("kafka.bootstrap.servers", "localhost:9092")\
  .option("subscribe", 'TwitterTweets')\
  .option("startingOffsets", "earliest")\
  .load()

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

schema = StructType([
    StructField("tweet_id", LongType(), False),
    StructField("tweet_text", StringType(), False),
    StructField("userID", LongType(), False),
    StructField("username", StringType(), False),
])

parsed_df = kafkaDf.select(
    from_json(col("value").cast("string"), schema).alias("value")
).select("value.*")

parsed_df

root
 |-- tweet_id: long (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- userID: long (nullable = true)
 |-- username: string (nullable = true)

 ----------------- -------------------- ---------- ---------- 
|         tweet_id|          tweet_text|    userID|  username|
 ----------------- -------------------- ---------- ---------- 
|15450003263664388|RT @Lordd: Who we...|1196913590|davo_matsa|
 ----------------- -------------------- ---------- ---------- 
  • Related