Home > Blockchain >  How can I deal with arrays created by groupBy&collect_list?
How can I deal with arrays created by groupBy&collect_list?

Time:11-04

I am trying to use numpy.cov on some arrays, which is actually created by collect_list(). Consider this as my data

data = [('a', 1.1, 2.2),
        ('a', 3.3, 4.4),
        ('b', 5.5, 6.6),
        ('b', 7.7, 8.8)]
df = spark.createDataFrame(data, ['id', 'col1', 'col2'])
df.show()
 --- ---- ---- 
| id|col1|col2|
 --- ---- ---- 
|  a| 1.1| 2.2|
|  a| 3.3| 4.4|
|  b| 5.5| 6.6|
|  b| 7.7| 8.8|
 --- ---- ---- 
df2 = df.groupBy('id').agg(
    F.collect_list('col1').alias('array1'),
    F.collect_list('col2').alias('array2'))
df2.show()
 --- ---------- ---------- 
| id|    array1|    array2|
 --- ---------- ---------- 
|  b|[5.5, 7.7]|[6.6, 8.8]|
|  a|[1.1, 3.3]|[2.2, 4.4]|
 --- ---------- ---------- 

And like so, i wanna use numpy.cov on array1 and array2. BTW, here was my code for single id 'a', i mean, what i wanna do to these two arrays.

df3 = df.filter("id = 'a'")
array1 = np.array(df3.select('col1').collect()).T.astype(float)
array2 = np.array(df3.select('col2').collect()).T.astype(float)
cov = np.cov( array1 - array2,
              array1   array2)
what_i_need = cov[0][1] / cov[1][1]

that's all i wanna do.i mean, i wanna my data being like this:

 --- --------------------- 
| id|cov[0][1] / cov[1][1]|
 --- --------------------- 
|  b|                x.xxx|
|  a|                x.xxx|
 --- --------------------- 

i really don't know how to accomplish this. whatever i tried, i always got the errors: 'float' object has no attribute 'shape' unexpected type:<type 'type'> oh, one more thing, the 'col1', 'col2' is actually 'decimal' type.

CodePudding user response:

i figured it out. check out this

dfrdds = df2.rdd.map(lambda df2: ({df2.id : [df2.array1, df2.array2]}))

def beta(rdd):
    id = list(rdd.keys())[0]
    values = rdd.get(id)
    array1 = np.array(values[0])
    array2 = np.array(values[1])
    cov = np.cov(array1 - array2,
                 array1   array2)
    beta = cov[0][1] / cov[1][1]
    return id, float(beta)

df4 = dfrdds.map(beta).toDF(['id', 'beta'])
df4.show()
 --- -------------------- 
| id|                beta|
 --- -------------------- 
|  b|-2.01858731750028...|
|  a|-1.00929365875014...|
 --- -------------------- 

use rdd and udf, that's all. and float(beta) is important.without it, there will be an error like Can not infer schema for type: <class 'numpy.float64'>

  • Related