I have the following dataframe of HTTP requests with the columns `IP`, `Timestamp`, `user_agent` and `hostname`:
| IP | Timestamp | user_agent | hostname |
|---|---|---|---|
| 1.1.1.1 | 1656758570 | ua1 | hostname1 |
| 1.1.1.1 | 1656758580 | ua2 | hostname2 |
| 1.1.1.1 | 1656758570 | ua3 | hostname1 |
| 1.1.1.1 | 1656758580 | ua1 | hostname3 |
| 2.2.2.2 | 1656758580 | ua3 | hostname2 |
| 2.2.2.2 | 1656758580 | ua1 | hostname1 |
| 2.2.2.2 | 1656758590 | ua3 | hostname1 |
| 2.2.2.2 | 1656758590 | ua2 | hostname3 |
| 2.2.2.2 | 1656758590 | ua1 | hostname2 |
I want to group the data by the `IP` column, then within each group aggregate on the `Timestamp` column, computing `countDistinct` values for the other columns (`user_agent` and `hostname`) that share the same `Timestamp` value, and finally transform the aggregated data into new columns. I want the result dataframe to look like this:
| IP | 1656758570_unique_user_agent | 1656758570_unique_hostname | 1656758580_unique_user_agent | 1656758580_unique_hostname | 1656758590_unique_user_agent | 1656758590_unique_hostname |
|---|---|---|---|---|---|---|
| 1.1.1.1 | 2 | 1 | 2 | 2 | 0 | 0 |
| 2.2.2.2 | 0 | 0 | 2 | 2 | 3 | 3 |
Is there a convenient way to do this instead of multiple `withColumn` calls?
CodePudding user response:
You can insert a `pivot` between `groupBy` and `agg`:
```python
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [('1.1.1.1', 1656758570, 'ua1', 'hostname1'),
     ('1.1.1.1', 1656758580, 'ua2', 'hostname2'),
     ('1.1.1.1', 1656758570, 'ua3', 'hostname1'),
     ('1.1.1.1', 1656758580, 'ua1', 'hostname3'),
     ('2.2.2.2', 1656758580, 'ua3', 'hostname2'),
     ('2.2.2.2', 1656758580, 'ua1', 'hostname1'),
     ('2.2.2.2', 1656758590, 'ua3', 'hostname1'),
     ('2.2.2.2', 1656758590, 'ua2', 'hostname3'),
     ('2.2.2.2', 1656758590, 'ua1', 'hostname2')],
    ['IP', 'Timestamp', 'user_agent', 'hostname'])

df = (df
      .groupBy('IP')
      .pivot('Timestamp')
      .agg(F.countDistinct('user_agent').alias('unique_user_agent'),
           F.countDistinct('hostname').alias('unique_hostname'))
      .fillna(0)
      )

df.show()
# +-------+----------------------------+--------------------------+----------------------------+--------------------------+----------------------------+--------------------------+
# |     IP|1656758570_unique_user_agent|1656758570_unique_hostname|1656758580_unique_user_agent|1656758580_unique_hostname|1656758590_unique_user_agent|1656758590_unique_hostname|
# +-------+----------------------------+--------------------------+----------------------------+--------------------------+----------------------------+--------------------------+
# |2.2.2.2|                           0|                         0|                           2|                         2|                           3|                         3|
# |1.1.1.1|                           2|                         1|                           2|                         2|                           0|                         0|
# +-------+----------------------------+--------------------------+----------------------------+--------------------------+----------------------------+--------------------------+
```
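A side note on the same approach: if the distinct `Timestamp` values are known in advance, they can be passed to `pivot` explicitly, which saves Spark the extra job it otherwise runs to collect the pivot values and also fixes the column order. A minimal sketch, assuming `df` still refers to the original dataframe created above (before the reassignment) and using the three timestamps from the sample data:

```python
# Sketch: pivot with an explicit list of values (the three timestamps from the
# sample data), so Spark does not need a separate job to discover them.
known_timestamps = [1656758570, 1656758580, 1656758590]

pivoted = (df
           .groupBy('IP')
           .pivot('Timestamp', known_timestamps)
           .agg(F.countDistinct('user_agent').alias('unique_user_agent'),
                F.countDistinct('hostname').alias('unique_hostname'))
           .fillna(0))
pivoted.show()
```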
CodePudding user response:
Alternatively, use a list comprehension to build a `countDistinct` aggregation for every remaining column and pivot the counts. Code below:
```python
(df
 .groupBy('IP')
 .pivot('Timestamp')
 .agg(*[F.countDistinct(x).alias(f'unique_{x}') for x in df.drop('Timestamp').columns])
 .fillna(0)
).show()
```

```
+-------+--------------------+----------------------------+--------------------------+--------------------+----------------------------+--------------------------+--------------------+----------------------------+--------------------------+
|     IP|1656758570_unique_IP|1656758570_unique_user_agent|1656758570_unique_hostname|1656758580_unique_IP|1656758580_unique_user_agent|1656758580_unique_hostname|1656758590_unique_IP|1656758590_unique_user_agent|1656758590_unique_hostname|
+-------+--------------------+----------------------------+--------------------------+--------------------+----------------------------+--------------------------+--------------------+----------------------------+--------------------------+
|2.2.2.2|                   0|                           0|                         0|                   1|                           2|                         2|                   1|                           3|                         3|
|1.1.1.1|                   1|                           2|                         1|                   1|                           2|                         2|                   0|                           0|                         0|
+-------+--------------------+----------------------------+--------------------------+--------------------+----------------------------+--------------------------+--------------------+----------------------------+--------------------------+
```
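Note that because only `Timestamp` is dropped from the column list, the `IP` grouping key is also aggregated, which is where the extra `unique_IP` columns come from. If you want exactly the layout asked for in the question, a small variation is to drop `IP` from the comprehension as well (again assuming `df` is the original dataframe):

```python
# Variation: drop both the grouping key and the pivot column before building
# the aggregations, so no unique_IP columns are produced.
agg_cols = df.drop('IP', 'Timestamp').columns  # ['user_agent', 'hostname']

(df
 .groupBy('IP')
 .pivot('Timestamp')
 .agg(*[F.countDistinct(x).alias(f'unique_{x}') for x in agg_cols])
 .fillna(0)
).show()
```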