I have the following Scala code that I need to convert to Python (PySpark):
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{Column, DataFrame, Dataset}

object SearchTermReader {

  def read(
      searchTermsInputTable: DataFrame,
      brand: String,
      posa: String,
      startDate: String,
      endDate: String
  ): Dataset[SearchTerm] = {
    import searchTermsInputTable.sparkSession.implicits._

    val conditionsNoEndDate = getConditions(brand, posa, startDate)
    val searchTermsNoEndDate = searchTermsInputTable
      .where(conditionsNoEndDate)
      .cache()
    searchTermsNoEndDate.count()

    val columnNames = SparkExtensions.getColumns[SearchTerm]
    searchTermsNoEndDate
      .filter(col("report_date").leq(lit(endDate)))
      .select(columnNames: _*)
      .as[SearchTerm]
  }

  def getConditions(
      brand: String,
      posa: String,
      startDate: String
  ): Column = {
    val filterByBrandCondition: Column = {
      if (brand.equals("")) {
        lit(true)
      } else {
        col("brand") === brand
      }
    }
    val filterByPosaCondition: Column = {
      if (posa.equals("")) {
        lit(true)
      } else {
        col("account_name").rlike(getAccountPattern(posa))
      }
    }
    filterByBrandCondition &&
      filterByPosaCondition &&
      col("search_engine") === "GOOGLE" &&
      col("impressions") > 0 &&
      col("report_date").geq(lit(startDate))
  }

  def getAccountPattern(countryCodes: String): String = {
    countryCodes.split(",").map(cc => s":G:$cc:").mkString("|")
  }
}
There seem to be two issues with a straight conversion:
- Dataset is used, which is not supported by PySpark
- === is used on Column, which is also not supported
How can I overcome this and convert it to Python?
CodePudding user response:
If you are referring to a column of the DataFrame, then you can use it like below:
df.filter((col("brand") == "BRAND") & (...))
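For the condition-building part of your getConditions method, the same pattern carries over. Here is a rough sketch (assuming the column names from your Scala code, with lit(True) standing in for the "no filter" case, == replacing ===, and & replacing &&):

from pyspark.sql.functions import col, lit

def get_account_pattern(country_codes):
    # builds a regex like ":G:US:|:G:CA:", mirroring mkString("|") in the Scala version
    return "|".join(f":G:{cc}:" for cc in country_codes.split(","))

def get_conditions(brand, posa, start_date):
    # lit(True) is the always-true condition used when a filter argument is empty
    brand_condition = lit(True) if brand == "" else (col("brand") == brand)
    posa_condition = lit(True) if posa == "" else col("account_name").rlike(get_account_pattern(posa))
    return (
        brand_condition
        & posa_condition
        & (col("search_engine") == "GOOGLE")
        & (col("impressions") > 0)
        & (col("report_date") >= lit(start_date))
    )

Note that each comparison must be wrapped in parentheses when combined with &, because & binds more tightly than == in Python.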
CodePudding user response:
PySpark doesn't support === the way Scala does.

In Scala, == delegates to the equals method, so it compares values rather than building a Spark expression. The meaning of === depends on the context/object; for Spark Columns, === calls the equalTo method and returns a Column expression. In PySpark, you use == on a Column instead (or = inside a SQL expression string).

Having said that, in PySpark you can use any of the following to get the same result as your Scala code:
df.filter("Brand = 'BRAND'")
Or,
df.filter(df.Brand == 'BRAND')
Or,
df.filter(df["Brand"] == 'BRAND')
Or,
from pyspark.sql.functions import *
df.filter(col("Brand") == 'BRAND')
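As for the Dataset issue: PySpark has no typed Dataset API, so the usual workaround is to keep everything as a DataFrame and select the desired columns explicitly. A rough sketch of the read method, building on the get_conditions sketch above and using a hand-written column list in place of SparkExtensions.getColumns[SearchTerm] (the names below are placeholders for the fields of your SearchTerm class):

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit

# Placeholder for SparkExtensions.getColumns[SearchTerm]; list the actual SearchTerm fields here.
SEARCH_TERM_COLUMNS = ["brand", "account_name", "search_engine", "impressions", "report_date"]

def read(search_terms_input_table: DataFrame, brand: str, posa: str,
         start_date: str, end_date: str) -> DataFrame:
    conditions_no_end_date = get_conditions(brand, posa, start_date)
    search_terms_no_end_date = (
        search_terms_input_table
        .where(conditions_no_end_date)
        .cache()
    )
    search_terms_no_end_date.count()  # materialise the cache, as in the Scala code

    # .as[SearchTerm] has no PySpark equivalent, so the result stays an untyped DataFrame
    return (
        search_terms_no_end_date
        .filter(col("report_date") <= lit(end_date))
        .select(*SEARCH_TERM_COLUMNS)
    )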