Parallelize operations on multiple CSVs

Tags: #csv #pandas #snippet #read #dataframe #parallel #parallelize #dask #operations
Author: Minura Punchihewa
Description: This notebook demonstrates how to use Dask to efficiently process and analyze multiple CSV files in parallel.

Input

Imports

import os

Import Graphviz (install if not present)

try:
    import graphviz
except ImportError:
    !pip install --user graphviz
    import graphviz

Import Dask (install if not present)

try:
    import dask.dataframe as dd
except ImportError:
    !python -m pip install "dask[complete]"
    import dask.dataframe as dd

Variable

folder_path = "nycflights"

%env FOLDER_PATH=$folder_path

Download the dataset if it does not exist

%%bash

[[ -f "$FOLDER_PATH/nycflights.csv" ]] || (mkdir -p "$FOLDER_PATH" && wget -O "$FOLDER_PATH/nycflights.csv" https://github.com/vaibhavwalvekar/NYC-Flights-2013-Dataset-Analysis/raw/master/flights.csv)
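
If wget is unavailable (e.g. on Windows), the same download can be done in pure Python. A minimal sketch using only the standard library:

import os
from urllib.request import urlretrieve

url = (
    "https://github.com/vaibhavwalvekar/NYC-Flights-2013-Dataset-Analysis"
    "/raw/master/flights.csv"
)
csv_path = os.path.join(folder_path, "nycflights.csv")

# download only if the file is not already present
if not os.path.exists(csv_path):
    os.makedirs(folder_path, exist_ok=True)
    urlretrieve(url, csv_path)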

Model

Read the CSV files from the folder

# when the actual data types of certain columns cannot be inferred from the first few rows,
# they need to be specified manually; this is where the dtype parameter comes in
df = dd.read_csv(
    os.path.join(folder_path, "*.csv"),
    parse_dates={"Date": [0, 1, 2]},
    dtype={
        "TailNum": str,
        "CRSElapsedTime": float,
        "Cancelled": bool,
        "dep_delay": float,
    },
)
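
Since dd.read_csv is lazy, no data has actually been loaded at this point. A quick sanity check of the schema and partitioning (a minimal sketch using standard Dask DataFrame attributes):

# declared column types (no data is read)
print(df.dtypes)

# each CSV file maps to one or more partitions that are processed in parallel
print(df.npartitions)

# head() reads only from the first partition, so it stays cheap
print(df.head())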

Output

Calculate the max of a column

# no operation is actually performed until the .compute() function is called
df["dep_delay"].max().compute()

Visualize the parallel execution of the operation

# the underlying task graph can be viewed to understand how the parallel execution takes place
df.dep_delay.max().visualize(rankdir="LR", size="12, 12!")
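
The rendered graph can also be saved to an image file via the filename parameter (a minimal sketch, with task_graph.png as an arbitrary output name):

# save the task graph as an image file
df.dep_delay.max().visualize(filename="task_graph.png", rankdir="LR")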

Comparison

Pandas

import pandas as pd
import glob

%%time
# the equivalent operation performed using Pandas
all_files = glob.glob(os.path.join(folder_path, '*.csv'))
dfs = []
for file in all_files:
    dfs.append(pd.read_csv(file, parse_dates={'Date': [0, 1, 2]}))
df = pd.concat(dfs, axis=0)
df.dep_delay.max()

Dask

%%time
# the entire operation, performed again using Dask
df = dd.read_csv(
    os.path.join(folder_path, '*.csv'),
    parse_dates={'Date': [0, 1, 2]},
    dtype={
        'TailNum': str,
        'CRSElapsedTime': float,
        'Cancelled': bool,
        'dep_delay': float,
    },
)
df['dep_delay'].max().compute()

# Dask clearly performs better than Pandas here
# the performance benefits become more apparent on larger datasets,
# especially when the size of the data exceeds available memory
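
For long-running computations, the built-in ProgressBar from dask.diagnostics shows execution progress while the task graph runs (a minimal sketch, no extra install needed):

from dask.diagnostics import ProgressBar

# prints a live progress bar while the scheduler executes the graph
with ProgressBar():
    df['dep_delay'].max().compute()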