I'm trying to implement a element-wise multiplication of two ml.linalg.SparseVector
instances (also called a Hadamard product).
A SparseVector represents a vector, but rather than having space taken up by all the "0" values, they are omitted. The vector is represented as two lists of Indices and Values.
For example: SparseVector(indices: [0, 100, 100000], values: [0.25, 1, 0.8])
concisely represents an array of 100,000 elements, where only 3 values are non-zero.
I now need an element-wise multiplication of two of these, and there seems to be no built-in. Conceptually, it should be simple - any indices they don't have in common are dropped, and for the indices in common, the numbers are multiplied together.
For example: SparseVector(indices: [0, 500, 100000], values: [10, 1, 10])
when multiplied with the above should return: SparseVector(indices: [0, 100000], values: [2.5, 8])
Sadly, I've found no built-in for this. I have an approach for doing this in a single pass, but it isn't very scala-y, it has to build up the list in a loop as it discovers which indices are in common, and then grab the corresponding values for each index (which have the same cardinal position, but in a second array).
import org.apache.spark.ml.linalg._
import org.apache.spark.sql.functions.udf
import scala.collection.mutable.ListBuffer
// Return a new SparseVector whose values are the element-wise product (Hadamard product)
val multSparseVectors = udf((v1: SparseVector, v2: SparseVector) => {
// val commonIndexes = v1.indices.intersect(v2.indices); // Missing scale factors are assumed to have a value of 0, so only common elements remain
// TODO: No clear way to map common indices to the values that go with those indices. E.g. no "valueForIndex" method
// new SparseVector(v1.size, commonIndexes, commonIndexes.map(i => v1.valueForIndex(i) * v2.valueForIndex(i)).toArray);
val indices = ListBuffer[Int](); // TODO: Some way to do this without mutable lists?
val values = ListBuffer[Double]();
var v1Pos = 0; // Current index of SparseVector v1 (we will be making a single pass)
var v2pos = 0; // Current index of SparseVector v2 (we will be making a single pass)
while(v1Pos < v1.indices.length && v2pos < v2.indices.length) {
while(v1.indices(v1Pos) < v2.indices(v2pos))
v2pos = 1; // Advance our position in SparseVector 2 until we've matched or passed the current SparseVector 1 index
if(v2pos > v2.indices.length && v1.indices(v1Pos) == v2.indices(v2pos)) {
indices = v1.indices(v1Pos);
values = v1.values(v1Pos) * v2.values(v2pos);
}
v1Pos = 1;
}
new SparseVector(v1.size, indices.toArray, values.toArray);
})
spark.udf.register("multSparseVectors", multSparseVectors)
Can anyone think of a way that I can do this using a map or similar? My main goal is I want to avoid having to make multiple O(N) passes over the second vector to "lookup" the position of a value in the indices
list so that I can grab the corresponding values
entry, because this would take O(K N*2) time when I know there's an O(K N) solution possible.
CodePudding user response:
I've come up with a solution by boiling this problem into a more general one:
Given an answer to the above question (where to the two arrays v1.indices
and v2.indices
intersect), we can trivially use those indices to extract back the new SparseVector indices, and the values from each vector to be multiplied together.
The solution is given below:
%scala
import scala.annotation.tailrec
import org.apache.spark.ml.linalg._
import org.apache.spark.sql.functions.udf
// This fanciness from https://stackoverflow.com/a/71928709/529618 finds the indices at which two lists intersect
@tailrec
def indicesOfIntersection(left: List[Int], right: List[Int], lidx: Int = 0, ridx: Int = 0, result: List[(Int, Int)] = Nil): List[(Int, Int)] = (left, right) match {
case (Nil, _) | (_, Nil) => result.reverse
case (l::tail, r::_) if l < r => indicesOfIntersection(tail, right, lidx 1, ridx, result)
case (l::_, r::tail) if l > r => indicesOfIntersection(left, tail, lidx, ridx 1, result)
case (l::ltail, r::rtail) => indicesOfIntersection(ltail, rtail, lidx 1, ridx 1, (lidx, ridx) :: result)
}
// Return a new SparseVector whose values are the element-wise product (Hadamard product)
val multSparseVectors = udf((v1: SparseVector, v2: SparseVector) => {
val intersection = indicesOfIntersection(v1.indices.toList, v2.indices.toList);
new SparseVector(v1.size,
intersection.map{case (x1,_) => v1.indices(x1)}.toArray,
intersection.map{case (x1,x2) => v1.values(x1) * v2.values(x2)}.toArray);
})
spark.udf.register("multSparseVectors", multSparseVectors)