I have a dataframe in PySpark that has the following schema:
root
 |-- value: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- id: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- variable_name: string (nullable = true)
 |-- Intensity: float (nullable = true)
And the dataframe itself looks like this (I will only show the value and Intensity columns, since they are the only ones I need for what I want to do):
| value         | Intensity |
|---------------|-----------|
| [-0.01, 58]   | 59        |
| [47.2, -20.1] | 30        |
What I would like to do is the following: take the value of the "Intensity" column and look for the closest value to it inside the "value" array. That closest value should go into a new column called "nearest". So, in my example, I would get:
| value         | Intensity | nearest |
|---------------|-----------|---------|
| [-0.01, 58]   | 59        | 58      |
| [47.2, -20.1] | 30        | 47.2    |
To do this, I have tried the following:
- First, I defined my find_nearest function (I show a quick check of it on a plain list below, after the error):

import numpy as np

def find_nearest(array, value):
    array = np.array(array)
    nearest_index = np.where(abs(array - value) == abs(array - value).min())[0]
    nearest_value = array[abs(array - value) == abs(array - value).min()]
    return nearest_index[0]  # returns just the index of the nearest value
- Then, I tried to use my function on my dataframe:
df2 = df.withColumn("nearest", [find_nearest(a, b) for a, b in zip(df['value'], df['Intensity'])])
But I get an error:
TypeError: Column is not iterable
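The function itself behaves the way I expect when I call it on a plain Python list, for example:

find_nearest([-0.01, 58], 59)    # -> 1, the index of 58
find_nearest([47.2, -20.1], 30)  # -> 0, the index of 47.2

So the problem only appears when I try to apply it to the Spark columns.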
Could someone please give me a hand with this?
Thank you in advance.
CodePudding user response:
The error you get means you would need to define a UDF. However, here you can simply use Spark built-in functions. Here's one way using transform and array_min with struct ordering:
from pyspark.sql import functions as F
df = spark.createDataFrame([([-0.01, 58.0], 59), ([47.2, -20.1], 30)], ["value", "Intensity"])
result = df.withColumn(
    "nearest",
    F.array_min(
        F.expr("transform(value, x -> struct(abs(x - Intensity), x as v))")
    )["v"]
)
result.show()
# +-------------+---------+-------+
# |        value|Intensity|nearest|
# +-------------+---------+-------+
# |[-0.01, 58.0]|       59|   58.0|
# |[47.2, -20.1]|       30|   47.2|
# +-------------+---------+-------+
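If you did want to go the UDF route mentioned above instead, a minimal sketch could look like this (the find_nearest_udf name, the null handling, and the choice to return the nearest value itself as a double are my own):

import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

@F.udf(DoubleType())
def find_nearest_udf(array, value):
    # stay null-safe: a missing array or Intensity yields a null result
    if array is None or value is None:
        return None
    arr = np.array(array, dtype=float)
    # pick the element with the smallest absolute difference to `value`
    return float(arr[np.abs(arr - value).argmin()])

result = df.withColumn("nearest", find_nearest_udf(F.col("value"), F.col("Intensity")))

The built-in transform/array_min version above is generally preferable, though, since it avoids the per-row Python serialization overhead of a UDF.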
CodePudding user response:
You can do it without creating a custom function:
>>> from pyspark.sql import functions as F
>>> df = spark.createDataFrame([([-0.01, 58.0], 59), ([47.2, -20.1], 30)], ['value', 'Intensity'])
>>> df1 = df.withColumn("col1", df["value"].getItem(0)).withColumn("col2", df["value"].getItem(1))
>>> df1.withColumn("nearest", F.when(F.abs(df1["Intensity"] - df1["col1"]) < F.abs(df1["Intensity"] - df1["col2"]), df1["col1"]).otherwise(df1["col2"])).drop("col1", "col2").show()
+-------------+---------+-------+
|        value|Intensity|nearest|
+-------------+---------+-------+
|[-0.01, 58.0]|       59|   58.0|
|[47.2, -20.1]|       30|   47.2|
+-------------+---------+-------+