How can I read multiple csv files and merge them together (they may not have the same columns) using pyspark?

Time:04-13

My HDFS layout has a folder named "mapped_files" that contains several CSV files: "mapped_file_1.csv", "mapped_file_2.csv", ... How can I merge all of these files? The files may not have exactly the same columns. For example, when I read "mapped_file_1.csv" and "mapped_file_2.csv" with pyspark, they look like this:

###mapped_file_1.csv

+----------+---------+---------+--------+-----+-----+-----------+----------+------------------+---------------------+-------------------+---------------+--------------------+
|chromosome|    start|      end|assembly|  ref|  alt|risk_allele|     genes|         phenotype|clinical_significance|polyphen_prediction|sift_prediction|                hgvs|
+----------+---------+---------+--------+-----+-----+-----------+----------+------------------+---------------------+-------------------+---------------+--------------------+
|         9| 96369762| 96369762|    null|  C/T|  C/T|          T|intergenic|Migraine with aura|                 null|               null|           null|          rs59270819|
|        10| 29075768| 29075768|    null|G/A/C|G/A/C|          A|intergenic|Migraine with aura|                 null|               null|           null|          rs59495588|
+----------+---------+---------+--------+-----+-----+-----------+----------+------------------+---------------------+-------------------+---------------+--------------------+

###mapped_file_2.csv

+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+--------+---+---+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
|variant_name|variant_id|chromosome|     genes|        variant_type|description|polyphen_prediction|sift_prediction|                hgvs|assembly|assembly.date|   start|     end|ref|alt|risk_allele|           phenotype|clinical_actionability|classification|clinical_significance|          method|  assertion_criteria|     level_certainty|      date|              author|  origin|title|year|authors|pmid|is_gwas|   name|                 url|version|databanks.variant_id|clinvar_accession|
+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+--------+---+---+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
|        null|      null|         1|['TARDBP']|single nucleotide...|       null|               null|           null|['Q13148:p.Ala90V...|  GRCh38|         null|11016874|11016874|  C|  T|          T|Amyotrophic later...|                  null|          null| Uncertain signifi...| literature only|                null|no assertion crit...|2019-07-02|         GeneReviews|germline| null|null|   null|null|   null|ClinVar|https://www.ncbi....|   null|                null|     VCV000021481|
|        null|      null|         1|['TARDBP']|single nucleotide...|       null|               null|           null|['Q13148:p.Ala90V...|  GRCh38|         null|11016874|11016874|  C|  T|          T|Amyotrophic later...|                  null|          null| Uncertain signifi...|clinical testing|Invitae Variant C...|criteria provided...|2019-08-15|             Invitae|germline| null|null|   null|null|   null|ClinVar|https://www.ncbi....|   null|                null|     VCV000021481|
|        null|      null|         1|['TARDBP']|single nucleotide...|       null|               null|           null|['Q13148:p.Ala90V...|  GRCh38|         null|11016874|11016874|  C|  T|          T|Amyotrophic later...|                  null|          null| Uncertain signifi...| literature only|                null|no assertion crit...|2019-07-02|         GeneReviews|germline| null|null|   null|null|   null|ClinVar|https://www.ncbi....|   null|                null|     VCV000021481|
+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+--------+---+---+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+

Comparing the two dataframes/files above, we can see that some columns exist in only one of them. I tried this:

from pyspark.sql import SparkSession
from pyspark.sql import Row
from functools import reduce
import pyspark.sql.functions as F

warehouse_location ='hdfs://hdfs-nn:9000'

spark = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("csv") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .getOrCreate()

all_data = spark.read.options(header='True', delimiter=';').csv("hdfs://hdfs-nn:9000/mapped_files/*")

all_data.show()

+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+----------+----+----+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
|variant_name|variant_id|chromosome|     genes|        variant_type|description|polyphen_prediction|sift_prediction|                hgvs|assembly|assembly.date|   start|       end| ref| alt|risk_allele|           phenotype|clinical_actionability|classification|clinical_significance|          method|  assertion_criteria|     level_certainty|      date|              author|  origin|title|year|authors|pmid|is_gwas|   name|                 url|version|databanks.variant_id|clinvar_accession|
+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+----------+----+----+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
|        null|      null|         1|['TARDBP']|single nucleotide...|       null|               null|           null|['Q13148:p.Ala90V...|  GRCh38|         null|11016874|  11016874|   C|   T|          T|Amyotrophic later...|                  null|          null| Uncertain signifi...| literature only|                null|no assertion crit...|2019-07-02|         GeneReviews|germline| null|null|   null|null|   null|ClinVar|https://www.ncbi....|   null|                null|     VCV000021481|
|        null|      null|         1|['TARDBP']|single nucleotide...|       null|               null|           null|['Q13148:p.Ala90V...|  GRCh38|         null|11016874|  11016874|   C|   T|          T|Amyotrophic later...|                  null|          null| Uncertain signifi...|clinical testing|Invitae Variant C...|criteria provided...|2019-08-15|             Invitae|germline| null|null|   null|null|   null|ClinVar|https://www.ncbi....|   null|                null|     VCV000021481|
|        null|      null|         1|['TARDBP']|single nucleotide...|       null|               null|           null|['Q13148:p.Ala90V...|  GRCh38|         null|11016874|  11016874|   C|   T|          T|Amyotrophic later...|                  null|          null| Uncertain signifi...| literature only|                null|no assertion crit...|2019-07-02|         GeneReviews|germline| null|null|   null|null|   null|ClinVar|https://www.ncbi....|   null|                null|     VCV000021481|
|           9|  96369762|  96369762|      null|                 C/T|        C/T|                  T|     intergenic|  Migraine with aura|    null|         null|    null|rs59270819|null|null|       null|                null|                  null|          null|                 null|            null|                null|                null|      null|                null|    null| null|null|   null|null|   null|   null|                null|   null|                null|             null|
|          10|  29075768|  29075768|      null|               G/A/C|      G/A/C|                  A|     intergenic|  Migraine with aura|    null|         null|    null|rs59495588|null|null|       null|                null|                  null|          null|                 null|            null|                null|                null|      null|                null|    null| null|null|   null|null|   null|   null|                null|   null|                null|             null|
+------------+----------+----------+----------+--------------------+-----------+-------------------+---------------+--------------------+--------+-------------+--------+----------+----+----+-----------+--------------------+----------------------+--------------+---------------------+----------------+--------------------+--------------------+----------+--------------------+--------+-----+----+-------+----+-------+-------+--------------------+-------+--------------------+-----------------+
only showing top 20 rows

With the code above, the values of the shared columns do not end up in the right place (in the last two rows, the values are not in the correct columns).

So, my question is: How can I read multiple csv files and merge them together (they may not have the same columns) using pyspark?

CodePudding user response:

The easy way is to add the missing columns to both dataframes and then combine them with a union function. I prefer unionByName, so I'll use it in my example:

df1 = spark.read.options(header='True', delimiter=';').csv("mapped_file_1.csv")
df2 = spark.read.options(header='True', delimiter=';').csv("mapped_file_2.csv")

united_df = df1.unionByName(df2, allowMissingColumns=True)

Setting allowMissingColumns=True fills any column that is missing from one of the dataframes with NULLs. Note that this parameter requires Spark 3.1 or later.
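To make the by-name behaviour concrete, here is a small plain-Python model of it. This is only an illustration and not Spark code: the function name `union_by_name` and the tiny three-column inputs are made up for the example, and it assumes the result keeps the left dataframe's columns first and appends the right-only columns at the end.

```python
def union_by_name(left_cols, left_rows, right_cols, right_rows):
    """Plain-Python model of unionByName(..., allowMissingColumns=True)."""
    # Result schema: left columns first, then columns found only on the right.
    cols = list(left_cols) + [c for c in right_cols if c not in left_cols]

    def widen(src_cols, row):
        by_name = dict(zip(src_cols, row))
        # Columns absent from this row's source become None (null in Spark).
        return tuple(by_name.get(c) for c in cols)

    rows = [widen(left_cols, r) for r in left_rows]
    rows += [widen(right_cols, r) for r in right_rows]
    return cols, rows


# Hypothetical, cut-down versions of the two files from the question.
cols, rows = union_by_name(
    ["chromosome", "start", "risk_allele"], [("9", "96369762", "T")],
    ["variant_name", "chromosome", "start"], [(None, "1", "11016874")],
)
print(cols)
for row in rows:
    print(row)
```

Each value stays under its own column name regardless of where that column sat in the source file, which is the property the wildcard read in the question was missing.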

If you have more than two files, you can define a function and use reduce to unite all the dataframes:

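def unite_dfs(df1, df2):
    # Match columns by name and fill the missing ones with NULLs
    return df1.unionByName(df2, allowMissingColumns=True)


# df3 ... df6 are read the same way as df1 and df2 above
list_of_dfs = [df1, df2, df3, df4, df5, df6]
united_df = reduce(unite_dfs, list_of_dfs)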
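If you don't want to write out one variable per file, the same idea can be wrapped in a helper that reads each file separately (so every file keeps its own header) and then reduces over the results. This is a rough sketch: `read_and_unite` and its `paths` argument are names I made up, and it assumes you already have the list of file paths and that all files use the same delimiter.

```python
from functools import reduce


def unite_dfs(left, right):
    # Align columns by name; columns missing on one side are filled with nulls.
    return left.unionByName(right, allowMissingColumns=True)


def read_and_unite(spark, paths, delimiter=";"):
    """Read each CSV on its own, then merge all dataframes by column name."""
    if not paths:
        raise ValueError("no input paths given")
    dfs = [
        spark.read.options(header="True", delimiter=delimiter).csv(path)
        for path in paths
    ]
    return reduce(unite_dfs, dfs)
```

With the session from the question, it could be called like this:

    paths = ["hdfs://hdfs-nn:9000/mapped_files/mapped_file_%d.csv" % i for i in (1, 2)]
    all_data = read_and_unite(spark, paths)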

Let me know if this is clear. I haven't included the imports because I only used libraries that were already in your snippet. If anything is unclear, I can edit.
