My RDD (read from Elasticsearch) looks like this:
```
[
('rty456ui', {'@timestamp': '2022-10-10T24:56:10.000259+0000', 'host': {'id': 'test-host-id-1'}, 'watchlists': {'ioc': {'summary': '127.0.0.1', 'tags': ('Dummy Tag',)}}, 'source': {'ip': '127.0.0.1'}, 'event': {'created': '2022-10-10T13:56:10+00:00', 'id': 'rty456ui'}, 'tags': ('Mon',)}),
('cxs980qw', {'@timestamp': '2022-10-10T13:56:10.000259+0000', 'host': {'id': 'test-host-id-2'}, 'watchlists': {'ioc': {'summary': '0.0.0.1', 'tags': ('Dummy Tag',)}}, 'source': {'ip': '0.0.0.1'}, 'event': {'created': '2022-10-10T24:56:10+00:00', 'id': 'cxs980qw'}, 'tags': ('Mon', 'Tue')})
]
```
(Interestingly, lists in Elasticsearch are converted to tuples in the RDD.)
I am trying to convert it into a DataFrame like this:
```
+--------------+--------+---------+-------------------------+----------------------+-------------------+----------+
|host.id       |event.id|source.ip|event.created            |watchlists.ioc.summary|watchlists.ioc.tags|tags      |
+--------------+--------+---------+-------------------------+----------------------+-------------------+----------+
|test-host-id-1|rty456ui|127.0.0.1|2022-10-10T13:56:10+00:00|127.0.0.1             |[Dummy Tag]        |[Mon]     |
|test-host-id-2|cxs980qw|0.0.0.1  |2022-10-10T24:56:10+00:00|0.0.0.1               |[Dummy Tag]        |[Mon, Tue]|
+--------------+--------+---------+-------------------------+----------------------+-------------------+----------+
```
However, I am getting this instead:
```
+-------+--------+---------+-------------+----------------------+-------------------+----------------------------+
|host.id|event.id|source.ip|event.created|watchlists.ioc.summary|watchlists.ioc.tags|tags                        |
+-------+--------+---------+-------------+----------------------+-------------------+----------------------------+
|null   |null    |null     |null         |null                  |null               |[Ljava.lang.Object;@6c704e6e|
|null   |null    |null     |null         |null                  |null               |[Ljava.lang.Object;@701ea4c8|
+-------+--------+---------+-------------+----------------------+-------------------+----------------------------+
```
Code:

```python
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("host.id", StringType(), True),
    StructField("event.id", StringType(), True),
    StructField("source.ip", StringType(), True),
    StructField("event.created", StringType(), True),
    StructField("watchlists.ioc.summary", StringType(), True),
    StructField("watchlists.ioc.tags", StringType(), True),
    StructField("tags", StringType(), True)
])
df = spark.createDataFrame(es_rdd.map(lambda x: x[1]), schema)
df.show(truncate=False)
```
I'm trying to convert an RDD into a DataFrame with an explicit schema. However, `spark.createDataFrame(rdd, schema)` returns only null values, even though the RDD has data. I also get `[Ljava.lang.Object;@701ea4c8` in the output. What am I missing here?
Answer:
Your post covers two questions:
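Why are all columns null even though I declared a schema when converting the RDD to a DataFrame? In your schema you use dotted field names such as `host.id` (struct column, then nested field) and expect Spark to use them to fetch the nested value from each RDD element. However, that kind of path selection only works when selecting columns from an existing DataFrame, for example in a Spark SQL `select` statement; `createDataFrame` does no such parsing. When an element is a dict, each schema field name is looked up as a literal top-level key. Your documents have a `host` key but no `host.id` key, so those columns come out null; only `tags` exists as a top-level key, which is why it is the only column with a value. To achieve your goal, update the lambda function inside `map` to extract the exact elements, for example `rdd_trans = es_rdd.map(lambda x: (x[1]['host']['id'], x[1]['event']['id'], ))`, continuing the tuple with the remaining fields in the same order as the schema.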
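Why is the output of the `tags` column not shown as expected? Because you declared `tags` as a string column. The value in the RDD is a tuple, which reaches the JVM as a Java `Object[]`, and the default string form of that array is `[Ljava.lang.Object;@...`. Declare the column with `ArrayType(StringType())` instead; the same applies to `watchlists.ioc.tags`.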