Validate CSV files before ingestion in Microsoft Fabric Data Factory Pipelines
Published Jun 05 2024 02:31 PM
Microsoft

A very common task for Microsoft Fabric, Azure Data Factory and Synapse Analytics pipelines is to receive unstructured files, land them in an Azure Data Lake (ADLS Gen2) and load them into structured tables. A very common issue follows when “SOMETHING HAS CHANGED” and the file no longer matches the defined table format. If the pipeline does not handle such issues properly, the data workloads fail and users start asking "WHERE'S MY DATA???" You then need to contact the owner of the file, have them fix the issues, and rerun the pipeline once they are fixed. Along with unhappy users, rerunning failed pipelines adds cost.

Validating these files before they are processed allows your pipelines to continue ingesting the files that do have the correct format. For pipelines that do fail, your code or process can pinpoint what caused the error, leading to faster resolution of the issue. In this blog, we'll walk through a Microsoft Fabric Data Factory pipeline that validates incoming CSV files for common errors before loading them into a Microsoft Fabric Lakehouse delta table.

 

Overview

jehayes_0-1717529080351.jpeg

 

The source files in this process are in an Azure Data Lake storage account, which has a shortcut in the Fabric Lakehouse. A data pipeline calls a Spark notebook to check each file for new or missing columns, data that is invalid for the expected data type, and duplicate key values. If the file has no errors, the pipeline loads the CSV data into a parquet file and then calls another Spark notebook to load the parquet file into a delta table in the Lakehouse. If there are errors in the file, the pipeline sends a notification email instead.

 

Source files and metadata files

jehayes_0-1717533337972.png

In this solution, files land in an ADLS Gen2 container folder called scenario1-validatecsv, which has a shortcut to it in the Fabric Lakehouse. The files folder contains the files to process; the metadata folder contains a file describing the format each CSV file type should conform to.

jehayes_1-1717533575439.png

 

This solution loads a table called customer, which has the columns number, name, city and state. In the format definition file, customer_meta, there is one row for each customer table column, providing the column name, the column data type, and whether or not the column is a key field. This metadata file is later used in a Spark notebook to validate that the incoming file conforms to this format.
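As an illustration, the metadata file for the customer format might look like the sketch below. The header names (formatname, columname, datatype, iskeyfield) are the ones the validation notebook reads later in this post; the data type values are assumptions, and the exact dtype name pandas reports for text columns can depend on the pandas version.

```python
import io
import pandas as pd

# Hypothetical contents of customer_meta.csv. The header names match the
# columns the validation notebook reads; the datatype values are assumed.
customer_meta_csv = """formatname,columname,datatype,iskeyfield
customer,number,int64,1
customer,name,object,0
customer,city,object,0
customer,state,object,0
"""

meta = pd.read_csv(io.StringIO(customer_meta_csv))

# Keep only the rows for the format being validated
meta = meta.loc[meta["formatname"] == "customer"]

expected_columns = meta["columname"].tolist()
keyfields = meta.loc[meta["iskeyfield"] == 1, "columname"].tolist()

print(expected_columns)
print(keyfields)
```

Keeping one row per column means a new file format can be supported by adding rows to the metadata file rather than changing notebook code.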

 

Orchestrator Pipeline

jehayes_2-1717533682153.png

The orchestrator pipeline is very simple: since I am running my pipeline as a scheduled batch, it loops through the files folder and invokes another pipeline for each file. Note the parameterization of the lakehouse path, the source folders, the destination folder and the file format. This allows the same process to be run for any lakehouse and for any file format/table to load.

 

For Each activity

jehayes_3-1717533796550.png

The For Each activity invokes the child pipeline once per file, passing in the parameter values from the orchestrator pipeline plus the name of the current file being processed and the metadata file name, which is the file format name with ‘_meta’ appended to it.
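The parameter hand-off can be sketched in Python as below. This is only an illustration of the logic, not the pipeline's expression syntax: the function name, the parameter dictionary keys (borrowed from the notebook's variable names), the '.csv' extension on the metadata file, and the file name customer_bad.csv are all assumptions.

```python
def child_pipeline_parameters(orchestrator_params: dict, current_file: str) -> dict:
    """Build the parameter set handed to the child pipeline for one file."""
    params = dict(orchestrator_params)  # copy so the shared values are not mutated
    params["filename"] = current_file
    # Metadata file name = file format name + '_meta'; the '.csv' extension
    # is an assumption taken from the notebook's sample value.
    params["metadatafilename"] = f"{orchestrator_params['fileformat']}_meta.csv"
    return params


# Values mirror the sample parameters in the validation notebook
orchestrator_params = {
    "lakehousepath": "abfss://xxxxx@msit-onelake.dfs.fabric.microsoft.com/xxxxx",
    "filefolder": "scenario1-validatecsv/landingzone/files",
    "metadatafolder": "scenario1-validatecsv/landingzone/metadata",
    "outputfolder": "scenario1-validatecsv/bronze",
    "fileformat": "customer",
}

# Stand-in for the folder listing the For Each activity iterates over;
# customer_bad.csv is a made-up file name.
files_in_folder = ["customer_good.csv", "customer_bad.csv"]

runs = [child_pipeline_parameters(orchestrator_params, f) for f in files_in_folder]
for run in runs:
    print(run["filename"], "->", run["metadatafilename"])
```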

 

Child pipeline - Validate and load pipeline

jehayes_0-1717533924310.png

The Validate and load pipeline validates the current CSV file and, if the file conforms to the format, writes it to a parquet file and then merges the parquet data into a delta table.

 

1 - Parameters

Parameters passed in from the orchestrator pipeline for the current CSV file to process

 

2 - Set variable activity - Set parquet file name

Removes the .csv extension from the file name to define the parquet file name

jehayes_3-1717534175894.png
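For readers who want to reproduce this step outside the pipeline, a rough Python equivalent is shown below. The helper name parquet_name is hypothetical, and the pipeline itself uses a Set variable expression rather than Python.

```python
from pathlib import PurePosixPath


def parquet_name(csv_filename: str) -> str:
    """Drop a trailing .csv extension (case-insensitive); leave other names unchanged."""
    path = PurePosixPath(csv_filename)
    return path.stem if path.suffix.lower() == ".csv" else csv_filename


print(parquet_name("customer_good.csv"))
```

Removing only the final extension avoids mangling file names that happen to contain ".csv" somewhere in the middle.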

 

3 - Notebook activity - Validate CSV and load file to parquet

Calls the notebook, passing in parameter and variable values

jehayes_4-1717534262831.png

 

Below is the Python code for the notebook. It uses pandas to get the column names and inferred data types from the CSV file, as well as the column names, data types and key field names from the metadata file. It checks whether the column names match, whether the data types match, whether there are keys defined for the file, and finally whether there are any duplicate key values in the incoming file. If there are duplicate key values, it writes them to a file. If the names and data types match and there are no duplicate key values, it uses Spark to write the file to parquet and passes back the key field names from the metadata file; otherwise, it returns the appropriate error message.

 

 

 

 

 

# Files/landingzone/files parameters
lakehousepath = 'abfss://xxxxx@msit-onelake.dfs.fabric.microsoft.com/xxxxx'
filename = 'customer_good.csv'
outputfilename = 'customer_good'
metadatafilename = 'customer_meta.csv'
filefolder = 'scenario1-validatecsv/landingzone/files'
metadatafolder = 'scenario1-validatecsv/landingzone/metadata'
outputfolder = 'scenario1-validatecsv/bronze'
fileformat = 'customer'

# Import pandas and pyarrow
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Set path variables
inputfilepath = f'{lakehousepath}/Files/{filefolder}/'
metadatapath = f'{lakehousepath}/Files/{metadatafolder}/'
outputpath =  f'{lakehousepath}/Files/{outputfolder}/'


# Read the text file and the metadata file
print(f'{inputfilepath}{filename}')
data = pd.read_csv(f'{inputfilepath}{filename}')
meta = pd.read_csv(f'{metadatapath}{metadatafilename}')

# only get the column names for the file formattype that was input
meta = meta.loc[meta['formatname'] == fileformat]
print(data.dtypes)
print(list(meta['columname']))

# get any key fields specified
keyfields = meta.loc[meta['iskeyfield'] == 1, 'columname'].tolist()
print(keyfields)

# Check for errors in CSV
haserror = 0
# Check if the column names match
if list(data.columns) != list(meta["columname"]):
    # Issue an error
    result = "Error: Column names do not match."
    haserror = 1
else:
    # Check if the datatypes match
    if list(data.dtypes) != list(meta["datatype"]):
        # Issue an error
        result = "Error: Datatypes do not match."
        haserror = 1
    else:
        # If the file has key fields, check if there are any duplicate keys
        # if there are duplicate keys, also write the duplicate key values to a file
        # (keyfields is a list, so test for a non-empty list rather than comparing to '')
        if keyfields:
            checkdups = data.groupby(keyfields).size().reset_index(name='count')
            print(checkdups)
            if checkdups['count'].max() > 1:
                dups = checkdups[checkdups['count'] > 1]
                print(dups)
                haserror = 1
                dups.to_csv(
                    f'{lakehousepath}/Files/processed/error_duplicate_key_values/duplicaterecords_{filename}',
                    mode='w', index=False)
                result = 'Error: Duplicate key values'

if haserror == 0:
    # Write the data to parquet if no errors
    df = spark.read.csv(f"{inputfilepath}{filename}", header=True, inferSchema=True)
    print(f'File is: {inputfilepath}{filename}')
    display(df)
    df.write.mode("overwrite").format("parquet").save(f"{outputpath}{outputfilename}")
    result = f"Data written to parquet successfully. Key fields are:{keyfields} "

mssparkutils.notebook.exit(str(result))
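To try the three checks without a Lakehouse, the sketch below applies the same logic to small in-memory DataFrames. It is a simplified stand-in, not the notebook itself: the validate and make_frame helpers, the sample rows and the metadata values are all assumptions, and it uses DataFrame.duplicated in place of the notebook's groupby count.

```python
import pandas as pd

TEXT_COLUMNS = ["name", "city", "state"]


def make_frame(rows):
    """Build a customer frame; text columns are pinned to object so the
    example does not depend on the pandas version's default string dtype."""
    df = pd.DataFrame(rows, columns=["number", "name", "city", "state"])
    return df.astype({col: object for col in TEXT_COLUMNS})


def validate(data: pd.DataFrame, meta: pd.DataFrame) -> str:
    """Return 'OK' or the first error found, checking in the notebook's order."""
    if list(data.columns) != meta["columname"].tolist():
        return "Error: Column names do not match."
    if [str(t) for t in data.dtypes] != meta["datatype"].tolist():
        return "Error: Datatypes do not match."
    keyfields = meta.loc[meta["iskeyfield"] == 1, "columname"].tolist()
    if keyfields and data.duplicated(subset=keyfields).any():
        return "Error: Duplicate key values"
    return "OK"


# Assumed metadata rows for the customer format
meta = pd.DataFrame({
    "formatname": ["customer"] * 4,
    "columname": ["number", "name", "city", "state"],
    "datatype": ["int64", "object", "object", "object"],
    "iskeyfield": [1, 0, 0, 0],
})

good = make_frame([(1, "Ann", "Austin", "TX"), (2, "Bob", "Boise", "ID")])
renamed = good.rename(columns={"city": "town"})
bad_type = make_frame([("A1", "Ann", "Austin", "TX"), (2, "Bob", "Boise", "ID")])
duplicate = make_frame([(1, "Ann", "Austin", "TX"), (1, "Bob", "Boise", "ID")])

for label, frame in [("good", good), ("renamed", renamed),
                     ("bad_type", bad_type), ("duplicate", duplicate)]:
    print(label, "->", validate(frame, meta))
```

Because the checks run in a fixed order, a file with several problems reports only the first one found, which matches the nested if/else structure of the notebook.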

 

 

 

 

 

4 - Copy data activity - Move File to processed folder

This Copy Data activity essentially moves the CSV file from the ADLS Gen2 files folder to a processed folder in the Fabric Lakehouse.

jehayes_5-1717534410521.png

 

Destination folder name is derived from the Notebook exit value, which returns success or the error message

jehayes_6-1717534485574.png
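One way to picture this derivation is the Python sketch below, which turns the notebook exit value into a folder-safe name. The exact pipeline expression is not reproduced here, so treat the destination_folder helper and the "success" folder name as assumptions; the error_duplicate_key_values name does match the folder the validation notebook writes duplicate records to.

```python
import re


def destination_folder(exit_value: str) -> str:
    """Map a notebook exit value to a folder name (hypothetical naming scheme)."""
    if not exit_value.startswith("Error"):
        return "success"  # assumed folder name for validated files
    # Lowercase, then collapse every run of non-alphanumeric characters to '_'
    return re.sub(r"[^a-z0-9]+", "_", exit_value.lower()).strip("_")


for message in ["Error: Duplicate key values",
                "Error: Column names do not match.",
                "Data written to parquet successfully. Key fields are:['number'] "]:
    print(destination_folder(message))
```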

5 - If condition activity: If File Validated

Check if the CSV file was successfully validated and loaded to parquet

jehayes_0-1717602483877.png

5a - File validated successfully

If there were no errors in the file, call the spark notebook to merge the parquet file written from the previous pyspark notebook to the delta table. 

jehayes_0-1717606120668.png

 

The lakehousepath, the parquet file path and name, the table name, and the key fields are passed in as parameters. As shown above, the key fields were derived from the previous pyspark notebook and are passed into the Create or Merge to Table notebook.
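The sketch below shows, in Python, how the validation notebook's exit value could be tested for success and how the key field list could be pulled out of it. The two string constants come from the exit message in the validation notebook; the helper names are hypothetical, and the pipeline performs the equivalent work with its own expressions.

```python
import ast

SUCCESS_PREFIX = "Data written to parquet successfully."
KEY_MARKER = "Key fields are:"


def file_validated(exit_value: str) -> bool:
    """True when the validation notebook reported success."""
    return exit_value.startswith(SUCCESS_PREFIX)


def extract_keyfields(exit_value: str) -> list:
    """Parse the list that follows 'Key fields are:'; empty list if absent."""
    _, found, tail = exit_value.partition(KEY_MARKER)
    if not found:
        return []
    # literal_eval only accepts Python literals, so it cannot run arbitrary code
    return ast.literal_eval(tail.strip())


exit_value = "Data written to parquet successfully. Key fields are:['number'] "
print(file_validated(exit_value), extract_keyfields(exit_value))
```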

 

Below is the Spark notebook code. If the delta table already exists and there are key fields, it builds a string expression to use as the merge condition and then performs the merge on the delta table. If there are no key fields or the table does not exist, it writes or overwrites the delta table.

 

 

 

 

 

 

# create or merge to delta

# input parameters below
lakehousepath = 'abfss://xxxe@yyyy.dfs.fabric.microsoft.com/xxx'
inputfolder = 'scenario1-validatecsv/bronze'
filename = 'customer_good'
tablename = 'customer'
keyfields = "['number']"

# define paths
outputpath = f'{lakehousepath}/Tables/{tablename}'
inputpath = f'{lakehousepath}/Files/{inputfolder}/{filename}'

# import the DeltaTable class (explicit import instead of wildcard imports)
from delta.tables import DeltaTable


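# get list of key fields; literal_eval parses the list string without executing arbitrary code
import ast
keylist = ast.literal_eval(keyfields) if keyfields else []
print(keylist)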

# read input parquet file
df2 = spark.read.parquet(inputpath)
# display(df2)

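# if there are key fields defined for the table, build the merge key expression
# mergeKeyExpr stays None when there are no key fields, so the check below is always defined
mergeKeyExpr = None
if keylist:
    # join the per-key conditions; rstrip(' AND') strips characters, not the suffix,
    # and would truncate key names ending in A, N or D
    mergeKeyExpr = ' AND '.join(f't.{key} = s.{key}' for key in keylist)
    print(mergeKeyExpr)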


# if table exists and if table should be upserted as indicated by the merge key, do an upsert and return how many rows were inserted and updated; 
# if it does not exist or is a full load, overwrite existing table return how many rows were inserted


if DeltaTable.isDeltaTable(spark,outputpath) and mergeKeyExpr is not None:
    deltaTable = DeltaTable.forPath(spark,outputpath)
    deltaTable.alias("t").merge(
        df2.alias("s"),
        mergeKeyExpr
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    history = deltaTable.history(1).select("operationMetrics")
    operationMetrics = history.collect()[0]["operationMetrics"]
    numInserted = operationMetrics["numTargetRowsInserted"]
    numUpdated = operationMetrics["numTargetRowsUpdated"]
else:
    df2.write.format("delta").mode("overwrite").save(outputpath)
    numInserted = df2.count()
    numUpdated = 0
print(numInserted)

result = "numInserted="+str(numInserted)+  "|numUpdated="+str(numUpdated)
mssparkutils.notebook.exit(str(result))
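Building the merge condition is plain string work, so it can be tried without Spark. The sketch below uses a hypothetical helper, build_merge_condition, and also demonstrates a pitfall to watch for when trimming a trailing ' AND ': str.rstrip removes a set of characters rather than a suffix.

```python
def build_merge_condition(keylist, target="t", source="s"):
    """Return 't.k1 = s.k1 AND t.k2 = s.k2 ...', or None when there are no keys."""
    if not keylist:
        return None
    return " AND ".join(f"{target}.{key} = {source}.{key}" for key in keylist)


print(build_merge_condition(["number"]))
print(build_merge_condition(["number", "state"]))

# Pitfall: rstrip(' AND') strips any trailing ' ', 'A', 'N' or 'D' characters,
# so a key name ending in one of those letters loses characters too.
naive = "t.ID = s.ID AND ".rstrip(" AND")
print(naive)
```

Returning None for an empty key list lets the caller fall back to a full overwrite instead of attempting a merge with an empty condition.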

 

 

 

 

 

 

5b - File validation failed

If there was an error in the CSV file, send an email notification

jehayes_0-1717615492052.png

 

 

Summary

Building resilient and efficient data pipelines is critical no matter your ETL tool or data sources. Thinking ahead to what types of problems can, and inevitably will, occur and incorporating data validation into your pipelines will save you a lot of headaches when those pipelines are moved into production. The examples in this blog cover just a few of the most common errors with CSV files. Get ahead of those data issues and resolve them without last-minute fixes and without disrupting other processes! You can easily enhance the methods in this blog by including other validations or by validating other unstructured file types like JSON. You can also change the pipeline to run as soon as the unstructured file is loaded into ADLS rather than in batch. Using techniques like this to reduce hard errors gives your pipelines (and yourself!) more credibility!

 

Last update: Jun 06 2024 05:16 AM