Create a PowerBI dashboard
Tags: #fec #powerbi #dataviz #analytics #finance
Last update: 2023-04-12 (Created: 2021-08-17)
Description: This notebook provides instructions for creating a PowerBI dashboard to visualize French accounting data from FEC files (Fichier des Écritures Comptables).
import pandas as pd
from datetime import datetime, timedelta
import os
import re
import naas
LOGO = "https://landen.imgix.net/e5hx7wyzf53f/assets/26u7xg7u.png?w=400"
COLOR_1 = None
COLOR_2 = None
def get_all_fec(
file_regex,
sep=",",
decimal=".",
encoding=None,
header=None,
usecols=None,
names=None,
dtype=None,
):
    # Initialize an empty dataframe
df = pd.DataFrame()
    # Get all files in the current directory matching the FEC naming pattern
files = [f for f in os.listdir() if re.search(file_regex, f)]
if len(files) == 0:
        print("No FEC file matches the expected naming convention")
else:
for file in files:
# Open file and create df
print(file)
tmp_df = pd.read_csv(
file,
sep=sep,
decimal=decimal,
encoding=encoding,
header=header,
usecols=usecols,
names=names,
dtype=dtype,
)
# Add filename to df
tmp_df["NOM_FICHIER"] = file
# Concat df
df = pd.concat([df, tmp_df], axis=0, sort=False)
return df
# Standard FEC naming: <SIREN (9 digits)>FEC<closing date YYYYMMDD>.txt
file_regex = r"^\d{9}FEC\d{8}\.txt"
db_init = get_all_fec(
file_regex, sep="\t", decimal=",", encoding="ISO-8859-1", header=0
)
db_init
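The regex above encodes the standard FEC naming convention: a 9-digit SIREN, the literal FEC, then the closing date of the fiscal year (YYYYMMDD). A quick sanity check, using a made-up filename purely for illustration:
# Hypothetical filename, only to illustrate the expected pattern
example_name = "123456789FEC20211231.txt"
print(bool(re.search(file_regex, example_name)))  # True if the name matches <SIREN>FEC<YYYYMMDD>.txt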
Data cleaning
db_clean = db_init.copy()
# Select the columns to keep
to_select = [
"NOM_FICHIER",
"EcritureDate",
"CompteNum",
"CompteLib",
"EcritureLib",
"Debit",
"Credit",
]
db_clean = db_clean[to_select]
# Rename the columns
to_rename = {
"EcritureDate": "DATE",
"CompteNum": "COMPTE_NUM",
"CompteLib": "RUBRIQUE_N3",
"EcritureLib": "RUBRIQUE_N4",
"Debit": "DEBIT",
"Credit": "CREDIT",
}
db_clean = db_clean.rename(columns=to_rename)
# Strip whitespace from the COMPTE_NUM column
db_clean["COMPTE_NUM"] = db_clean["COMPTE_NUM"].astype(str).str.strip()
# Cast the columns to their expected types
db_clean = db_clean.astype(
{
"NOM_FICHIER": str,
"DATE": str,
"COMPTE_NUM": str,
"RUBRIQUE_N3": str,
"RUBRIQUE_N4": str,
"DEBIT": float,
"CREDIT": float,
}
)
# Convert the DATE column to datetime
db_clean["DATE"] = pd.to_datetime(db_clean["DATE"])
db_clean.head(5)
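Because the FEC records double-entry bookkeeping, total debits should equal total credits within each file. An optional sanity check on the cleaned data, one row per source file:
# Optional check: DEBIT and CREDIT totals should match for each FEC file
db_clean.groupby("NOM_FICHIER")[["DEBIT", "CREDIT"]].sum()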
Data enrichment
db_enr = db_clean.copy()
# Add entity and period columns
db_enr["ENTITY"] = db_enr["NOM_FICHIER"].str[:9]
db_enr["PERIOD"] = db_enr["NOM_FICHIER"].str[12:-6]
db_enr["PERIOD"] = pd.to_datetime(db_enr["PERIOD"], format="%Y%m")
db_enr["PERIOD"] = db_enr["PERIOD"].dt.strftime("%Y-%m")
# Add MONTH and MONTH_INDEX columns
db_enr["MONTH"] = db_enr["DATE"].dt.strftime("%b")
db_enr["MONTH_INDEX"] = db_enr["DATE"].dt.month
# Compute VALUE as debit minus credit
db_enr["VALUE"] = db_enr["DEBIT"] - db_enr["CREDIT"]
db_enr.head(5)
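ENTITY and PERIOD are sliced straight out of the file name. A worked example on a hypothetical name, matching the slicing used above:
# Hypothetical filename: 9-digit SIREN + 'FEC' + closing date + '.txt'
name = "123456789FEC20211231.txt"
print(name[:9])     # '123456789' -> ENTITY
print(name[12:-6])  # '202112'    -> PERIOD, reformatted to '2021-12' above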
# Compute net income so the balance sheet balances within equity
db_rn = db_enr.copy()
db_rn = db_rn[db_rn["COMPTE_NUM"].str.contains(r"^6|^7")]
to_group = ["ENTITY", "PERIOD"]
to_agg = {"VALUE": "sum"}
db_rn = db_rn.groupby(to_group, as_index=False).agg(to_agg)
db_rn["COMPTE_NUM"] = "10999999"
db_rn["RUBRIQUE_N3"] = "RESULTAT"
# Reorder the columns
to_select = ["ENTITY", "PERIOD", "COMPTE_NUM", "RUBRIQUE_N3", "VALUE"]
db_rn = db_rn[to_select]
db_rn
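Note the sign convention: VALUE is debit minus credit, so a profitable period produces a negative RESULTAT. With made-up figures, 100 000 of class 7 credits and 80 000 of class 6 debits give 80 000 - 100 000 = -20 000, i.e. a 20 000 profit recorded on the credit side of equity, which is exactly what is needed to balance the other balance-sheet accounts.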
Aggregation at RUBRIQUE_N3 level
# Compute the value variation: build a dataset with PERIOD_COMP used as the merge key
db_var = db_enr.copy()
# Group by entity, period, account and level-3 heading
to_group = ["ENTITY", "PERIOD", "COMPTE_NUM", "RUBRIQUE_N3"]
to_agg = {"VALUE": "sum"}
db_var = db_var.groupby(to_group, as_index=False).agg(to_agg)
# Append the net income rows to the dataframe
db_var = pd.concat([db_var, db_rn], axis=0, sort=False)
# Create the PERIOD_COMP column (same month, previous year)
db_var["PERIOD_COMP"] = (
    db_var["PERIOD"].str[:4].astype(int) - 1
).astype(str) + db_var["PERIOD"].str[-3:]
db_var
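PERIOD_COMP is simply the same month one year earlier, obtained by decrementing the year part of PERIOD. A minimal illustration with a sample value:
# Example: PERIOD '2021-12' gives PERIOD_COMP '2020-12'
period = "2021-12"
print(str(int(period[:4]) - 1) + period[-3:])  # '2020-12'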
Building the comparison dataset
db_comp = db_var.copy()
# Drop the PERIOD_COMP column (PERIOD will be renamed to PERIOD_COMP below)
db_comp = db_comp.drop("PERIOD_COMP", axis=1)
# Rename the columns
to_rename = {"VALUE": "VALUE_N-1", "PERIOD": "PERIOD_COMP"}
db_comp = db_comp.rename(columns=to_rename)
db_comp.head(5)
Joining the two tables and computing variations
# Join the two tables
join_on = ["ENTITY", "PERIOD_COMP", "COMPTE_NUM", "RUBRIQUE_N3"]
db_var = (
pd.merge(db_var, db_comp, how="left", on=join_on)
.drop("PERIOD_COMP", axis=1)
.fillna(0)
)
# Create the VARV column (year-over-year variation in value)
db_var["VARV"] = db_var["VALUE"] - db_var["VALUE_N-1"]
# Create the VARP column (year-over-year variation in %), guarding against division by zero when there is no prior-year value
db_var["VARP"] = (db_var["VARV"] / db_var["VALUE_N-1"]).replace([float("inf"), float("-inf")], 0).fillna(0)
db_var
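Each row now compares a period with the same month of the previous year: VARV is the absolute change and VARP the relative change. With made-up figures, a heading worth 120 this year against 100 last year gives VARV = 120 - 100 = 20 and VARP = 20 / 100 = 0.2, i.e. +20%. Rows with no prior-year match keep VALUE_N-1 = 0, which is why the division above is guarded.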
db_cat = db_var.copy()
# Compute level-2 headings (RUBRIQUE_N2)
def rubrique_N2(row):
numero_compte = str(row["COMPTE_NUM"])
value = float(row["VALUE"])
    # Simplified IFRS-style balance sheet, level 2
to_check = ["^10", "^11", "^12", "^13", "^14"]
if any(re.search(x, numero_compte) for x in to_check):
return "CAPITAUX_PROPRES"
to_check = ["^15", "^16", "^17", "^18", "^19"]
if any(re.search(x, numero_compte) for x in to_check):
return "DETTES_FINANCIERES"
to_check = ["^20", "^21", "^22", "^23", "^25", "^26", "^27", "^28", "^29"]
if any(re.search(x, numero_compte) for x in to_check):
return "IMMOBILISATIONS"
to_check = ["^31", "^32", "^33", "^34", "^35", "^36", "^37", "^38", "^39"]
if any(re.search(x, numero_compte) for x in to_check):
return "STOCKS"
to_check = ["^40"]
if any(re.search(x, numero_compte) for x in to_check):
return "DETTES_FOURNISSEURS"
to_check = ["^41"]
if any(re.search(x, numero_compte) for x in to_check):
return "CREANCES_CLIENTS"
to_check = ["^42", "^43", "^44", "^45", "^46", "^47", "^48", "^49"]
if any(re.search(x, numero_compte) for x in to_check):
if value > 0:
return "AUTRES_CREANCES"
else:
return "AUTRES_DETTES"
to_check = ["^50", "^51", "^52", "^53", "^54", "^58", "^59"]
if any(re.search(x, numero_compte) for x in to_check):
return "DISPONIBILITES"
    # Detailed income statement, level 2
to_check = ["^60"]
if any(re.search(x, numero_compte) for x in to_check):
return "ACHATS"
to_check = ["^61", "^62"]
if any(re.search(x, numero_compte) for x in to_check):
return "SERVICES_EXTERIEURS"
to_check = ["^63"]
if any(re.search(x, numero_compte) for x in to_check):
return "TAXES"
to_check = ["^64"]
if any(re.search(x, numero_compte) for x in to_check):
return "CHARGES_PERSONNEL"
to_check = ["^65"]
if any(re.search(x, numero_compte) for x in to_check):
return "AUTRES_CHARGES"
to_check = ["^66"]
if any(re.search(x, numero_compte) for x in to_check):
return "CHARGES_FINANCIERES"
to_check = ["^67"]
if any(re.search(x, numero_compte) for x in to_check):
return "CHARGES_EXCEPTIONNELLES"
    to_check = ["^68"]  # 78x accounts (reversals) are handled below as REPRISES_AMORT._DEP.
if any(re.search(x, numero_compte) for x in to_check):
return "AMORTISSEMENTS"
to_check = ["^69"]
if any(re.search(x, numero_compte) for x in to_check):
return "IMPOT"
to_check = ["^70"]
if any(re.search(x, numero_compte) for x in to_check):
return "VENTES"
to_check = ["^71", "^72"]
if any(re.search(x, numero_compte) for x in to_check):
return "PRODUCTION_STOCKEE_IMMOBILISEE"
to_check = ["^74"]
if any(re.search(x, numero_compte) for x in to_check):
return "SUBVENTIONS_D'EXPL."
to_check = ["^75", "^791"]
if any(re.search(x, numero_compte) for x in to_check):
return "AUTRES_PRODUITS_GESTION_COURANTE"
to_check = ["^76", "^796"]
if any(re.search(x, numero_compte) for x in to_check):
return "PRODUITS_FINANCIERS"
to_check = ["^77", "^797"]
if any(re.search(x, numero_compte) for x in to_check):
return "PRODUITS_EXCEPTIONNELS"
to_check = ["^78"]
if any(re.search(x, numero_compte) for x in to_check):
return "REPRISES_AMORT._DEP."
to_check = ["^8"]
if any(re.search(x, numero_compte) for x in to_check):
return "COMPTES_SPECIAUX"
# Compute level-1 headings (RUBRIQUE_N1)
def rubrique_N1(row):
categorisation = row.RUBRIQUE_N2
    # Simplified IFRS-style balance sheet, level 1
to_check = ["CAPITAUX_PROPRES", "DETTES_FINANCIERES"]
if any(re.search(x, categorisation) for x in to_check):
return "PASSIF_NON_COURANT"
to_check = ["IMMOBILISATIONS"]
if any(re.search(x, categorisation) for x in to_check):
return "ACTIF_NON_COURANT"
to_check = ["STOCKS", "CREANCES_CLIENTS", "AUTRES_CREANCES"]
if any(re.search(x, categorisation) for x in to_check):
return "ACTIF_COURANT"
to_check = ["DETTES_FOURNISSEURS", "AUTRES_DETTES"]
if any(re.search(x, categorisation) for x in to_check):
return "PASSIF_COURANT"
to_check = ["DISPONIBILITES"]
if any(re.search(x, categorisation) for x in to_check):
return "DISPONIBILITES"
    # Simplified income statement, level 1
to_check = ["ACHATS"]
if any(re.search(x, categorisation) for x in to_check):
return "COUTS_DIRECTS"
to_check = [
"SERVICES_EXTERIEURS",
"TAXES",
"CHARGES_PERSONNEL",
"AUTRES_CHARGES",
"AMORTISSEMENTS",
]
if any(re.search(x, categorisation) for x in to_check):
return "CHARGES_EXPLOITATION"
to_check = ["CHARGES_FINANCIERES"]
if any(re.search(x, categorisation) for x in to_check):
return "CHARGES_FINANCIERES"
to_check = ["CHARGES_EXCEPTIONNELLES", "IMPOT"]
if any(re.search(x, categorisation) for x in to_check):
return "CHARGES_EXCEPTIONNELLES"
to_check = ["VENTES", "PRODUCTION_STOCKEE_IMMOBILISEE"]
if any(re.search(x, categorisation) for x in to_check):
return "CHIFFRE_D'AFFAIRES"
to_check = [
"SUBVENTIONS_D'EXPL.",
"AUTRES_PRODUITS_GESTION_COURANTE",
"REPRISES_AMORT._DEP.",
]
if any(re.search(x, categorisation) for x in to_check):
return "PRODUITS_EXPLOITATION"
to_check = ["PRODUITS_FINANCIERS"]
if any(re.search(x, categorisation) for x in to_check):
return "PRODUITS_FINANCIERS"
to_check = ["PRODUITS_EXCEPTIONNELS"]
if any(re.search(x, categorisation) for x in to_check):
return "PRODUITS_EXCEPTIONNELS"
# Compute level-0 headings (RUBRIQUE_N0)
def rubrique_N0(row):
masse = row.RUBRIQUE_N1
to_check = ["ACTIF_NON_COURANT", "ACTIF_COURANT", "DISPONIBILITES"]
if any(re.search(x, masse) for x in to_check):
return "ACTIF"
to_check = ["PASSIF_NON_COURANT", "PASSIF_COURANT"]
if any(re.search(x, masse) for x in to_check):
return "PASSIF"
to_check = [
"COUTS_DIRECTS",
"CHARGES_EXPLOITATION",
"CHARGES_FINANCIERES",
"CHARGES_EXCEPTIONNELLES",
]
if any(re.search(x, masse) for x in to_check):
return "CHARGES"
to_check = [
"CHIFFRE_D'AFFAIRES",
"PRODUITS_EXPLOITATION",
"PRODUITS_FINANCIERS",
"PRODUITS_EXCEPTIONNELS",
]
if any(re.search(x, masse) for x in to_check):
return "PRODUITS"
# Map the headings onto the dataset
db_cat["RUBRIQUE_N2"] = db_cat.apply(rubrique_N2, axis=1)
db_cat["RUBRIQUE_N1"] = db_cat.apply(rubrique_N1, axis=1)
db_cat["RUBRIQUE_N0"] = db_cat.apply(rubrique_N0, axis=1)
# Reorder the columns
to_select = [
"ENTITY",
"PERIOD",
"COMPTE_NUM",
"RUBRIQUE_N0",
"RUBRIQUE_N1",
"RUBRIQUE_N2",
"RUBRIQUE_N3",
"VALUE",
"VALUE_N-1",
"VARV",
"VARP",
]
db_cat = db_cat[to_select]
db_cat
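Each account thus rolls up through three extra levels on top of its label. A minimal check of the mapping chain on a single hypothetical row (account 607000, a purchases account):
# Hypothetical row used only to illustrate the mapping chain
row = pd.Series({"COMPTE_NUM": "607000", "VALUE": 1000.0})
row["RUBRIQUE_N2"] = rubrique_N2(row)  # 'ACHATS'
row["RUBRIQUE_N1"] = rubrique_N1(row)  # 'COUTS_DIRECTS'
row["RUBRIQUE_N0"] = rubrique_N0(row)  # 'CHARGES'
print(row["RUBRIQUE_N2"], row["RUBRIQUE_N1"], row["RUBRIQUE_N0"])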
REF_ENTITE
# Create the ref_entite dataset
dataset_entite = db_cat.copy()
# Keep one row per entity
dataset_entite = dataset_entite[["ENTITY"]].drop_duplicates().reset_index(drop=True)
# Display the data model
dataset_entite
REF_SCENARIO
# Create the ref_scenario dataset
dataset_scenario = db_cat.copy()
# Keep one row per period
dataset_scenario = dataset_scenario[["PERIOD"]].drop_duplicates().reset_index(drop=True)
# Display the data model
dataset_scenario
KPIS
# Create the KPIs dataset (CA = revenue, MARGE = gross margin, EBE = EBITDA, BFR = working capital requirement, CC = trade receivables, DF = financial debt)
dataset_kpis = db_cat.copy()
# Revenue (CA) KPI
dataset_kpis_ca = dataset_kpis[dataset_kpis.RUBRIQUE_N1.isin(["CHIFFRE_D'AFFAIRES"])]
to_group = ["ENTITY", "PERIOD", "RUBRIQUE_N1"]
to_agg = {"VALUE": "sum"}
dataset_kpis_ca = dataset_kpis_ca.groupby(to_group, as_index=False).agg(to_agg)
# Flip the sign so revenue shows as positive
dataset_kpis_ca["VALUE"] = dataset_kpis_ca["VALUE"] * -1
# Direct costs (COUTS_DIRECTS)
dataset_kpis_ha = dataset_kpis[dataset_kpis.RUBRIQUE_N1.isin(["COUTS_DIRECTS"])]
to_group = ["ENTITY", "PERIOD", "RUBRIQUE_N1"]
to_agg = {"VALUE": "sum"}
dataset_kpis_ha = dataset_kpis_ha.groupby(to_group, as_index=False).agg(to_agg)
# Flip the sign so direct costs count negatively
dataset_kpis_ha["VALUE"] = dataset_kpis_ha["VALUE"] * -1
# Gross margin KPI (revenue - direct costs)
dataset_kpis_mb = dataset_kpis_ca.copy()
dataset_kpis_mb = pd.concat([dataset_kpis_mb, dataset_kpis_ha], axis=0, sort=False)
to_group = ["ENTITY", "PERIOD"]
to_agg = {"VALUE": "sum"}
dataset_kpis_mb = dataset_kpis_mb.groupby(to_group, as_index=False).agg(to_agg)
dataset_kpis_mb["RUBRIQUE_N1"] = "MARGE"
dataset_kpis_mb = dataset_kpis_mb[["ENTITY", "PERIOD", "RUBRIQUE_N1", "VALUE"]]
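The sign handling is what makes this work: VALUE is debit minus credit, so revenue comes out negative and direct costs positive; flipping both signs makes revenue positive and costs negative, and their sum is the gross margin. With made-up figures, 150 000 of revenue and 90 000 of direct costs give a margin of 150 000 - 90 000 = 60 000.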
# External charges (SERVICES_EXTERIEURS)
dataset_kpis_ce = dataset_kpis[dataset_kpis.RUBRIQUE_N2.isin(["SERVICES_EXTERIEURS"])]
to_group = ["ENTITY", "PERIOD", "RUBRIQUE_N2"]
to_agg = {"VALUE": "sum"}
dataset_kpis_ce = dataset_kpis_ce.groupby(to_group, as_index=False).agg(to_agg)
# Flip the sign so external charges count negatively
dataset_kpis_ce["VALUE"] = dataset_kpis_ce["VALUE"] * -1
# Taxes (TAXES)
dataset_kpis_ip = dataset_kpis[dataset_kpis.RUBRIQUE_N2.isin(["TAXES"])]
to_group = ["ENTITY", "PERIOD", "RUBRIQUE_N2"]
to_agg = {"VALUE": "sum"}
dataset_kpis_ip = dataset_kpis_ip.groupby(to_group, as_index=False).agg(to_agg)
# Flip the sign so taxes count negatively
dataset_kpis_ip["VALUE"] = dataset_kpis_ip["VALUE"] * -1
# Personnel expenses (CHARGES_PERSONNEL)
dataset_kpis_cp = dataset_kpis[dataset_kpis.RUBRIQUE_N2.isin(["CHARGES_PERSONNEL"])]
to_group = ["ENTITY", "PERIOD", "RUBRIQUE_N2"]
to_agg = {"VALUE": "sum"}
dataset_kpis_cp = dataset_kpis_cp.groupby(to_group, as_index=False).agg(to_agg)
# Flip the sign so personnel expenses count negatively
dataset_kpis_cp["VALUE"] = dataset_kpis_cp["VALUE"] * -1
# Other charges (AUTRES_CHARGES)
dataset_kpis_ac = dataset_kpis[dataset_kpis.RUBRIQUE_N2.isin(["AUTRES_CHARGES"])]
to_group = ["ENTITY", "PERIOD", "RUBRIQUE_N2"]
to_agg = {"VALUE": "sum"}
dataset_kpis_ac = dataset_kpis_ac.groupby(to_group, as_index=False).agg(to_agg)
# Flip the sign so other charges count negatively
dataset_kpis_ac["VALUE"] = dataset_kpis_ac["VALUE"] * -1
# Operating subsidies (SUBVENTIONS_D'EXPL.), kept in a dedicated variable so the other-charges dataset is not overwritten
dataset_kpis_sb = dataset_kpis[dataset_kpis.RUBRIQUE_N2.isin(["SUBVENTIONS_D'EXPL."])]
to_group = ["ENTITY", "PERIOD", "RUBRIQUE_N2"]
to_agg = {"VALUE": "sum"}
dataset_kpis_sb = dataset_kpis_sb.groupby(to_group, as_index=False).agg(to_agg)
# Flip the sign so subsidies count positively in the EBE (they are credits, hence negative in VALUE)
dataset_kpis_sb["VALUE"] = dataset_kpis_sb["VALUE"] * -1
# EBE KPI = margin - external charges - taxes - personnel expenses - other charges + operating subsidies
dataset_kpis_ebe = dataset_kpis_mb.copy()
dataset_kpis_ebe = pd.concat(
[
dataset_kpis_ebe,
dataset_kpis_ce,
dataset_kpis_ip,
dataset_kpis_cp,
        dataset_kpis_ac,
        dataset_kpis_sb,
    ],
axis=0,
sort=False,
)
to_group = ["ENTITY", "PERIOD"]
to_agg = {"VALUE": "sum"}
dataset_kpis_ebe = dataset_kpis_ebe.groupby(to_group, as_index=False).agg(to_agg)
dataset_kpis_ebe["RUBRIQUE_N1"] = "EBE"
dataset_kpis_ebe = dataset_kpis_ebe[["ENTITY", "PERIOD", "RUBRIQUE_N1", "VALUE"]]
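The same convention carries through to the EBE: the margin enters positively, each expense block enters negatively, and operating subsidies enter positively, so summing the concatenated rows per entity and period reproduces EBE = margin - external charges - taxes - personnel expenses - other charges + operating subsidies.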
# Trade receivables (CREANCES_CLIENTS) KPI
dataset_kpis_cc = dataset_kpis[dataset_kpis.RUBRIQUE_N2.isin(["CREANCES_CLIENTS"])]
to_group = ["ENTITY", "PERIOD", "RUBRIQUE_N2"]
to_agg = {"VALUE": "sum"}
dataset_kpis_cc = dataset_kpis_cc.groupby(to_group, as_index=False).agg(to_agg)
# Rename the column
to_rename = {"RUBRIQUE_N2": "RUBRIQUE_N1"}
dataset_kpis_cc = dataset_kpis_cc.rename(columns=to_rename)
# Inventory (STOCKS) KPI
dataset_kpis_st = dataset_kpis[dataset_kpis.RUBRIQUE_N2.isin(["STOCKS"])]
to_group = ["ENTITY", "PERIOD", "RUBRIQUE_N2"]
to_agg = {"VALUE": "sum"}
dataset_kpis_st