I am trying to use numpy.cov on some arrays that are created by collect_list(). Consider this as my data:
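import numpy as np
from pyspark.sql import functions as F

# assumes an existing SparkSession named `spark` (as in the pyspark shell)
data = [('a', 1.1, 2.2),
        ('a', 3.3, 4.4),
        ('b', 5.5, 6.6),
        ('b', 7.7, 8.8)]
df = spark.createDataFrame(data, ['id', 'col1', 'col2'])
df.show()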
+---+----+----+
| id|col1|col2|
+---+----+----+
|  a| 1.1| 2.2|
|  a| 3.3| 4.4|
|  b| 5.5| 6.6|
|  b| 7.7| 8.8|
+---+----+----+
df2 = df.groupBy('id').agg(
F.collect_list('col1').alias('array1'),
F.collect_list('col2').alias('array2'))
df2.show()
+---+----------+----------+
| id|    array1|    array2|
+---+----------+----------+
|  b|[5.5, 7.7]|[6.6, 8.8]|
|  a|[1.1, 3.3]|[2.2, 4.4]|
+---+----------+----------+
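To make the grouping step concrete without a cluster, here is a minimal plain-Python sketch of what groupBy('id') plus two collect_list calls produce from the sample rows. It is only an emulation for illustration (the helper name `group_like_collect_list` is hypothetical). One difference worth knowing: Spark's documentation notes that the order of the collected elements can be non-deterministic after a shuffle, whereas this emulation simply keeps the input order.

```python
from collections import defaultdict

def group_like_collect_list(rows):
    """Hypothetical emulation of groupBy('id').agg(collect_list('col1'), collect_list('col2'))."""
    # each new id gets its own pair of empty lists: (array1, array2)
    grouped = defaultdict(lambda: ([], []))
    for key, col1, col2 in rows:
        array1, array2 = grouped[key]
        array1.append(col1)
        array2.append(col2)
    return dict(grouped)

data = [('a', 1.1, 2.2),
        ('a', 3.3, 4.4),
        ('b', 5.5, 6.6),
        ('b', 7.7, 8.8)]
grouped = group_like_collect_list(data)
print(grouped)
```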
Now I want to apply numpy.cov to array1 and array2 for each id. For reference, here is my code for the single id 'a', i.e. what I want to do with these two arrays:
df3 = df.filter("id = 'a'")
# collect() returns a list of Rows; .T turns the (n, 1) array into a (1, n) row vector
array1 = np.array(df3.select('col1').collect()).T.astype(float)
array2 = np.array(df3.select('col2').collect()).T.astype(float)
cov = np.cov(array1 - array2,
             array1 + array2)
what_i_need = cov[0][1] / cov[1][1]
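For reference, cov[0][1] / cov[1][1] is cov(x, y) / var(y) with x = array1 - array2 and y = array1 + array2 (assuming the second argument to np.cov is the sum), which is also the least-squares slope of x regressed on y. A minimal NumPy sketch on 1-D arrays; the function name `cov_ratio` and the test values are made up for illustration:

```python
import numpy as np

def cov_ratio(array1, array2):
    """Hypothetical helper: cov(x, y) / var(y) with x = array1 - array2, y = array1 + array2."""
    a1 = np.asarray(array1, dtype=float)
    a2 = np.asarray(array2, dtype=float)
    # np.cov(x, y) treats x and y as two variables and returns their 2x2 covariance matrix
    cov = np.cov(a1 - a2, a1 + a2)
    # cov[0][1] is cov(x, y) and cov[1][1] is var(y); the ddof normalisation cancels in the ratio
    return float(cov[0][1] / cov[1][1])

# Cross-check: the ratio equals the slope of a straight line fitted to x as a function of y
a1 = np.array([1.0, 2.0, 4.0, 7.0])
a2 = np.array([0.5, 3.0, 1.0, 2.0])
x, y = a1 - a2, a1 + a2
slope = np.polyfit(y, x, 1)[0]
print(cov_ratio(a1, a2), slope)
```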
That's all I want to do. I want my result to look like this:
+---+---------------------+
| id|cov[0][1] / cov[1][1]|
+---+---------------------+
|  b|                x.xxx|
|  a|                x.xxx|
+---+---------------------+
I really don't know how to accomplish this.
Whatever I tried, I always got one of these errors:
'float' object has no attribute 'shape'
unexpected type:<type 'type'>
Oh, one more thing: 'col1' and 'col2' are actually of 'decimal' type.
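Because the columns are decimal, PySpark hands their values to Python as decimal.Decimal objects, and np.array on those produces an object-dtype array rather than a float array. A small sketch of the difference (the sample values just mirror the data above); passing dtype=float up front gives NumPy plain float64 values to work with:

```python
from decimal import Decimal
import numpy as np

# Values as they come back from a decimal column: Python Decimal objects
values = [Decimal('1.1'), Decimal('3.3')]

as_object = np.array(values)              # dtype=object: elements are still Decimal
as_float = np.array(values, dtype=float)  # dtype=float64: elements converted to floats

print(as_object.dtype, as_float.dtype)
```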
Answer:
I figured it out. Check this out:
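import numpy as np

# Turn each row of df2 into a one-entry dict {id: [array1, array2]}
dfrdds = df2.rdd.map(lambda row: {row.id: [row.array1, row.array2]})

def beta(entry):
    # each element is a dict with a single key: the id
    key = list(entry.keys())[0]
    values = entry.get(key)
    # dtype=float converts the Decimal values coming from the decimal columns
    array1 = np.array(values[0], dtype=float)
    array2 = np.array(values[1], dtype=float)
    cov = np.cov(array1 - array2,
                 array1 + array2)
    beta = cov[0][1] / cov[1][1]
    # float() turns numpy.float64 into a plain Python float so toDF() can infer the schema
    return key, float(beta)

df4 = dfrdds.map(beta).toDF(['id', 'beta'])
df4.show()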
+---+--------------------+
| id|                beta|
+---+--------------------+
|  b|-2.01858731750028...|
|  a|-1.00929365875014...|
+---+--------------------+
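The dictionary wrapper in the map step is not strictly needed, since each Row already exposes id, array1 and array2 as attributes. A minimal sketch of a per-row function (`row_beta` is a hypothetical name), exercised here on a namedtuple standing in for pyspark.sql.Row so it can be tried without Spark; on a cluster it would be passed to df2.rdd.map(...) instead:

```python
from collections import namedtuple
import numpy as np

def row_beta(row):
    """Hypothetical per-row version: reads id/array1/array2 straight off the row."""
    a1 = np.array(row.array1, dtype=float)  # also handles Decimal inputs
    a2 = np.array(row.array2, dtype=float)
    cov = np.cov(a1 - a2, a1 + a2)
    return row.id, float(cov[0][1] / cov[1][1])

# Stand-in for pyspark.sql.Row, only for trying the function locally
FakeRow = namedtuple("FakeRow", ["id", "array1", "array2"])
rows = [FakeRow("a", [1.1, 3.3], [2.2, 4.4]),
        FakeRow("b", [5.5, 7.7], [6.6, 8.8])]
results = list(map(row_beta, rows))
print(results)

# With Spark (not run here): df2.rdd.map(row_beta).toDF(['id', 'beta'])
```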
Use an RDD and a plain Python function, that's all.
And float(beta) is important: without it, there will be an error like Can not infer schema for type: <class 'numpy.float64'>
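The float(beta) conversion matters because indexing a NumPy array returns a numpy.float64 scalar, not a built-in float. numpy.float64 does subclass float, but Spark's schema inference (at least in the version used here) appears to match on the exact type and so rejects it. A quick check of the types involved, using made-up inputs:

```python
import numpy as np

cov = np.cov([1.0, 2.0, 4.0], [2.0, 1.0, 5.0])
ratio = cov[0][1] / cov[1][1]

print(type(ratio))               # the NumPy scalar type, not the built-in float
print(isinstance(ratio, float))  # numpy.float64 subclasses float
print(type(ratio) is float)      # but the exact type differs

converted = float(ratio)
print(type(converted) is float)  # plain Python float after conversion
```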