DebbieE
Community Champion

Fabric Notebooks. Get latest file from bronze Data Lake into Dataframe

I have a bronze lakehouse containing these files:

 

S1File_19012018.csv

S2File_13052019.csv

S3File_01112019.csv

 

And a new file drops in:

S4File_12022020.csv

 

What I want to do is pull through this latest file only (obviously I can't hard-code it, because I don't know what the latest file will be).

And then run through the transformations and append the data to the Parquet file in the Silver lakehouse, which already contains the data we have captured so far.

 

I feel like I need a parameter to dynamically get that latest file, but I'm not quite sure what the best way to do it would be. Has anyone got tips, or could you point me towards proper documentation for this?

17 REPLIES
v-huijiey-msft
Community Support

Hi @DebbieE ,

 

Thanks for the reply from frithjof_v .

 

You can use Python's built-in os module to list all the files in a directory and then sort them by creation time. The file with the latest creation time is your latest file.

 

Once you have the latest file, you can use pandas to read the file.

 

After reading the file, you can perform transformations on the data. This depends on the specific transformation you want to apply.

 

Finally, you can use the pyarrow library to append the transformed data to the existing Parquet file.

 

Below I have provided sample Python code for these operations:

import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Directory containing the source CSV files
directory = '/path/to/your/directory'

# Get the full paths of all files in the directory
files = [os.path.join(directory, f) for f in os.listdir(directory)]

# Get the latest file by creation time
latest_file = max(files, key=os.path.getctime)

# Read the latest file
df = pd.read_csv(latest_file)

# Perform your transformations here
# df = transform(df)

# Append to the existing Parquet dataset (partition_cols assumes a 'date' column exists)
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, root_path='/path/to/your/parquet/file', partition_cols=['date'])

 

Please replace '/path/to/your/directory' and '/path/to/your/parquet/file' with your actual paths.

 

If you have any further questions, please feel free to contact me.

 

Best Regards,
Yang
Community Support Team

 

If any post helps, then please consider accepting it as the solution to help the other members find it more quickly.
If I have misunderstood your needs or you still have problems, please feel free to let us know. Thanks a lot!

You can use Python's built-in os module to list all the files in a directory and then sort them by creation time. The file with the latest creation time is your latest file.

 

I wouldn't know how to go about this. I would need more detail.

 

I wouldn't mind trying to attempt this without using Pandas. I haven't needed to use it yet.

frithjof_v
Impactful Individual

I was able to get the latest modified file from a Lakehouse directory by using this code (ChatGPT and Google helped me).

 

I used Python's os module, as it seems mssparkutils' ls method doesn't return the created time or modified time of the files in a directory.

 

I guess this code will only work if the Notebook's mounted Lakehouse is the Lakehouse where the raw files (bronze files) reside. At least, I only know how to use the os module with the Notebook's mounted Lakehouse.

 

 

 

import os
from datetime import datetime
import pandas as pd

# Directory's relative path for Spark (you can copy this from the Fabric user interface)
relative_path_for_spark = "Files/Daily/"
# Directory's File API path (you could also copy this from the Fabric user interface)
file_api_path = '/lakehouse/default/' + relative_path_for_spark 

# Get the list of all files in your directory
files = os.listdir(file_api_path)

# Initialize lists to store file names and last modified times
file_names = []
last_modified_times = []

# Iterate over the list of files
for file in files:
    # Get the full path of the file
    file_path = os.path.join(file_api_path, file)
    
    # Ensure it is a file
    if os.path.isfile(file_path):
        # Get the last modified time
        modified_timestamp = os.path.getmtime(file_path)
        
        # Convert to a human-readable format
        last_modified_date = datetime.fromtimestamp(modified_timestamp)
        
        # Append to lists
        file_names.append(file)
        last_modified_times.append(last_modified_date)

# Create a Pandas DataFrame
df = pd.DataFrame({
    'File Name': file_names,
    'Last Modified Time': last_modified_times
})

# Print the Pandas DataFrame (you can remove this step)
print(df)

# Sort the Pandas DataFrame by 'Last Modified Time' in descending order
df_sorted = df.sort_values(by='Last Modified Time', ascending=False)

# Choose the file name of the first row in the sorted Pandas dataframe (as this is the file which was modified latest)
latest_file_name = df_sorted['File Name'].values[0]

# Create a Spark dataframe with the content of the CSV file
df_file_content = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(relative_path_for_spark + latest_file_name)
display(df_file_content)

# Then write the dataframe contents to somewhere...

 

 

 

 

There is also some interesting information about mounting Lakehouses dynamically in a Notebook in this blog post: How To Mount A Lakehouse and Identify Mounted Lakehouses in Fabric

 

 

Here is another thread about looping files in a directory:

Solved: Re: Fabric Notebook how do I loop files in a folde... - Microsoft Fabric Community

 

A user suggested logic like the below. It does not require the os module. However, I think it reads the content of all files in the folder, which may cause some overhead, depending on your situation:

# path
abfss_path = "<abfss path to the folder which contains the files>"

# read each CSV file in the folder
df_files = spark.read.option("header", "true").csv(abfss_path).select("*", "_metadata.file_name","_metadata.file_modification_time")

display(df_files)

 

I'm finding this all incredibly difficult to follow, to be honest.

 

I'll see if I can get my head round it, but I can't even see how it's answering my question now. I think I'm more confused than when I started.

Perhaps if you could prefix your file names with a timestamp in a format such as "yyyy-MM-dd HH-mm-ss", it would be easier to sort them, and you could use either mssparkutils or the os module for listing all the files in the folder.
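
A minimal sketch of that idea, assuming a placeholder abfss path and that the file names sort correctly by their timestamp prefix:

# Minimal sketch: list the folder and sort the file names so the newest timestamp prefix comes first
files = mssparkutils.fs.ls('<abfss path to the folder>')
file_names = sorted([f.name for f in files if f.isFile], reverse=True)

latest_file_name = file_names[0]
print(latest_file_name)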

 

 

(Perhaps you could move the files you have processed to another folder, or rename them, in order to keep track of which files have already been processed, if relevant. But as previously mentioned, I don't have much experience with this.)
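
A minimal sketch of the move-to-archive idea, assuming a placeholder abfss path and a hypothetical "archive" subfolder (mssparkutils.fs.mkdirs and mssparkutils.fs.mv are the relevant utilities):

# Minimal sketch: move a processed file into an 'archive' subfolder (paths and file name are placeholders)
source_folder = '<abfss path to the folder which keeps the csv files>/'
archive_folder = source_folder + 'archive/'
processed_file_name = 'S4File_12022020.csv'

# Make sure the archive folder exists, then move the file out of the incoming folder
mssparkutils.fs.mkdirs(archive_folder)
mssparkutils.fs.mv(source_folder + processed_file_name, archive_folder + processed_file_name)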

It's really not enough for me to go on, unfortunately.

 

you could use either mssparkutils or the os module for listing all the files in the folder.

 

I wouldn't have a clue how to do this. Even Copilot couldn't help me.

My files do have dates on them at the end of the file name

What I want to do is add the processed file names to a Parquet file, along with a modified date, and somehow use this (I have created the file at least). I think that's the way I want to go, but I don't know what my next step is.

 

I can't understand why there aren't any clear instructions out there somewhere for this situation, because I'm pretty sure it's one that a lot of people have to deal with on a regular basis.

frithjof_v
Impactful Individual

I see. The code samples I have included are different ways of identifying the file which was last modified in a folder.

 

---

 

Yeah, I'm not sure what the usual way of doing this is.

 

Perhaps, as you mention, create a table (or file) which contains the names of all files which have already been processed, and make the notebook use that table as a lookup so it knows which files it has already handled. The notebook would compare the list of all the files in your bronze folder against the table of already-processed file names, and filter away (anti join) the file names which have already been processed. Then it would loop through the remaining, unprocessed files; for each file it would transform the data content as required, write the transformed data to the silver layer table, and finally insert the file name as a new entry in the log table which keeps a record of which files have been processed.
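
A minimal sketch of that anti join step, assuming the hypothetical dataframe and column names used later in this thread (df_files_in_folder with a "name" column, df_already_processed with a "filename" column):

# Minimal sketch: keep only the file names that do not appear in the processed-files log
df_to_process = df_files_in_folder.join(
    df_already_processed,
    df_files_in_folder["name"] == df_already_processed["filename"],
    how="left_anti"
)

display(df_to_process)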

 

And/or have the notebook move each file to another folder, after the notebook has finished processing the file.

 

And/or have the notebook rename each file slightly (add a certain prefix or suffix) after the notebook has finished processing the file.

 

And/or have the notebook add the source file name as a string in an extra column in the target table in the silver layer, along with a "processedTime" timestamp column and the processed (transformed) data content from the csv file.

Ref. https://microsoftlearning.github.io/mslearn-fabric/Instructions/Labs/03b-medallion-lakehouse.html#tr...
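
A minimal sketch of adding those two columns while reading a csv file (the path is a placeholder; input_file_name() records which source file each row came from):

from pyspark.sql.functions import input_file_name, current_timestamp

# Minimal sketch: tag every row with its source file name and the time it was processed
df = spark.read.format("csv").option("header", "true").load('<abfss path to a csv file>')
df = df.withColumn("source_filename", input_file_name()) \
       .withColumn("source_processedTime", current_timestamp())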

 

---

 

To detect when a new file is added to a folder, and run a pipeline based on that event, there is the storage event trigger, which could potentially be of use in your case. However, this is currently a preview feature: https://learn.microsoft.com/en-us/fabric/data-factory/pipeline-storage-event-triggers

 

---

 

How do the files enter your Lakehouse folder? If there is a process (e.g. a data pipeline) which enters the files into your folder, could the processing (transformation) and appending of the data to silver be run as part of the same pipeline? Then you wouldn't have an issue with identifying which file to process.
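
For example, a minimal sketch of a notebook parameter cell which a pipeline Notebook activity could populate with the newly arrived file name (the parameter name and folder path are hypothetical placeholders):

# Parameter cell (mark this cell as a parameter cell in the notebook).
# A pipeline Notebook activity can override this value with the name of the file it just landed.
incoming_file_name = "S4File_12022020.csv"

# Read just that file and carry on with the transformations
df = spark.read.format("csv").option("header", "true").load("Files/Daily/" + incoming_file_name)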

 

---

 

Here is some general guidance from Microsoft regarding medallion architecture: https://learn.microsoft.com/en-us/fabric/onelake/onelake-medallion-lakehouse-architecture

 

The recommendation is to use Delta tables in both the Silver and Gold layers.
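
For reference, a minimal sketch of appending a dataframe to a Delta table vs. a plain parquet folder (df is whatever transformed dataframe you want to write; both paths are placeholders):

# Append to a Delta table (the format recommended for Silver and Gold)
df.write.format("delta").mode("append").save('<abfss path to a Delta table>')

# Append to a plain parquet folder
df.write.format("parquet").mode("append").save('<abfss path to a parquet folder>')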

 

---

I hope someone with experience with this kind of process can chime in with suggestions based on experience 😃

I did a test which seems to work:

 

In my Bronze lakehouse, I created a folder which receives a new csv file every 10 minutes. 

 

[Screenshot: the Bronze lakehouse folder receiving new csv files]

 

I also created a table in my Bronze lakehouse, called processed_files_log, where I keep a log of the csv files which have already been processed and loaded into a table in my Silver Lakehouse.

 

[Screenshot: the processed_files_log table in the Bronze lakehouse]

 

 

I have pasted the Notebook code below. The notebook runs every 30 minutes to process new files and load the content to the Silver table. The notebook also enters the processed files' filenames into the log table.

 

ChatGPT helped me with creating this code, and I don't claim to fully understand all the details of the code. 

 

Step 1 is to list the files in the folder

 

 

# abfss path to the folder where the csv files are located 
files_path = '<insert abfss path to the folder which keeps the csv files>'

# the mssparkutils.fs.ls method lists all the files (and/or subfolders) inside the folder. I only have csv files in the folder.
files = mssparkutils.fs.ls(files_path)

# Convert FileInfo objects to list of tuples (this creates a list of the file names in the folder.)
file_data = [(file.name,) for file in files]

# This creates a dataframe consisting of the file names in the folder
df_files_in_folder = spark.createDataFrame(file_data, ["name"])

# Show the DataFrame (this step can be removed)
display(df_files_in_folder)

 

 

 

Step 2 is to list all the files which have already been processed

 

 

# This creates a dataframe with the file names of all the files which have already been processed
df_already_processed = spark.sql("SELECT * FROM Lakehouse_bronze.processed_files_log")

# Show the DataFrame (this step can be removed)
display(df_already_processed)

 

 

 

Step 3 is to find out which files in the folder have not been processed yet

 

 

# Selecting only the filename column from df_already_processed
df_already_processed_filenames = df_already_processed.select("filename")

# Selecting only the name column from df_files_in_folder
df_files_in_folder_names = df_files_in_folder.select("name")

# Performing subtract operation to find non-matching rows 
# (so only the file names of the files which have not been processed already are kept)
df_to_process = df_files_in_folder_names.subtract(df_already_processed_filenames)

# Showing the resulting DataFrame of files which have not yet been processed (this step can be removed)
display(df_to_process)

 

 

 

Step 4 is the code to process the files which have not already been processed, load the content into the silver table, and create new entries into the log table

 

 

from pyspark.sql.functions import current_timestamp, lit, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from datetime import datetime

# Loop through the dataframe which consist of the filenames
for row in df_to_process.rdd.collect(): # Collecting to driver (local) as a list of Rows 
    # Extract filename from the current row
    filename = row["name"]
    # Read the current csv file into a dataframe
    df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(files_path + filename).select("OrderDateTime", "OrderID", "ProductID", "Quantity")
    # Add filename column to the dataframe
    df = df.withColumn("source_filename", lit(filename))
    tidsstempel = current_timestamp()
    # Add current timestamp column ("source_processedTime") to the dataframe
    df = df.withColumn("source_processedTime", tidsstempel)
    # Append the dataframe to the table in Silver lakehouse
    df.write.format("delta").mode("append").save('<Insert the abfss path to your silver layer Lakehouse table here>')
    # Create a single-row DataFrame with the filename (this will be appended to the log table)
    single_row = [[filename]]
    single_row_schema = StructType([
        StructField("filename", StringType(), False)
    ])
    df_log = spark.createDataFrame(single_row, single_row_schema)
    # Add the processedTime to the single-row DataFrame which will be inserted into the log table
    df_log = df_log.withColumn("processedTime", tidsstempel)
    # Insert the filename and the processedTime into the log table
    df_log.write.mode("append").saveAsTable("processed_files_log")

 

 

 

The raw csv files in the Bronze lakehouse look like this:

[Screenshot: contents of a raw csv file in the Bronze lakehouse]

 

The destination table in the Silver lakehouse looks like this:

[Screenshot: the destination table in the Silver lakehouse]

This table contains the data from the csv files, in addition to the source_filename and the time when the source csv file was processed by the Notebook and loaded to the table (source_processedTime).

 

(If someone notices: the raw csv files just contain dummy data, which is why the OrderIDs repeat between 1 and 5 for all the csv files.)

 

 

If anyone has suggestions and/or corrections to this approach and code, please share 😀

Thank you so much for this. It's got me so much further forward. I had to add this in Step 4:

load(files_path + "/" + filename)

to get the / between the folder and the file name.

What I want to do is change this to append each non-processed file into a dataframe first, before going on to do the rest of my processing on all the files in the dataframe.

So I'm hoping to change this code to append into a dataframe, if possible.
DebbieE
Community Champion

Done it. I created an empty dataframe and then appended into it in the loop you suggested:

 

dfapp = dfapp.union(dfnf)
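
A minimal sketch of that pattern (the column names are hypothetical placeholders), reusing files_path and df_to_process from the earlier steps:

from pyspark.sql.types import StructType, StructField, StringType

# Minimal sketch: start with an empty dataframe whose schema matches the csv columns (hypothetical names)
schema = StructType([
    StructField("OrderDateTime", StringType(), True),
    StructField("OrderID", StringType(), True),
    StructField("ProductID", StringType(), True),
    StructField("Quantity", StringType(), True),
])
dfapp = spark.createDataFrame([], schema)

# Append each non-processed file's dataframe (column order must match the schema above)
for row in df_to_process.rdd.collect():
    filename = row["name"]
    dfnf = spark.read.format("csv").option("header", "true").load(files_path + "/" + filename) \
        .select("OrderDateTime", "OrderID", "ProductID", "Quantity")
    dfapp = dfapp.union(dfnf)

# dfapp now holds the rows from all non-processed files, ready for the rest of the transformations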

Hi @DebbieE ,

 

Thanks to @frithjof_v for the strong support on this case and the great contributions!

 

I am glad to hear you solved the problem. If you can, please post your answer and accept it as the solution, which will provide a lot of help to users who encounter similar problems.

 

Best Regards,
Yang
Community Support Team

 

If any post helps, then please consider accepting it as the solution to help the other members find it more quickly.
If I have misunderstood your needs or you still have problems, please feel free to let us know. Thanks a lot!

Here is another method, which only uses mssparkutils and a Spark dataframe (not using os or a Pandas dataframe).

 

A benefit of using mssparkutils is that we can use the abfss path, which means this should work regardless of which Lakehouse is mounted in the Notebook.

 

While the Fabric documentation for mssparkutils doesn't mention the file.modifyTime attribute, it is mentioned in the Azure Synapse Analytics documentation for mssparkutils, and it seems to work in Fabric as well. I'm not quite sure about the differences between using mssparkutils in Azure Synapse Analytics vs. Fabric, so I can't say whether file.modifyTime is officially supported in Fabric. However, it seems to work.

 

Microsoft Spark Utilities (MSSparkUtils) for Fabric - Microsoft Fabric | Microsoft Learn

Introduction to Microsoft Spark utilities - Azure Synapse Analytics | Microsoft Learn

 

Again, ChatGPT helped me with the code:

 

from pyspark.sql import Row
from pyspark.sql.functions import col
from datetime import datetime
files = mssparkutils.fs.ls('<abfss path to the folder where the files are saved>')

# Create a Spark dataframe
schema = "name STRING, isDir BOOLEAN, isFile BOOLEAN, path STRING, size BIGINT, modifyTime BIGINT, modifyTimeHuman STRING"
df = spark.createDataFrame([], schema)

# Iterate over the list of files
for file in files:
    name = file.name
    is_dir = file.isDir
    is_file = file.isFile
    path = file.path
    size = file.size
    modify_time = file.modifyTime
    modify_time_human = datetime.fromtimestamp(file.modifyTime/1000)

    # Create a Row object or a tuple
    row = Row(name=name, isDir=is_dir, isFile=is_file, path=path, size=size, modifyTime=modify_time, modifyTimeHuman=modify_time_human)

    # Append the row to the DataFrame
    df = df.union(spark.createDataFrame([row]))

# Filter away rows where IsDir is True
df_filtered = df.filter(~col('IsDir'))

# Sort the DataFrame by ModifyTime in descending order (this is used to get the latest file)
df_sorted = df_filtered.orderBy(col('ModifyTime').desc())

# Get the path value of the top row after sorting (this is the latest file)
top_path = df_sorted.select('Path').first()[0]

# Display the dataframe which lists the files in the folder (this step can be deleted)
display(df_sorted)
print(top_path)

# Create a Spark dataframe with the content of the CSV file
df_file_content = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(top_path)
display(df_file_content)

# Then write the dataframe contents to somewhere...

 

 

 

However, for small dataframes (like the dataframe containing the list of all files in a folder), I guess maybe a Pandas dataframe is better suited than a Spark dataframe?
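
A minimal sketch of that alternative, assuming the same placeholder abfss path (the file list goes into a small Pandas dataframe instead of a Spark dataframe):

import pandas as pd
from datetime import datetime

files = mssparkutils.fs.ls('<abfss path to the folder where the files are saved>')

# Build a small Pandas dataframe with one row per file (skipping subfolders)
df_files = pd.DataFrame([
    {
        "name": f.name,
        "path": f.path,
        "modifyTime": datetime.fromtimestamp(f.modifyTime / 1000),
    }
    for f in files if f.isFile
])

# The latest modified file is the first row after sorting descending
latest_path = df_files.sort_values("modifyTime", ascending=False).iloc[0]["path"]
print(latest_path)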

@DebbieE do you want to write to a parquet file or a Delta table? In general, my impression is that the Delta table is the preferred format in Fabric, so I'm curious if there is a specific reason to write to a parquet file only.

Parquet, because it's just the transformation data in the silver layer. I have Delta parquet in the gold layer as dims and facts.

frithjof_v
Impactful Individual

Thank you @DebbieE,

 

I have limited experience with medallion architecture in Fabric. 

 

It's interesting to hear how others solve this 😃

frithjof_v
Impactful Individual

Maybe something like this:

https://youtu.be/Uu5_Qeo0sfg?si=ytP2X5NjoXz8iqNt

 

Maybe you can use some of the functions for getting information about files in a Fabric Lakehouse directory from here (e.g. listing files):

https://learn.microsoft.com/en-us/fabric/data-engineering/microsoft-spark-utilities

 

In general I think you could use some directory function to list all the files in the directory, put this information in a dataframe, then sort the dataframe according to some attribute like file created timestamp / last modified timestamp / file name (or substring from file name), depending on which attribute you consider to be most relevant for determining what is the latest file in your context.

 

And then select the first or last row from the dataframe, depending on the sort order you chose.

 

And then select the value of the path attribute (path column) from the selected row, to get the path of the latest file.
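
A compact sketch of that idea, assuming a placeholder abfss path and sorting directly on the list returned by mssparkutils.fs.ls rather than on a dataframe:

# Minimal sketch: list the files, sort by last modified time descending, take the path of the newest one
files = [f for f in mssparkutils.fs.ls('<abfss path to the folder>') if f.isFile]
latest = sorted(files, key=lambda f: f.modifyTime, reverse=True)[0]
print(latest.path)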

 

 

However: could there sometimes be situations where you will need to load not only the latest file, but the two (or more) latest files? That is, if more than one file exists which has not already been loaded into silver?

Then you would need some mechanism to keep track of which files have already been loaded to silver.

 

I must admit I don't have much experience with this case. Hopefully someone more experienced can tell us how this is usually done 😃

The YouTube video is on Databricks, and I think I would be more comfortable following one on Fabric.

 

And having a look at the Spark utilities, I don't think I'm going to get very far with this unfortunately, but I agree that's definitely what I'm wanting to do.

 

And yes, eventually I want to be able to trigger either a delta or a full load.
