Merging CSV files with schema drift in Databricks

I was recently presented with a challenge by a customer. They had data stored in CSV files, and as part of a nightly process each new file was appended to a master file that could be used in testing. This worked fine until someone added a new column to the incoming data and their Databricks routine failed, because the files no longer had a consistent schema.

We needed a dynamic solution so that their routine could handle different schemas in the files without hardcoding column names.

Consider the following two datasets:

Date        Region  Location    Tide  TimeUTC  Height
2020-01-27  South   Portsmouth  High  0036     4.55
2020-01-27  South   Portsmouth  Low   0603     1.02
2020-01-27  South   Portsmouth  High  1243     4.54
2020-01-27  South   Portsmouth  Low   1821     0.85
2020-01-28  South   Portsmouth  High  0114     4.52
2020-01-28  South   Portsmouth  Low   0638     1.08
2020-01-28  South   Portsmouth  High  1319     4.48
2020-01-28  South   Portsmouth  Low   1855     0.94

Dataset 1: Tidal Data, 27/28 Jan 2020
Date        ReadingId  Location    Tide  TimeUTC  Height  TideNumber
2020-01-29  13e5       Portsmouth  High  0151     4.47    1
2020-01-29  5k32       Portsmouth  Low   0712     1.19    2
2020-01-29  7jsn       Portsmouth  High  1354     4.38    1
2020-01-29  45m3       Portsmouth  Low   1927     1.08    2

Dataset 2: Tidal Data, 29 Jan 2020

If you try to union these two datasets in Databricks you will get an error message:

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 6 columns and the second table has 7 columns
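
For reference, this is the kind of call that produces it, assuming the two files have already been loaded into data frames named dfOriginal and dfNew (we create those properly in the next step):

# This plain .union call raises the AnalysisException above, because the column counts differ
dfOriginal.union(dfNew)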

So, how can we programmatically create a dataset when we don’t know the names of the columns in all of the files?

First, let's create the data frames with the data from the two files:

dfOriginal = spark.read.format('csv').options(header='true', inferSchema='true').load('/mnt/testdata/TestFile1.csv')

dfNew = spark.read.format('csv').options(header='true', inferSchema='true').load('/mnt/testdata/TestFile2.csv')
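
At this point it can help to confirm the schema drift. This is just a quick check I'd add for illustration, not part of the customer's routine; it compares the column lists of the two data frames:

# Columns present in the new file but not in the original, and vice versa
print(set(dfNew.columns) - set(dfOriginal.columns))   # e.g. {'ReadingId', 'TideNumber'}
print(set(dfOriginal.columns) - set(dfNew.columns))   # e.g. {'Region'}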

The next thing we need to do is import the lit function from pyspark.sql.functions. This allows us to create a column with a literal value.

from pyspark.sql.functions import lit

Now let's iterate over the columns in the new file using df.columns, and for each column that does not exist in the original file, add it to the original data frame with its value set to null.

#Add new columns from the New File that were not in the Original File
for column in [column for column in dfNew.columns if column not in dfOriginal.columns]:
    dfOriginal = dfOriginal.withColumn(column, lit(None))

Then let's repeat that in the other direction, to cover any columns that existed originally but have since been removed.

#Add Columns from the Original File into the New File that are missing 
for column in [column for column in dfOriginal.columns if column not in dfNew.columns]:
    dfNew = dfNew.withColumn(column, lit(None))
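
If you have more than two files to reconcile, the same two loops can be wrapped in a small helper. This is just a sketch of that idea; the function name harmonise_schemas is my own, not part of the original routine:

from pyspark.sql.functions import lit

def harmonise_schemas(df1, df2):
    # Add any columns that exist in one data frame but not the other, filled with nulls
    for column in [c for c in df2.columns if c not in df1.columns]:
        df1 = df1.withColumn(column, lit(None))
    for column in [c for c in df1.columns if c not in df2.columns]:
        df2 = df2.withColumn(column, lit(None))
    return df1, df2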

And now we can union the two data frames. Before we get carried away, though: if we use the .union function we'll end up with mismatched column data, because it unions on column ordinal. We can make sure the right data ends up in the right columns by using unionByName instead.
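
A minimal sketch of that final step (the result variable name dfCombined is my own):

dfCombined = dfOriginal.unionByName(dfNew)
display(dfCombined)

If you followed along, you should now have a dataset that looks like this: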

Date        Region  Location    Tide  TimeUTC  Height  ReadingId  TideNumber
2020-01-27  South   Portsmouth  High  0036     4.55    null       null
2020-01-27  South   Portsmouth  Low   0603     1.02    null       null
2020-01-27  South   Portsmouth  High  1243     4.54    null       null
2020-01-27  South   Portsmouth  Low   1821     0.85    null       null
2020-01-28  South   Portsmouth  High  0114     4.52    null       null
2020-01-28  South   Portsmouth  Low   0638     1.08    null       null
2020-01-28  South   Portsmouth  High  1319     4.48    null       null
2020-01-28  South   Portsmouth  Low   1855     0.94    null       null
2020-01-29  null    Portsmouth  High  0151     4.47    13e5       1
2020-01-29  null    Portsmouth  Low   0712     1.19    5k32       2
2020-01-29  null    Portsmouth  High  1354     4.38    7jsn       1
2020-01-29  null    Portsmouth  Low   1927     1.08    45m3       2
