PySpark DataFrame showing different results when using .select()


Why does .select() show/parse values differently than when I don't use it?

I have this CSV:

CompanyName, CompanyNumber,RegAddress.CareOf,RegAddress.POBox,RegAddress.AddressLine1, RegAddress.AddressLine2,RegAddress.PostTown,RegAddress.County,RegAddress.Country,RegAddress.PostCode,CompanyCategory,CompanyStatus,CountryOfOrigin,DissolutionDate,IncorporationDate,Accounts.AccountRefDay,Accounts.AccountRefMonth,Accounts.NextDueDate,Accounts.LastMadeUpDate,Accounts.AccountCategory,Returns.NextDueDate,Returns.LastMadeUpDate,Mortgages.NumMortCharges,Mortgages.NumMortOutstanding,Mortgages.NumMortPartSatisfied,Mortgages.NumMortSatisfied,SICCode.SicText_1,SICCode.SicText_2,SICCode.SicText_3,SICCode.SicText_4,LimitedPartnerships.NumGenPartners,LimitedPartnerships.NumLimPartners,URI,PreviousName_1.CONDATE, PreviousName_1.CompanyName, PreviousName_2.CONDATE, PreviousName_2.CompanyName,PreviousName_3.CONDATE, PreviousName_3.CompanyName,PreviousName_4.CONDATE, PreviousName_4.CompanyName,PreviousName_5.CONDATE, PreviousName_5.CompanyName,PreviousName_6.CONDATE, PreviousName_6.CompanyName,PreviousName_7.CONDATE, PreviousName_7.CompanyName,PreviousName_8.CONDATE, PreviousName_8.CompanyName,PreviousName_9.CONDATE, PreviousName_9.CompanyName,PreviousName_10.CONDATE, PreviousName_10.CompanyName,ConfStmtNextDueDate, ConfStmtLastMadeUpDate
"ATS CAR RENTALS LIMITED","10795807","","",", 1ST FLOOR ,WESTHILL HOUSE 2B DEVONSHIRE ROAD","ACCOUNTING FREEDOM","BEXLEYHEATH","","ENGLAND","DA6 8DS","Private Limited Company","Active","United Kingdom","","31/05/2017","31","5","28/02/2023","31/05/2021","TOTAL EXEMPTION FULL","28/06/2018","","0","0","0","0","49390 - Other passenger land transport","","","","0","0","http://business.data.gov.uk/id/company/10795807","","","","","","","","","","","","","","","","","","","","","12/06/2023","29/05/2022"
"ATS CARE LIMITED","10393661","","","UNIT 5 CO-OP BUILDINGS HIGH STREET","ABERSYCHAN","PONTYPOOL","TORFAEN","WALES","NP4 7AE","Private Limited Company","Active","United Kingdom","","26/09/2016","30","9","30/06/2023","30/09/2021","UNAUDITED ABRIDGED","24/10/2017","","0","0","0","0","87900 - Other residential care activities n.e.c.","","","","0","0","http://business.data.gov.uk/id/company/10393661","17/05/2018","ATS SUPPORT LIMITED","22/12/2017","ATS CARE LIMITED","","","","","","","","","","","","","","","","","09/10/2022","25/09/2021"

I'm reading the CSV like so:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
_file = "/path/dir/BasicCompanyDataAsOneFile-2022-08-01.csv"
df = spark.read.csv(_file, header=True, quote='"', escape="\"")

Focusing on the CompanyCategory column, we should see Private Limited Company for both lines. But this is what I get instead when using select():

df.select("CompanyCategory").show(truncate=False)

+-----------------------+
|CompanyCategory        |
+-----------------------+
|DA6 8DS                |
|Private Limited Company|
+-----------------------+
df.select("CompanyCategory").collect()

[Row(CompanyCategory='DA6 8DS'),
 Row(CompanyCategory='Private Limited Company')]

vs when not using select():

from pprint import pprint
for row in df.collect():
    pprint(row.asDict())
{' CompanyNumber': '10795807',
 ...
 'CompanyCategory': 'Private Limited Company',
 'CompanyName': 'ATS CAR RENTALS LIMITED',
 ...}
{' CompanyNumber': '10393661',
 ...
 'CompanyCategory': 'Private Limited Company',
 'CompanyName': 'ATS CARE LIMITED',
 ...}

Using asDict() for readability.

SQL does the same thing:

df.createOrReplaceTempView("companies")
spark.sql('select CompanyCategory from companies').show()

+--------------------+
|     CompanyCategory|
+--------------------+
|Private Limited C...|
|             DA6 8DS|
+--------------------+

As you can see, when not using select() the CompanyCategory values show correctly. Why is this happening, and what can I do to avoid it?

Context: I'm trying to create dimension tables, which is why I'm selecting a single column. The next phase is to drop duplicates, filter, sort, etc.

Edit:

Here are two example values in the actual CSV that are throwing things off:

  • CompanyName of """ BORA "" 2 LTD"
  • 1st line address of ", 1ST FLOOR ,WESTHILL HOUSE 2B DEVONSHIRE ROAD"

Note:

  • These values are from two separate, distinct lines in the CSV.
  • These values are copied and pasted from the CSV opened in a text editor (like Notepad or VSCode).

Tried and failed:

  1. df = spark.read.csv(_file, header=True) - picks up the wrong column entirely.
  2. df = spark.read.csv(_file, header=True, escape='\"') - exact same behaviour as described in the original question above, so same results.
  3. df = spark.read.csv(_file, header=True, escape='""') - since the CSV escapes quotes using two double quotes, I guessed that using two double quotes as the escape param would do the trick, but I get the following error:
Py4JJavaError: An error occurred while calling o276.csv.
: java.lang.RuntimeException: escape cannot be more than one character

CodePudding user response:

When reading the CSV, the parameters quote and escape are set to the same value ('"' == "\"" returns True in Python).

I would guess that configuring both parameters in this way somehow disturbs the parser Spark uses to separate the individual fields. After removing the escape parameter, you can process the remaining " with regexp_replace:

from pyspark.sql import functions as F

df = spark.read.csv(<filename>, header=True, quote='"')
# strip the leading quote, then the trailing quote, then unescape doubled quotes
cols = [F.regexp_replace(F.regexp_replace(
    F.regexp_replace("`" + col + "`", '^"', ''),
    '"$', ''), '""', '"').alias(col) for col in df.columns]
df.select(cols).show(truncate=False)

Probably there is a smart regexp that can combine all three replace operations into one...
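Two of the three can at least be folded into one pass with an alternation: '^"|"$' strips a leading or trailing quote in a single call. The doubled-quote unescaping still needs its own call, since its replacement string differs. A sketch of the same idea as above, untested beyond the sample data:

from pyspark.sql import functions as F

# '^"|"$' removes a leading or trailing quote in one pass;
# the outer call then unescapes the doubled quotes.
cols = [F.regexp_replace(
    F.regexp_replace("`" + col + "`", '^"|"$', ''),
    '""', '"').alias(col) for col in df.columns]
df.select(cols).show(truncate=False)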

CodePudding user response:

This is an issue with reading a single column from a CSV file vs. reading all the columns:

df = spark.read.csv('companies-house.csv', header=True, quote='"', escape="\"")
df.show()  # correct output (all columns loaded)
df.toPandas()  # same as pd.read_csv()

df.select('CompanyCategory').show()  # wrong output (trying to load a single column)

df.cache()  # all columns will be loaded and used in any subsequent call
df.select('CompanyCategory').show()  # correct output

The first select() performs a different (optimized) read than the second, so one possible workaround would be to cache() the data immediately. This will, however, load all the columns, not just one (although pandas and COPY do the same).
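One way to see the difference between the two reads (generic Spark introspection, not something from the original post) is to compare the query plans:

# Before caching, the FileScan node's ReadSchema lists only the
# selected column, i.e. the scan is column-pruned.
df.select('CompanyCategory').explain()

# After caching, the plan reads from InMemoryTableScan instead,
# reusing the full-width parse of all columns.
df.cache()
df.select('CompanyCategory').explain()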

The problematic part of the CSV is the RegAddress.POBox column, where an empty value is saved as ", instead of "",. You can check this by incrementally loading more columns:

df.unpersist()  # undo cache() operation (for testing purposes only)
df.select(*[f"`{c}`" for c in df.columns[:3]], 'CompanyCategory').show()  # wrong
df.select(*[f"`{c}`" for c in df.columns[:4]], 'CompanyCategory').show()  # OK
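Putting the cache() workaround together with the dimension-table goal from the question, a minimal sketch (dropDuplicates/orderBy stand in for the "drop duplicates, filter, sort" steps the asker mentions; they are not part of the fix itself):

df = spark.read.csv('companies-house.csv', header=True, quote='"', escape='"')
df.cache()  # later single-column selects now reuse the full-width parse

dim_company_category = (
    df.select('CompanyCategory')
      .dropDuplicates()
      .orderBy('CompanyCategory'))
dim_company_category.show(truncate=False)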