Analyzing patient treatment data with pandas

I work in the population health industry and get contracts from commercial companies to conduct research on their products. Below is my general code for identifying target patient groups from a provincial dataset, which includes DAD (hospital discharges), PC (physician claims), NACRS (emergency department visits), PIN (drug dispensing) and REG (provincial registry). The same patient can have multiple rows in each of these databases; for example, a patient who has been hospitalized three times appears as three separate rows in the DAD data. The code does the following things:

  1. Imports data from CSV files into individual pandas DataFrames (df).
  2. Performs initial cleaning and processing of the data (e.g. random subsampling, date formatting, and loading additional reference data such as the ICD codes for the conditions under study).
  3. Under the section "1) Identify the patients for Case #1", a series of steps tags each relevant row and then filters on those tags. Datasets are linked to check whether a given patient meets the diagnostic code requirements.
  4. The information is also aggregated to the single-patient level via the pivot_table function, so that each patient is summarized in one row (a condensed sketch of this pattern follows this list).
  5. At the end, the final patient DataFrame is saved to a local directory and the analytical results are printed.
  6. I have also written my own feature_tagger module to keep some of the most frequently used functions out of this main code.
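
To make step 4 concrete, the single-patient aggregation boils down to the following pattern (a minimal, self-contained sketch with made-up data; the column names mirror the real ones used below):

import pandas as pd

# Made-up service-level rows: one row per visit, possibly several per patient
df = pd.DataFrame({
    'PHN_ENC': ['A', 'A', 'B'],
    'SE_END_DATE': pd.to_datetime(['2017-08-15', '2017-06-01', '2017-05-20']),
    'AGE': [45, 45, 60],
    'SEX': ['F', 'F', 'M'],
})

# Collapse to one row per patient; the earliest service date becomes the index date
per_patient = pd.pivot_table(df, index='PHN_ENC',
                             values=['SE_END_DATE', 'AGE', 'SEX'],
                             aggfunc={'SE_END_DATE': 'min', 'AGE': 'min', 'SEX': 'first'})
print(per_patient)  # A -> 2017-06-01, B -> 2017-05-20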
# Global steps:
# 1) Patient definition: had an ICD code and a procedure code within a given time window
# 2) Output: a list of included patients' PHN_ENC and their corresponding index dates
#    'CaseDefn1_PatientDict_FINAL.txt'
#    'CaseDefn1_PatientDf_FINAL.csv'
# 3) Output: analytical results
# ---------------------------------------------------------------------------------------------------------------

import datetime
import random
import ctypes  # used for the MessageBox call at the end

import pandas as pd
import numpy as np  # used by np.min, np.where, np.isnan below

import feature_tagger.feature_tagger as ft
import data_descriptor.data_descriptor as dd
import data_transformer.data_transformer as dt
import var_creator.var_creator as vc

# Pandas exit display without restriction
pd.set_option (& # 39; display.max_rows & # 39; 500)
pd.set_option (& # 39; display.max_columns & # 39; 500)
pd.set_option (& # 39; display.width & # 39 ;, 120)

# Control panel
save_file_switch = False        # WARNING: overwrites existing files when == True
df_subsampling_switch = False   # WARNING: make sure this is off for the final results
edge_date_inclusion = True      # whether or not to include the edge dates in the inclusion criteria date range
testing_printout_switch = False
result_printout_switch = True
done_switch = True
df_subsampling_n = 15000
random_seed = 888

# Instantiate objects
ft_obj = ft.Tagger()
dt_obj = dt.Data_Transformer()

# Import data
loc = 'office'
if loc == 'office':
    directory = 'E:\\My_Working_Primary\\Projects\\Data_Analysis\\'
elif loc == 'home':
    directory = 'C:\\Users\\MyStuff\\Dropbox\\Projects\\Data_Analysis\\'
else:
    pass

refDataDir = 'Data\\RefData\\'
realDataDir = 'Data\\RealData\\'
resultDir = 'Results\\'

file_dad = 'Prepped_DAD_Data.csv'
file_pc = 'Prepped_PC_Data.csv'
file_nacrs = 'Prepped_NACRS_Data.csv'
file_pin = 'Prepped_PIN_Data.csv'
file_reg = 'Prepped_REG_Data.csv'

df_dad = pd.read_csv(directory + realDataDir + file_dad, dtype={'PHN_ENC': str}, encoding='utf-8', low_memory=False)
df_pc = pd.read_csv(directory + realDataDir + file_pc, dtype={'PHN_ENC': str}, encoding='utf-8', low_memory=False)
df_nacrs = pd.read_csv(directory + realDataDir + file_nacrs, dtype={'PHN_ENC': str}, encoding='utf-8', low_memory=False)
df_pin = pd.read_csv(directory + realDataDir + file_pin, dtype={'PHN_ENC': str}, encoding='utf-8', low_memory=False)
df_reg = pd.read_csv(directory + realDataDir + file_reg, dtype={'PHN_ENC': str}, encoding='utf-8', low_memory=False)

# Create a random sample of each df to run the code faster
if df_subsampling_switch == True:
    if ((df_subsampling_n > len(df_dad)) | (df_subsampling_n > len(df_pc)) |
            (df_subsampling_n > len(df_nacrs)) | (df_subsampling_n > len(df_pin))):
        print('Caution: the specified subsample size is larger than the total number of rows in some datasets,')
        print('as a result, resampling with replacement will be performed to reach the specified subsample size.')
    df_dad = dt_obj.random_n(df_dad, n=df_subsampling_n, on_switch=df_subsampling_switch, random_state=random_seed)
    df_pc = dt_obj.random_n(df_pc, n=df_subsampling_n, on_switch=df_subsampling_switch, random_state=random_seed)
    df_nacrs = dt_obj.random_n(df_nacrs, n=df_subsampling_n, on_switch=df_subsampling_switch, random_state=random_seed)
    df_pin = dt_obj.random_n(df_pin, n=df_subsampling_n, on_switch=df_subsampling_switch, random_state=random_seed)

# Format variable types
df_dad['ADMIT_DATE'] = pd.to_datetime(df_dad['ADMIT_DATE'], format='%Y-%m-%d')
df_dad['DIS_DATE'] = pd.to_datetime(df_dad['DIS_DATE'], format='%Y-%m-%d')
df_pc['SE_END_DATE'] = pd.to_datetime(df_pc['SE_END_DATE'], format='%Y-%m-%d')
df_pc['SE_START_DATE'] = pd.to_datetime(df_pc['SE_START_DATE'], format='%Y-%m-%d')
df_nacrs['ARRIVE_DATE'] = pd.to_datetime(df_nacrs['ARRIVE_DATE'], format='%Y-%m-%d')
df_pin['DSPN_DATE'] = pd.to_datetime(df_pin['DSPN_DATE'], format='%Y-%m-%d')
df_reg['PERS_REAP_END_RSN_DATE'] = pd.to_datetime(df_reg['PERS_REAP_END_RSN_DATE'], format='%Y-%m-%d')
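# (Side note: the prepped extracts are assumed to contain only well-formed dates; if that ever
# changed, pd.to_datetime(..., errors='coerce') would produce NaT instead of raising.)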

# Import reference codes
file_rxCode = '_InStudyCodes_ATC&DIN.csv'
file_icdCode = '_InStudyCodes_DxICD.csv'
file_serviceCode = '_InStudyCodes_ServiceCode.csv'

df_rxCode = pd.read_csv(directory + refDataDir + file_rxCode, dtype={'ICD_9': str}, encoding='utf-8', low_memory=False)
df_icdCode = pd.read_csv(directory + refDataDir + file_icdCode, encoding='utf-8', low_memory=False)
df_serviceCode = pd.read_csv(directory + refDataDir + file_serviceCode, encoding='utf-8', low_memory=False)

# Define the study's constant variables
inclusion_start_date = datetime.datetime(2017, 4, 1, 00, 00, 00)
inclusion_end_date = datetime.datetime(2018, 3, 31, 23, 59, 59)

sp_serviceCode_dict = {df_serviceCode['Short_Desc'][0]: df_serviceCode['Health_Service_Code'][0]}
sp_serviceCode_val = sp_serviceCode_dict['ABC injection']

sp_dxCode_dict = {'DIABETES_ICD9': df_icdCode['ICD_9'][0], 'DIABETES_ICD10': df_icdCode['ICD_10'][0]}
sp_dxCode_val_icd9 = sp_dxCode_dict['DIABETES_ICD9']
sp_dxCode_val_icd10 = sp_dxCode_dict['DIABETES_ICD10']

# ---------------------------------------------------------------------------------------------------------------

# 1) Identify the patients for Case #1
# Step 1 - Aged 18 to 100 at the index date
# Step 2 - Had at least 1 recorded ICD diagnosis code from a physician visit (ICD-9-CA = '9999' in PC) or
#          a hospitalization (ICD-10-CA = 'G9999' in DAD) during the inclusion period
# Step 3.1 - Had at least 1 specific procedure code ('99.999O') during the inclusion period
#            (note: the earliest date of the ABC injection code is the index date)
# Step 3.2 - Construct the index date
# Step 4 - Registered as a valid Alberta resident for the 2 years prior to the index date and the 1 year after the
#          index date (determined from REG)

# 1.1) Get age at each service, then drop rows whose age is outside the 18-100 range
df_dad_ageTrimmed = df_dad.copy()
df_dad_ageTrimmed = df_dad_ageTrimmed[(df_dad_ageTrimmed['AGE'] >= 18) & (df_dad_ageTrimmed['AGE'] <= 100)]

df_pc_ageTrimmed = df_pc.copy()
df_pc_ageTrimmed = df_pc_ageTrimmed[(df_pc_ageTrimmed['AGE'] >= 18) & (df_pc_ageTrimmed['AGE'] <= 100)]

# 1.2) Tag dates within the sp range > tag the DIABETES code > combine the tags
df_dad_ageTrimmed['DAD_DATE_TAG'] = ft_obj.date_range_tagger(df_dad_ageTrimmed, 'ADMIT_DATE',
    start_date_range=inclusion_start_date, end_date_range=inclusion_end_date,
    edge_date_inclusion=edge_date_inclusion)
df_dad_ageTrimmed['DAD_ICD_TAG'] = ft_obj.multi_var_cond_tagger(df_dad_ageTrimmed, repeat_var_base_name='DXCODE',
    repeat_var_start=1, repeat_var_end=25, cond_list=[sp_dxCode_val_icd10])
df_dad_ageTrimmed['DAD_DATE_ICD_TAG'] = ft_obj.summing_all_tagger(df_dad_ageTrimmed,
    tag_var_list=['DAD_DATE_TAG', 'DAD_ICD_TAG'])
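# (In my feature_tagger module: multi_var_cond_tagger returns 1 when any of DXCODE1..DXCODE25
# matches the condition list, and summing_all_tagger returns 1 only when every tag in
# tag_var_list is set, so DAD_DATE_ICD_TAG == 1 means "in-window AND diagnosis code matched".)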

df_pc_ageTrimmed['PC_DATE_TAG'] = ft_obj.date_range_tagger(df_pc_ageTrimmed, 'SE_END_DATE',
    start_date_range=inclusion_start_date, end_date_range=inclusion_end_date,
    edge_date_inclusion=edge_date_inclusion)
df_pc_ageTrimmed['PC_ICD_TAG'] = ft_obj.multi_var_cond_tagger(df_pc_ageTrimmed, repeat_var_base_name='HLTH_DX_ICD9X_CODE_',
    repeat_var_start=1, repeat_var_end=3, cond_list=[str(sp_dxCode_val_icd9)])
df_pc_ageTrimmed['PC_DATE_ICD_TAG'] = ft_obj.summing_all_tagger(df_pc_ageTrimmed,
    tag_var_list=['PC_DATE_TAG', 'PC_ICD_TAG'])

# Get the list of all PHN_ENC patients who meet the DIABETES date and code criteria
df_dad_ageDateICDtrimmed = df_dad_ageTrimmed[df_dad_ageTrimmed['DAD_DATE_ICD_TAG'] == 1]
df_pc_ageDateICDtrimmed = df_pc_ageTrimmed[df_pc_ageTrimmed['PC_DATE_ICD_TAG'] == 1]
dad_patientList_diabetes_Code = df_dad_ageDateICDtrimmed['PHN_ENC'].unique().tolist()
pc_patientList_diabetes_Code = df_pc_ageDateICDtrimmed['PHN_ENC'].unique().tolist()
dad_pc_patientList_diabetes_Code = list(set(dad_patientList_diabetes_Code) | set(pc_patientList_diabetes_Code))
dad_pc_patientList_diabetes_Code.sort()

# 1.3.1) Tag dates within the sp range > tag the ABC injection code > combine the tags
df_pc_ageTrimmed['PC_PROC_TAG'] = df_pc_ageTrimmed['ABC_INJECT']
df_pc_ageTrimmed['PC_DATE_PROC_TAG'] = ft_obj.summing_all_tagger(df_pc_ageTrimmed,
    tag_var_list=['PC_DATE_TAG', 'PC_PROC_TAG'])
df_pc_ageDateProcTrimmed = df_pc_ageTrimmed[df_pc_ageTrimmed['PC_DATE_PROC_TAG'] == 1]
pc_patientList_procCode = df_pc_ageDateProcTrimmed['PHN_ENC'].unique().tolist()
dad_pc_patientList_diabetes_NprocCode = list(set(dad_pc_patientList_diabetes_Code) & set(pc_patientList_procCode))
dad_pc_patientList_diabetes_NprocCode.sort()

# 1.3.2) Find the index date
df_pc_ageDateProcTrimmed_pivot = pd.pivot_table(df_pc_ageDateProcTrimmed, index=['PHN_ENC'],
    values=['SE_END_DATE', 'AGE', 'SEX', 'RURAL'],
    aggfunc={'SE_END_DATE': np.min, 'AGE': np.min, 'SEX': 'first', 'RURAL': 'first'})
df_pc_ageDateProcTrimmed_pivot = pd.DataFrame(df_pc_ageDateProcTrimmed_pivot.to_records())
df_pc_ageDateProcTrimmed_pivot = df_pc_ageDateProcTrimmed_pivot.rename(columns={'SE_END_DATE': 'INDEX_DT'})

# 1.4) Filter by valid registry
# Create a list variable (based on the index date) indicating which fiscal years need to be covered, per the
# required 2 years before and 1 year after the index date, in df_pc_ageDateProcTrimmed_pivot
def extract_needed_fiscal_years(row):  # 2 fiscal years before and 1 fiscal year after the index date
    if int(row['INDEX_DT'].month) >= 4:
        index_yr = int(row['INDEX_DT'].year) + 1
    else:
        index_yr = int(row['INDEX_DT'].year)
    first_yr = index_yr - 2
    four_yrs_str = str(first_yr) + ', ' + str(first_yr + 1) + ', ' + str(first_yr + 2) + ', ' + str(first_yr + 3)
    return four_yrs_str
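# Example: INDEX_DT = 2017-06-15 falls in the fiscal year ending 2018 (Apr 2017 - Mar 2018),
# so this returns '2016, 2017, 2018, 2019' (two years before through one year after).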

df_temp = df_pc_ageDateProcTrimmed_pivot.copy()
df_temp['FYE_NEEDED'] = df_temp.apply(extract_needed_fiscal_years, axis=1)
df_temp['FYE_NEEDED'] = df_temp['FYE_NEEDED'].apply(lambda x: x.split(', '))  # from one whole string to a list of strings
df_temp['FYE_NEEDED'] = df_temp['FYE_NEEDED'].apply(lambda x: [int(i) for i in x])  # from a list of strings to a list of ints

# Create a list variable indicating the fiscal years with active coverage, in df_reg
df_reg['FYE_ACTIVE'] = np.where(df_reg['ACTIVE_COVERAGE'] == 1, df_reg['FYE'], np.nan)
df_reg_agg = df_reg.groupby(by='PHN_ENC').agg({'FYE_ACTIVE': lambda x: list(x)})
df_reg_agg = df_reg_agg.reset_index()
df_reg_agg['FYE_ACTIVE'] = df_reg_agg['FYE_ACTIVE'].apply(lambda x: [i for i in x if ~np.isnan(i)])  # remove float nan
df_reg_agg['FYE_ACTIVE'] = df_reg_agg['FYE_ACTIVE'].apply(lambda x: [int(i) for i in x])  # convert float to int
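# e.g. a patient's [2016.0, nan, 2017.0] becomes [2016, 2017]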

# Merge and create a tag; if the active years do not cover all required fiscal years, exclude the patient
# Create an inclusion/exclusion patient list to apply in order to obtain the patient cohort for Case #1
df_temp_v2 = df_temp.merge(df_reg_agg, on='PHN_ENC', how='left')
df_temp_v2_trimmed = df_temp_v2[(df_temp_v2['FYE_NEEDED'].notnull()) & (df_temp_v2['FYE_ACTIVE'].notnull())]  # drop rows missing either variable

def compare_list_elements_btw_cols(row):
    if set(row['FYE_NEEDED']).issubset(row['FYE_ACTIVE']):
        return 1
    else:
        return 0
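# e.g. set([2016, 2017, 2018, 2019]).issubset([2015, 2016, 2017, 2018, 2019, 2020]) -> True, so VALID_REG = 1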

df_temp_v2_trimmed['VALID_REG'] = df_temp_v2_trimmed.apply(compare_list_elements_btw_cols, axis=1)
df_temp_v2_trimmed_v2 = df_temp_v2_trimmed[df_temp_v2_trimmed['VALID_REG'] == 1]
reg_patientList = df_temp_v2_trimmed_v2['PHN_ENC'].unique().tolist()

# Apply the inclusion/exclusion patient list (from REG) to find the final patients
# Get the final patient list
df_final_defn1 = df_pc_ageDateProcTrimmed_pivot.merge(df_temp_v2_trimmed_v2, on='PHN_ENC', how='inner')
df_final_defn1 = df_final_defn1[['PHN_ENC', 'AGE_x', 'SEX_x', 'RURAL_x', 'INDEX_DT_x']]
df_final_defn1 = df_final_defn1.rename(columns={'AGE_x': 'AGE', 'SEX_x': 'SEX', 'RURAL_x': 'RURAL',
                                                'INDEX_DT_x': 'INDEX_DT'})
df_final_defn1['PREINDEX_1Yr'] = df_final_defn1['INDEX_DT'] - pd.Timedelta(days=364)  # 364 because the index date itself counts as a pre-index day
df_final_defn1['PREINDEX_2Yr'] = df_final_defn1['INDEX_DT'] - pd.Timedelta(days=729)  # 729 for the same reason, for the 2-year window
df_final_defn1['POSTINDEX_1Yr'] = df_final_defn1['INDEX_DT'] + pd.Timedelta(days=364)

list_final_defn1 = df_final_defn1['PHN_ENC'].unique().tolist()
dict_final_defn1 = {'Final unique patients for Case Defn #1': list_final_defn1}

# Additional client request (received later)
# How: create INDEX_DT_FIS_YR (fiscal year of the index date) by mapping INDEX_DT onto fiscal years
def index_date_fiscal_year(row):
    if ((row['INDEX_DT'] >= datetime.datetime(2015, 4, 1, 00, 00, 00)) &
            (row['INDEX_DT'] < datetime.datetime(2016, 4, 1, 00, 00, 00))):
        return '2015/2016'
    elif ((row['INDEX_DT'] >= datetime.datetime(2016, 4, 1, 00, 00, 00)) &
            (row['INDEX_DT'] < datetime.datetime(2017, 4, 1, 00, 00, 00))):
        return '2016/2017'
    else:
        return 'Potential Error'

df_final_defn1['INDEX_DT_FIS_YR'] = df_final_defn1.apply(index_date_fiscal_year, axis=1)

# 2) Output the final patient list for future access
# WARNING: will overwrite existing files
if save_file_switch == True:
    if df_subsampling_switch == True:
        f = open(directory + resultDir + '_CaseDefn1_PatientDict_Subsample.txt', 'w')
        f.write(str(dict_final_defn1) + ',')
        f.close()
        df_final_defn1.to_csv(directory + resultDir + 'CaseDefn1_PatientDf_Subsample.csv', sep=',', encoding='utf-8')
    elif df_subsampling_switch == False:
        f = open(directory + resultDir + 'CaseDefn1_PatientDict_FINAL.txt', 'w')
        f.write(str(dict_final_defn1) + ',')
        f.close()
        df_final_defn1.to_csv(directory + resultDir + 'CaseDefn1_PatientDf_FINAL.csv', sep=',', encoding='utf-8')

# 3) Results: analytical results
if result_printout_switch == True:
    print('Unique PHN_ENC N, (aged 18 to 100 during the inclusion period) from DAD:')
    print(df_dad_ageTrimmed['PHN_ENC'].nunique())

    print('Unique PHN_ENC N, (aged 18 to 100 during the inclusion period) from PC:')
    print(df_pc_ageTrimmed['PHN_ENC'].nunique())

    print('Unique PHN_ENC N, (aged 18 to 100 during the inclusion period) from DAD or PC:')
    dd_obj = dd.Data_Comparator(df_dad_ageTrimmed, df_pc_ageTrimmed, 'PHN_ENC')
    print(dd_obj.unique_n_union())

    print('Unique PHN_ENC N, (aged 18 to 100) and (had a DIABETES code during the inclusion period) from DAD:')
    print(df_dad_ageDateICDtrimmed['PHN_ENC'].nunique())

    print('Unique PHN_ENC N, (aged 18 to 100) and (had a DIABETES code during the inclusion period) from PC:')
    print(df_pc_ageDateICDtrimmed['PHN_ENC'].nunique())

    print('Unique PHN_ENC N, (aged 18 to 100) and (had a DIABETES code during the inclusion period) from DAD or PC:')
    print(len(dad_pc_patientList_diabetes_Code))

    print('Unique PHN_ENC N, (aged 18 to 100) and (had a DIABETES code during the inclusion period) '
          'and (had the ABC injection code) from DAD and PC:')
    print(df_pc_ageDateProcTrimmed_pivot['PHN_ENC'].nunique())

    print('Unique PHN_ENC N, (aged 18 to 100) and (had a DIABETES code during the inclusion period) '
          'and (had the ABC injection code) and (was a valid AB resident around the index date) '
          'from DAD, PC and REG [Case Defn #1]:')
    print(df_final_defn1['PHN_ENC'].nunique())

    # Additional analysis request (received later)
    print('Patient N by index date mapped to the corresponding fiscal year:')
    print(df_final_defn1['INDEX_DT_FIS_YR'].value_counts())

if done_switch == True:
    ctypes.windll.user32.MessageBoxA(0, b'Hello there', b'Program done.', 3)

My questions are:

  • This is project-specific code, yet other projects from other companies follow exactly the same steps: data cleaning, data linkage, tagging, tag filtering, aggregation, saving, and output. How can I refactor my code so that it is maintainable within this specific project and also reusable across similar projects? (A toy sketch of the kind of configuration-driven approach I have in mind follows this list.)
  • Often, once I have run the code and delivered the results, clients come back to ask for additional information (for example, the parts under "# Additional request (later)"). How can I handle such follow-up requests more efficiently, with maintainability and scalability in mind?
  • Are there areas where I could apply design patterns?
  • Any other suggestions on how I can write better Python code are welcome.
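
For concreteness, here is a toy, hypothetical sketch (all names and codes invented) of the configuration-driven direction I mean in the first question: each project would supply only a declarative spec, while the generic tagging logic lives in a shared module.

import pandas as pd

# Hypothetical per-project spec: (new tag column, date column, repeated code columns, accepted codes)
TAG_SPECS = [
    ('DAD_DATE_ICD_TAG', 'ADMIT_DATE', ['DXCODE' + str(i) for i in range(1, 26)], {'G9999'}),
    ('PC_DATE_ICD_TAG', 'SE_END_DATE', ['HLTH_DX_ICD9X_CODE_' + str(i) for i in range(1, 4)], {'9999'}),
]

def apply_tag_spec(df, tag_col, date_col, code_cols, codes, start, end):
    """Set tag_col to 1 where date_col falls in [start, end] and any code column matches codes."""
    in_window = df[date_col].between(start, end)             # inclusive on both edges
    has_code = df[code_cols].isin(list(codes)).any(axis=1)   # any repeated code column hits
    df[tag_col] = (in_window & has_code).astype(int)
    return df

A follow-up client request would then ideally become one more entry in TAG_SPECS (plus one more aggregation), rather than another copy-pasted block.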