I was recently presented with a challenge by a customer. They had data stored in CSV files, and as part of a nightly process each new file was appended to a master file used in testing. This worked fine until someone added a new column to the incoming data and their Databricks routine failed because the files no longer had a consistent schema.
We needed a dynamic solution so that the routine could handle different schemas in the files without hardcoding column names.
Consider the following two datasets:
| Date | Region | Location | Tide | TimeUTC | Height |
|------|--------|----------|------|---------|--------|
| 2020-01-27 | South | Portsmouth | High | 0036 | 4.55 |
| 2020-01-27 | South | Portsmouth | Low | 0603 | 1.02 |
| 2020-01-27 | South | Portsmouth | High | 1243 | 4.54 |
| 2020-01-27 | South | Portsmouth | Low | 1821 | 0.85 |
| 2020-01-28 | South | Portsmouth | High | 0114 | 4.52 |
| 2020-01-28 | South | Portsmouth | Low | 0638 | 1.08 |
| 2020-01-28 | South | Portsmouth | High | 1319 | 4.48 |
| 2020-01-28 | South | Portsmouth | Low | 1855 | 0.94 |
| Date | ReadingId | Location | Tide | TimeUTC | Height | TideNumber |
|------|-----------|----------|------|---------|--------|------------|
| 2020-01-29 | 13e5 | Portsmouth | High | 0151 | 4.47 | 1 |
| 2020-01-29 | 5k32 | Portsmouth | Low | 0712 | 1.19 | 2 |
| 2020-01-29 | 7jsn | Portsmouth | High | 1354 | 4.38 | 1 |
| 2020-01-29 | 45m3 | Portsmouth | Low | 1927 | 1.08 | 2 |
If you try to union these two datasets in Databricks, you will get an error message:
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 6 columns and the second table has 7 columns
So, how can we programmatically create a dataset when we don’t know the names of the columns in all of the files?
First, let's create the data frames with the data from the two files:
dfOriginal = spark.read.format('csv').options(header='true', inferSchema='true').load('/mnt/testdata/TestFile1.csv')
dfNew = spark.read.format('csv').options(header='true', inferSchema='true').load('/mnt/testdata/TestFile2.csv')
Next, we need to import the lit function from pyspark.sql.functions, which allows us to create a column with a literal value.
from pyspark.sql.functions import lit
Now let's iterate over the columns in the new file using df.columns, and for each column that does not exist in the original file, add it to the original data frame with a null value.
#Add new columns from the New File that were not in the Original File
for column in [column for column in dfNew.columns if column not in dfOriginal.columns]:
    dfOriginal = dfOriginal.withColumn(column, lit(None))
Then let's repeat that to include any columns that existed originally but might have been removed.
#Add Columns from the Original File into the New File that are missing
for column in [column for column in dfOriginal.columns if column not in dfNew.columns]:
    dfNew = dfNew.withColumn(column, lit(None))
And now we can union the two data frames. But before we get carried away: if we use the .union function we'll end up with mismatched column data, because it unions by column ordinal. We can ensure that all the right data stays in the right columns by using unionByName, which matches columns by name instead. If you followed along, you should now have a dataset that looks like this:
| Date | Region | Location | Tide | TimeUTC | Height | ReadingId | TideNumber |
|------|--------|----------|------|---------|--------|-----------|------------|
| 2020-01-27 | South | Portsmouth | High | 0036 | 4.55 | null | null |
| 2020-01-27 | South | Portsmouth | Low | 0603 | 1.02 | null | null |
| 2020-01-27 | South | Portsmouth | High | 1243 | 4.54 | null | null |
| 2020-01-27 | South | Portsmouth | Low | 1821 | 0.85 | null | null |
| 2020-01-28 | South | Portsmouth | High | 0114 | 4.52 | null | null |
| 2020-01-28 | South | Portsmouth | Low | 0638 | 1.08 | null | null |
| 2020-01-28 | South | Portsmouth | High | 1319 | 4.48 | null | null |
| 2020-01-28 | South | Portsmouth | Low | 1855 | 0.94 | null | null |
| 2020-01-29 | null | Portsmouth | High | 0151 | 4.47 | 13e5 | 1 |
| 2020-01-29 | null | Portsmouth | Low | 0712 | 1.19 | 5k32 | 2 |
| 2020-01-29 | null | Portsmouth | High | 1354 | 4.38 | 7jsn | 1 |
| 2020-01-29 | null | Portsmouth | Low | 1927 | 1.08 | 45m3 | 2 |