I am looking to change the "tweet_id" and "userID" of the pyspark dataframe I am constructed. I want the datatype for both to be of type Integer.
My code is below...
import findspark
from pyspark import SparkConf, SparkContext
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import from_json, col
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'
spark = SparkSession.builder.master("local[2]").appName('LearningDataframeWork').getOrCreate()
schema = StructType([
StructField("tweet_id", StringType(), True),
StructField("tweet_text" , StringType(), True),
StructField("userID" , StringType(), True),
StructField("username" , StringType(), True),
])
df_tweets = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "TwitterTweets") \
.option("startingOffsets", "earliest") \
.load() \
.select(from_json(col("value").cast("string"),schema).alias("converted")) \
.select(col("converted.*"))
df = df_tweets.select('*')
query = df.writeStream \
.outputMode("append") \
.format("console") \
.start()
query.awaitTermination()
I get the output...
| tweet_id| tweet_text| userID| username|
------------------- -------------------- ------------------- --------------
|1545020704969695236|RT @dme_363: Wait...|1341441279905902593| dme__363|
|1545020704927625216|RT @DeadlineDayLi...| 634939603| mattlobooo|
|1545020703350685698|@10Junioor Ronald...| 126401539| IGAOC12|
|1545020702000128003|RT @ManUtdMEN: Po...|1247589115010351109| D_Nyeko|
|1545020700334981121|RT @AgoluaVictor:...| 757194951985856512| Iam_Bwoiralph|
|1545020699764576256|Pré-saison avec M...| 2255497760| sunusport|
|1545020696283209728|RT @ManuelMenacho...| 2305097051|SoetanAdebare1|
|1545020695477997570|@Ayden__x Ronaldo...| 4121380654| blaq_gem|
|1545020691220766720|RT @JamesSunday_:...| 757194951985856512| Iam_Bwoiralph|
|1545020689522073601|RT @CFCBlues_com:...|1542264394675109888| CBrayzay|
the json value I extracted the data from is presented like below...
{"tweet_id": 15450003263664388, "tweet_text": "RT @Lordd: Who wears the Number 7 better??\\nLike for Kante, Retweet for Ronaldo\\n\\n||Ronaldo to Chelsea", "userID": 1196913590, "username": "davo_matsa"}'
I have tried using .withColumn() and change the the datatype of the column, the column's data type for tweet_id and userID does change to Integer but I get null fields.
df= df.withColumn("tweet_id_coverted", col("tweet_id").cast("Integer"))
It yielded me ...
------------------- -------------------- ------------------- -------------- -----------------
| tweet_id| tweet_text| userID| username|tweet_id_coverted|
------------------- -------------------- ------------------- -------------- -----------------
|1545020704969695236|RT @dme_363: Wait...|1341441279905902593| dme__363| null|
|1545020704927625216|RT @DeadlineDayLi...| 634939603| mattlobooo| null|
|1545020703350685698|@10Junioor Ronald...| 126401539| IGAOC12| null|
|1545020702000128003|RT @ManUtdMEN: Po...|1247589115010351109| D_Nyeko| null|
|1545020700334981121|RT @AgoluaVictor:...| 757194951985856512| Iam_Bwoiralph| null|
|1545020699764576256|Pré-saison avec M...| 2255497760| sunusport| null|
|1545020696283209728|RT @ManuelMenacho...| 2305097051|SoetanAdebare1| null|
|1545020695477997570|@Ayden__x Ronaldo...| 4121380654| blaq_gem| null|
|1545020691220766720|RT @JamesSunday_:...| 757194951985856512| Iam_Bwoiralph| null|
|1545020689522073601|RT @CFCBlues_com:...|1542264394675109888| CBrayzay| null|
The main issue is the null fields I get when changing the datatype to Integer. As you can see from new column I tried creating. Any help with a solution would be appreciated!
CodePudding user response:
Try the following
kafkaDf = spark.read.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe", 'TwitterTweets')\
.option("startingOffsets", "earliest")\
.load()
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *
schema = StructType([
StructField("tweet_id", LongType(), False),
StructField("tweet_text", StringType(), False),
StructField("userID", LongType(), False),
StructField("username", StringType(), False),
])
parsed_df = kafkaDf.select(
from_json(col("value").cast("string"), schema).alias("value")
).select("value.*")
parsed_df
root
|-- tweet_id: long (nullable = true)
|-- tweet_text: string (nullable = true)
|-- userID: long (nullable = true)
|-- username: string (nullable = true)
----------------- -------------------- ---------- ----------
| tweet_id| tweet_text| userID| username|
----------------- -------------------- ---------- ----------
|15450003263664388|RT @Lordd: Who we...|1196913590|davo_matsa|
----------------- -------------------- ---------- ----------