Skip to content

Preprocess data

auxiliary_data

surname_origin_bert

surname_origin_bert(df, surname_col)

THIS IS WORK IN PROGRESS!!! Surname classification that uses an adjusted script (scripts/surname_classification_with_bert.py) from: https://www.kaggle.com/code/yonatankpl/surname-classification-with-bert to train a BERT model for surname classification

additional data from: https://github.com/greenelab/wiki-nationality-estimate/tree/master

and possibly data from: https://github.com/philipperemy/name-dataset?tab=readme-ov-file#full-dataset

Parameters:

  • df (pd.DataFrame) –

    input dataset

  • surname_col (str) –

    column name with surnames

Source code in churn_pred/preprocessing/auxiliary_data.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def surname_origin_bert(df: pd.DataFrame, surname_col: str) -> pd.DataFrame:
    """
    THIS IS WORK IN PROGRESS!!!
    Placeholder for a BERT-based surname classifier. The intended model is
    trained with an adjusted script
    (scripts/surname_classification_with_bert.py) based on:
    https://www.kaggle.com/code/yonatankpl/surname-classification-with-bert

    additional data from:
    https://github.com/greenelab/wiki-nationality-estimate/tree/master

    and possibly data from:
    https://github.com/philipperemy/name-dataset?tab=readme-ov-file#full-dataset

    Args:
        df (pd.DataFrame): input dataset
        surname_col (str): column name with surnames (currently unused)

    Returns:
        pd.DataFrame: an unmodified copy of ``df``
    """
    # No classification is implemented yet; only a copy of the input is returned.
    return df.copy()

surname_origin

surname_origin(df, surname_col)

Surname classification that uses db of names leaked from FB, see: https://github.com/philipperemy/name-dataset

Parameters:

  • df (pd.DataFrame) –

    input dataset

  • surname_col (str) –

    column name with surnames

Source code in churn_pred/preprocessing/auxiliary_data.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def surname_origin(df: pd.DataFrame, surname_col: str) -> pd.Series:
    """
    Surname classification that uses db of names leaked from FB, see:
    https://github.com/philipperemy/name-dataset

    Every distinct surname is looked up once; the country with the highest
    score in the ``["last_name"]["country"]`` mapping of the search result is
    taken as the origin.

    Args:
        df (pd.DataFrame): input dataset
        surname_col (str): column name with surnames

    Returns:
        pd.Series: origin country per row, named ``"res"`` and sharing the
        index of ``df``. Rows with a missing surname, or whose lookup fails
        for any reason, get the string ``"NA"``.
    """
    nd = NameDataset()

    def _search_surname(surname):
        """Return the best-scoring country for one surname, or "NA"."""
        try:
            countries = nd.search(surname)["last_name"]["country"]
            return max(countries, key=countries.get)
        except Exception:
            # Best-effort lookup: unknown names, missing "last_name" data or
            # an empty country mapping all fall back to "NA".
            # KeyboardInterrupt/SystemExit are deliberately not swallowed.
            return "NA"

    surnames = df[surname_col].astype(object)
    # One dataset query per distinct surname, then a plain dict lookup per row.
    origin_by_surname = {
        surname: _search_surname(surname) for surname in surnames.dropna().unique()
    }
    return pd.Series(
        [origin_by_surname.get(surname, "NA") for surname in surnames],
        index=df.index,
        name="res",
    )

hemisphere

hemisphere(df, cc_col, loc_db_df=None, city_col=None)

Returns string pd.Series identifying hemisphere: "northern" or "southern".

Parameters:

  • df (pd.DataFrame) –

    input dataframe

  • city_col (str) –

    city name column

  • cc_col (str) –

    country code column

  • loc_db_df (pd.DataFrame) –

    pandas dataframe with previously saved locations

Source code in churn_pred/preprocessing/auxiliary_data.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def hemisphere(
    df: pd.DataFrame,
    cc_col: str,
    loc_db_df: Optional[pd.DataFrame] = None,
    city_col: Optional[str] = None,
) -> pd.DataFrame:
    """Adds a "hemisphere" column with values "northern" or "southern".

    Each distinct location is built as ``"<city>,<country code>"`` (or
    ``" ,<country code>"`` when no city column is given) and resolved to a
    latitude. A negative latitude marks the matching rows as "southern";
    everything else stays "northern".

    Args:
        df (pd.DataFrame): input dataframe
        cc_col (str): country code column
        loc_db_df (pd.DataFrame): pandas dataframe with previously saved
            locations; its "location" and "latitude" columns are used as a
            cache before falling back to the geocoder
        city_col (str): city name column

    Returns:
        pd.DataFrame: copy of ``df`` with the additional "hemisphere" column
    """
    dfc = df.copy()
    dfc["hemisphere"] = "northern"

    # Per-row location key; reused below as the row mask so that no string
    # splitting is needed (works without a city column and for city names
    # that contain commas).
    if city_col:
        loc_keys = df[city_col] + "," + df[cc_col]
    else:
        loc_keys = " ," + df[cc_col]

    # Cached latitudes; the first entry wins if a location is listed twice.
    known_latitudes = {}
    if loc_db_df is not None:
        cached = loc_db_df.drop_duplicates(subset="location")
        known_latitudes = dict(zip(cached["location"], cached["latitude"]))

    geolocator = None
    for uq_loc in loc_keys.unique():
        if uq_loc in known_latitudes:
            latitude = float(known_latitudes[uq_loc])
        else:
            if geolocator is None:
                # Built lazily: not needed when every location is cached.
                geolocator = Nominatim(user_agent="hemisphere_identification")
            # NOTE(review): the geocoder result is dereferenced directly; if
            # it can be None for an unresolvable location this raises
            # AttributeError - confirm and decide how to handle it.
            latitude = geolocator.geocode(uq_loc).latitude

        if latitude < 0:
            dfc.loc[loc_keys == uq_loc, "hemisphere"] = "southern"

    return dfc

age_categories

age_categories(df, age_col)

Returns string pd.DataFrame identifying different age classification categories according to the age: * working_class (https://ourworldindata.org/age-structure) * children_and_adolescents: <0, 15) * working_age: <15, 65) * elderly: <65, inf) * stage_of_life (https://integrishealth.org/resources/on-your-health/2015/october/stages-of-life-health-for-every-age) * infant: <0, 2) * toddler: <2, 5) * child: <5, 13) * teen: <13, 20) * adult: <20, 40) * middle_age_adult: <40, 60) * senior_adult: <60, inf) * generation (https://www.beresfordresearch.com/age-range-by-generation/) * gen_z: <12, 28) * millennials: <28, 44) * gen_x: <44, 60) * boomers_2: <60, 70) * boomers_1: <70, 79) * post_war: <79, 97) * ww2: <97, 102) * vampire: <102, inf)

Parameters:

  • df (pd.DataFrame) –

    input dataset

  • age_col (str) –

    age column

Source code in churn_pred/preprocessing/auxiliary_data.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def age_categories(df: pd.DataFrame, age_col: str) -> pd.DataFrame:
    """Returns string pd.DataFrame identifying different age classification
    categories according to the age. All intervals are left-closed and
    right-open, i.e. ``<a, b)`` contains ``a`` but not ``b``:
    * working_class (https://ourworldindata.org/age-structure)
      * children_and_adolescents: <0, 15)
      * working_age: <15, 65)
      * elderly: <65, inf)
    * stage_of_life (https://integrishealth.org/resources/on-your-health/2015/october/stages-of-life-health-for-every-age)
      * infant: <0, 2)
      * toddler: <2, 5)
      * child: <5, 13)
      * teen: <13, 20)
      * adult: <20, 40)
      * middle_age_adult: <40, 60)
      * senior_adult: <60, inf)
    * generation (https://www.beresfordresearch.com/age-range-by-generation/)
      * gen_z: <12, 28) (the bin starts at 0, so ages below 12 are also
        labelled gen_z)
      * millennials: <28, 44)
      * gen_x: <44, 60)
      * boomers_2: <60, 70)
      * boomers_1: <70, 79)
      * post_war: <79, 97)
      * ww2: <97, 102)
      * vampire: <102, inf)

    Ages outside every bin (negative or missing) yield NaN categories.

    Args:
        df (pd.DataFrame): input dataset
        age_col (str): age column

    Returns:
        pd.DataFrame: copy of ``df`` with the categorical columns
        "working_class", "stage_of_life" and "generation" added
    """
    # column name -> (bin edges, labels); one label per consecutive edge pair
    schemes = {
        "working_class": (
            [0, 15, 65, np.inf],
            ["children_and_adolescents", "working_age", "elderly"],
        ),
        "stage_of_life": (
            [0, 2, 5, 13, 20, 40, 60, np.inf],
            [
                "infant",
                "toddler",
                "child",
                "teen",
                "adult",
                "middle_age_adult",
                "senior_adult",
            ],
        ),
        "generation": (
            [0, 28, 44, 60, 70, 79, 97, 102, np.inf],
            [
                "gen_z",
                "millennials",
                "gen_x",
                "boomers_2",
                "boomers_1",
                "post_war",
                "ww2",
                "vampire",
            ],
        ),
    }

    dfc = df.copy()
    ages = dfc[age_col]
    for category_col, (bins, labels) in schemes.items():
        # right=False gives the documented <a, b) intervals; the pd.cut
        # default (a, b] would push boundary ages into the younger bucket.
        dfc[category_col] = pd.cut(ages, bins=bins, labels=labels, right=False)
    return dfc

big_mac_index

big_mac_index(df, country_name_col)

Returns dataframe with Big Mac Index corresponding to country codes. Downloaded on 26/4/2024.

Note

Unfortunately, Spain, France and Germany (unique companies in the dataset) are not included; another index to explore: https://en.wikipedia.org/wiki/Big_Mac_Index

Based on
Downloaded from

Parameters:

  • df (pd.DataFrame) –

    input dataset

  • country_name_col (str) –

    column name with country names

Source code in churn_pred/preprocessing/auxiliary_data.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def big_mac_index(df: pd.DataFrame, country_name_col: str) -> pd.DataFrame:
    """
    Returns dataframe with the Big Mac Index dollar price joined on the
    country (via its iso_a3 code). Data downloaded on 26/4/2024.

    Note:
        Unfortunately Spain, France, Germany (unique companies in the dataset) are not included, other index to explore:
        https://en.wikipedia.org/wiki/Big_Mac_Index

    Based on:
        https://www.economist.com/big-mac-index

    Downloaded from:
        https://github.com/TheEconomist/big-mac-data
        https://raw.githubusercontent.com/TheEconomist/big-mac-data/master/output-data/big-mac-full-index.csv

    Args:
        df (pd.DataFrame): input dataset
        country_name_col: column name with country names

    Returns:
        pd.DataFrame: copy of ``df`` with the additional column
        "big_mac_index_dollar_price" (NaN where no price is available)
    """
    prices = pd.read_csv(BIG_MAC_INDEX)
    prices["date"] = pd.to_datetime(prices["date"])

    # keep only the most recent record of every country
    latest = prices.sort_values("date").groupby("iso_a3").tail(1).reset_index()
    latest = (
        latest[["iso_a3", "dollar_price"]]
        .rename(columns={"dollar_price": "big_mac_index_dollar_price"})
        .dropna()
    )

    # "iso_a3" is only a temporary join key and is removed again afterwards
    enriched = df.copy()
    enriched["iso_a3"] = get_iso_a3(df=enriched, country_name_col=country_name_col)
    enriched = enriched.merge(latest, on="iso_a3", how="left")
    return enriched.drop(columns=["iso_a3"])

gdppc

gdppc(df, country_name_col)

Returns dataframe with Gross Domestic Product Per Capita corresponding to country codes. Uses information from the World Bank API downloaded on 26/4/2024. Last file update: 3/28/2024. The postprocessed file has the initial 3 lines removed for easier processing using a pandas df.

Downloaded from
Main file before preprocessing

'data/gdpp/API_NY.GDP.PCAP.PP.CD_DS2_en_csv_v2_213153.csv'

Other included files

'data/gdpp/Metadata_Country_API_NY.GDP.PCAP.PP.CD_DS2_en_csv_v2_213153.csv' 'data/gdpp/Metadata_Indicator_API_NY.GDP.PCAP.PP.CD_DS2_en_csv_v2_213153.csv'

Parameters:

  • df (pd.DataFrame) –

    input dataset

  • country_name_col (str) –

    column name with country names

Source code in churn_pred/preprocessing/auxiliary_data.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def gdppc(df: pd.DataFrame, country_name_col: str) -> pd.DataFrame:
    """
    Returns dataframe with Gross Domestic Product Per Capita corresponding to country
    codes. Uses information from worldbank API downloaded on 26/4/2024.
    Last file update: 3/28/2024.
    Postprocessed file has initial 3 lines removed for easier processing using pandas df.

    Downloaded from:
        * https://data.worldbank.org/indicator/NY.GDP.PCAP.PP.CD
        * https://api.worldbank.org/v2/en/indicator/NY.GDP.PCAP.PP.CD?downloadformat=csv
    Main file before preprocessing:
        'data/gdpp/API_NY.GDP.PCAP.PP.CD_DS2_en_csv_v2_213153.csv'
    Other included files:
        'data/gdpp/Metadata_Country_API_NY.GDP.PCAP.PP.CD_DS2_en_csv_v2_213153.csv'
        'data/gdpp/Metadata_Indicator_API_NY.GDP.PCAP.PP.CD_DS2_en_csv_v2_213153.csv'

    Args:
        df (pd.DataFrame): input dataset
        country_name_col: column name with country names

    Returns:
        pd.DataFrame: copy of ``df`` with the additional columns
        "gdp_per_capita" and "IncomeGroup" (NaN where no data is available)
    """
    gdp_per_capita_raw = pd.read_csv(GDP_PER_CAPITA)
    gdp_per_capita_ic = pd.read_csv(GDP_PER_CAPITA_INCOMEGROUP)

    # Last available value per country: forward-fill along the remaining
    # (per-period) columns and take the right-most one.
    value_columns = gdp_per_capita_raw.drop(
        columns=["Country Name", "Country Code", "Indicator Name", "Indicator Code"]
    )
    # Built as a fresh frame instead of mutating a column slice of the raw
    # data, which triggers pandas' SettingWithCopyWarning.
    gdp_per_capita = pd.DataFrame(
        {
            "iso_a3": gdp_per_capita_raw["Country Code"],
            "gdp_per_capita": value_columns.ffill(axis=1).iloc[:, -1],
        }
    ).dropna()

    gdp_per_capita_ic = (
        gdp_per_capita_ic[["Country Code", "IncomeGroup"]]
        .rename(columns={"Country Code": "iso_a3"})
        .dropna()
    )

    # "iso_a3" is only a temporary join key and is removed again afterwards
    dfc = df.copy()
    dfc["iso_a3"] = get_iso_a3(df=dfc, country_name_col=country_name_col)

    dfc = dfc.merge(gdp_per_capita, on="iso_a3", how="left")
    dfc = dfc.merge(gdp_per_capita_ic, on="iso_a3", how="left")
    return dfc.drop(columns=["iso_a3"])

get_iso_a3

get_iso_a3(df, country_name_col)

Returns pd.Series with mapped country names to iso_a3.

Parameters:

  • df (pd.Series) –

    input country name pandas series

Source code in churn_pred/preprocessing/auxiliary_data.py
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
def get_iso_a3(df: pd.DataFrame, country_name_col: str) -> pd.Series:
    """Returns pd.Series with mapped country names to iso_a3.

    Every distinct country name is resolved once with a fuzzy search and the
    ``alpha_3`` code of the first match is used.

    Args:
        df (pd.DataFrame): input dataframe
        country_name_col (str): column name with country names

    Returns:
        pd.Series: iso_a3 code per row, named ``"res"`` and sharing the index
        of ``df``. Missing names and names that cannot be resolved map to the
        string ``"NA"``.
    """

    def _get_iso_a3_map(country_name: str) -> str:
        """Helper function for get_iso_a3"""
        try:
            result = pycountry.countries.search_fuzzy(country_name)
            return result[0].alpha_3  # type: ignore
        except Exception:
            # Best-effort lookup: any failure maps to "NA", while
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            return "NA"

    names = df[country_name_col].astype(object)
    # One lookup per distinct name, then a plain dict lookup per row.
    iso_by_name = {name: _get_iso_a3_map(name) for name in names.dropna().unique()}
    return pd.Series(
        [iso_by_name.get(name, "NA") for name in names],
        index=df.index,
        name="res",
    )

get_country_name

get_country_name(df, country_name_col)

Returns pd.Series with mapped country iso_a2 to names.

Parameters:

  • df (pd.Series) –

    input country name pandas series

Source code in churn_pred/preprocessing/auxiliary_data.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
def get_country_name(df: pd.DataFrame, country_name_col: str) -> pd.Series:
    """Returns pd.Series with mapped country iso_a2 to names.

    Every distinct code is resolved once via its ``alpha_2`` value and the
    ``name`` of the matching country is used.

    Args:
        df (pd.DataFrame): input dataframe
        country_name_col (str): column name with country iso_a2 codes

    Returns:
        pd.Series: country name per row, named ``"res"`` and sharing the
        index of ``df``. Missing codes and codes that cannot be resolved map
        to the string ``"NA"``.
    """

    def _get_country_name_map(country_name: str) -> str:
        """Helper function for get_country_name"""
        try:
            result = pycountry.countries.get(alpha_2=country_name)
            return result.name
        except Exception:
            # Best-effort lookup: any failure (including a result without a
            # ``name``) maps to "NA", while KeyboardInterrupt/SystemExit are
            # no longer swallowed.
            return "NA"

    codes = df[country_name_col].astype(object)
    # One lookup per distinct code, then a plain dict lookup per row.
    name_by_code = {code: _get_country_name_map(code) for code in codes.dropna().unique()}
    return pd.Series(
        [name_by_code.get(code, "NA") for code in codes],
        index=df.index,
        name="res",
    )

get_country_region_subregion

get_country_region_subregion(df, country_name_col)

Returns pd.DataFrame with the region and subregion mapped from each country name.

Parameters:

  • df (pd.Series) –

    input country name pandas series

Source code in churn_pred/preprocessing/auxiliary_data.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
def get_country_region_subregion(
    df: pd.DataFrame, country_name_col: str
) -> pd.DataFrame:
    """Returns pd.DataFrame with the region and subregion of each country.

    Every distinct country name is resolved once through ``CountryInfo``.

    Args:
        df (pd.DataFrame): input dataframe
        country_name_col (str): column name with country names

    Returns:
        pd.DataFrame: two columns, ``<country_name_col>_region`` and
        ``<country_name_col>_subregion``, sharing the index of ``df``.
        Missing names and names that cannot be resolved get ``"NA"`` in both
        columns.
    """
    region_col = country_name_col + "_region"
    subregion_col = country_name_col + "_subregion"
    unknown = ("NA", "NA")

    def _get_country_region_subregion_map(country_name: str) -> Tuple[str, str]:
        """Helper function for get_country_region_subregion"""
        try:
            result = CountryInfo(country_name)
            return (result.region(), result.subregion())
        except Exception:
            # Best-effort lookup: any failure maps to ("NA", "NA"), while
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            return unknown

    names = df[country_name_col].astype(object)
    # One lookup per distinct name, then a plain dict lookup per row.
    regions_by_name = {
        name: _get_country_region_subregion_map(name)
        for name in names.dropna().unique()
    }
    return pd.DataFrame(
        [regions_by_name.get(name, unknown) for name in names],
        index=df.index,
        columns=[region_col, subregion_col],
    )

label_encoder

LabelEncoder

LabelEncoder(columns_to_encode=None)

Bases: object

Label Encode categorical values for multiple columns at once

ℹ️ NOTE: Shamelessly copied from https://github.com/jrzaurin/pytorch-widedeep

ℹ️ NOTE: LabelEncoder reserves 0 for unseen new categories. This is convenient when defining the embedding layers, since we can just set padding idx to 0.

Parameters:

  • columns_to_encode (list, Optional, default = None) –

    List of strings containing the names of the columns to encode. If None all columns of type object in the dataframe will be label encoded.

Attributes:

  • encoding_dict (Dict) –

    Dictionary containing the encoding mappings in the format, e.g. :
    {'colname1': {'cat1': 1, 'cat2': 2, ...}, 'colname2': {'cat1': 1, 'cat2': 2, ...}, ...} # noqa

  • inverse_encoding_dict(Dict) (Dict) –

    Dictionary containing the inverse encoding mappings in the format, e.g. :
    {'colname1': {1: 'cat1', 2: 'cat2', ...}, 'colname2': {1: 'cat1', 2: 'cat2', ...}, ...} # noqa

Source code in churn_pred/preprocessing/label_encoder.py
29
30
31
32
33
def __init__(
    self,
    columns_to_encode: Optional[List[str]] = None,
) -> None:
    """Stores the names of the columns that should be label encoded.

    Args:
        columns_to_encode (List[str], optional): names of the columns to
            encode. It is stored as given; no validation happens here.
    """
    self.columns_to_encode = columns_to_encode

fit

fit(df)

Creates encoding attributes

Returns:

  • LabelEncoder( LabelEncoder ) –

    LabelEncoder fitted object

Source code in churn_pred/preprocessing/label_encoder.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def fit(self, df: pd.DataFrame) -> "LabelEncoder":
    """Learns the category <-> integer code mappings from ``df``.

    When ``columns_to_encode`` is None, all columns of dtype object are
    selected and stored on the instance. Codes are assigned in order of first
    appearance, starting at 1; code 0 is reserved and decodes to "unseen".

    Returns:
        LabelEncoder: `LabelEncoder` fitted object
    """
    frame = df.copy()

    if self.columns_to_encode is not None:
        # make sure all categorical columns are in an adequate format
        for name in self.columns_to_encode:
            frame[name] = frame[name].astype("O")
    else:
        self.columns_to_encode = list(frame.select_dtypes(include=["object"]).columns)

    # codes start at 1 so that 0 stays free for padding/"unseen" categories
    self.encoding_dict = {
        name: {
            category: code
            for code, category in enumerate(frame[name].unique(), start=1)
        }
        for name in self.columns_to_encode
    }

    self.inverse_encoding_dict = {}
    for name, category_to_code in self.encoding_dict.items():
        code_to_category = {
            code: category for category, code in category_to_code.items()
        }
        code_to_category[0] = "unseen"
        self.inverse_encoding_dict[name] = code_to_category

    return self

transform

transform(df)

Label Encoded the categories in columns_to_encode

Returns:

  • pd.DataFrame

    pd.DataFrame: label-encoded dataframe

Source code in churn_pred/preprocessing/label_encoder.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
    """Replaces the categories in `columns_to_encode` with their integer codes.

    Categories that are not present in the fitted mapping are encoded as 0.
    The input dataframe is left untouched.

    Returns:
        pd.DataFrame: label-encoded dataframe

    Raises:
        NotFittedError: if the encoder has no ``encoding_dict`` yet
    """
    if not hasattr(self, "encoding_dict"):
        raise NotFittedError(
            "This LabelEncoder instance is not fitted yet. "
            "Call 'fit' with appropriate arguments before using this LabelEncoder."
        )

    encoded = df.copy()
    # make sure all categorical columns are in an adequate format
    for name in self.columns_to_encode:  # type: ignore
        encoded[name] = encoded[name].astype("O")

    for name, category_to_code in self.encoding_dict.items():
        # default argument binds the mapping of the current column
        encoded[name] = encoded[name].apply(
            lambda category, codes=category_to_code: codes.get(category, 0)
        )

    return encoded

fit_transform

fit_transform(df)

Combines fit and transform

Returns:

  • pd.DataFrame

    pd.DataFrame: label-encoded dataframe

Examples:

>>> import pandas as pd
>>> from churn_pred.preprocessing.label_encoder import LabelEncoder
>>> df = pd.DataFrame({'col1': [1,2,3], 'col2': ['me', 'you', 'him']})
>>> columns_to_encode = ['col2']
>>> encoder = LabelEncoder(columns_to_encode)
>>> encoder.fit_transform(df)
   col1  col2
0     1     1
1     2     2
2     3     3
>>> encoder.encoding_dict
{'col2': {'me': 1, 'you': 2, 'him': 3}}
Source code in churn_pred/preprocessing/label_encoder.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
    """Fits the encoder on ``df`` and returns ``df`` label encoded.

    Equivalent to calling `fit` followed by `transform` on the same dataframe.

    Returns:
        pd.DataFrame: label-encoded dataframe

    Examples:
        >>> import pandas as pd
        >>> from churn_pred.preprocessing.label_encoder import LabelEncoder
        >>> df = pd.DataFrame({'col1': [1,2,3], 'col2': ['me', 'you', 'him']})
        >>> columns_to_encode = ['col2']
        >>> encoder = LabelEncoder(columns_to_encode)
        >>> encoder.fit_transform(df)
           col1  col2
        0     1     1
        1     2     2
        2     3     3
        >>> encoder.encoding_dict
        {'col2': {'me': 1, 'you': 2, 'him': 3}}
    """
    fitted = self.fit(df)
    return fitted.transform(df)

inverse_transform

inverse_transform(df)

Returns the original categories

Returns:

  • pd.DataFrame

    pd.DataFrame: label-encoded dataframe

Examples:

>>> import pandas as pd
>>> from churn_pred.preprocessing.label_encoder import LabelEncoder
>>> df = pd.DataFrame({'col1': [1,2,3], 'col2': ['me', 'you', 'him']})
>>> columns_to_encode = ['col2']
>>> encoder = LabelEncoder(columns_to_encode)
>>> df_enc = encoder.fit_transform(df)
>>> encoder.inverse_transform(df_enc)
   col1 col2
0     1   me
1     2  you
2     3  him
Source code in churn_pred/preprocessing/label_encoder.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
    """Returns the original categories

    Works on a copy, so the dataframe passed in is left untouched (the same
    contract as `transform`). Code 0 decodes to "unseen"; a code that is not
    part of the fitted mapping raises ``KeyError``.

    Returns:
        pd.DataFrame: dataframe with the encoded columns mapped back to
        their original categories

    Examples:
        >>> import pandas as pd
        >>> from churn_pred.preprocessing.label_encoder import LabelEncoder
        >>> df = pd.DataFrame({'col1': [1,2,3], 'col2': ['me', 'you', 'him']})
        >>> columns_to_encode = ['col2']
        >>> encoder = LabelEncoder(columns_to_encode)
        >>> df_enc = encoder.fit_transform(df)
        >>> encoder.inverse_transform(df_enc)
           col1 col2
        0     1   me
        1     2  you
        2     3  him
    """
    decoded = df.copy()
    for name, code_to_category in self.inverse_encoding_dict.items():
        # default argument binds the mapping of the current column
        decoded[name] = decoded[name].apply(
            lambda code, categories=code_to_category: categories[code]
        )
    return decoded

preprocess_data

drop_high_nan_cols

drop_high_nan_cols(df, threshold=0.8, verbose=False)

Returns dataframe without columns that have ratio of missingness above threshold.

Parameters:

  • df (pd.DataFrame) –

    input dataframe

  • threshold (float = 0.8) –

    ratio of missingness applied per column

  • verbose (bool) –

    whether the output should be verbose

Source code in churn_pred/preprocessing/preprocess_data.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def drop_high_nan_cols(
    df: pd.DataFrame, threshold: float = 0.8, verbose: bool = False
) -> pd.DataFrame:
    """Removes the columns whose fraction of missing values exceeds a threshold.

    Args:
        df (pd.DataFrame): input dataframe
        threshold (float = 0.8): ratio of missingness applied per column;
            a column is dropped only when its ratio is strictly greater
        verbose (bool): whether the output should be verbose

    Returns:
        pd.DataFrame: dataframe without the affected columns
    """
    total_rows = len(df)
    missing_ratio = df.apply(lambda column: column.isna().sum() / total_rows, axis=0)
    is_too_sparse = missing_ratio > threshold
    sparse_cols = missing_ratio[is_too_sparse].index.values
    reduced = df.drop(columns=sparse_cols)
    if verbose:
        print(
            """
            Dropped {} columns with fraction of NaN above threshold = {}.
            Affected columns: {}
            """.format(
                len(sparse_cols), threshold, sparse_cols
            )
        )
    return reduced

drop_constant_cols

drop_constant_cols(df, verbose=False)

Returns dataframe without constant columns, i.e. those with just 1 unique value for all rows.

Source code in churn_pred/preprocessing/preprocess_data.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def drop_constant_cols(df: pd.DataFrame, verbose: bool = False) -> pd.DataFrame:
    """Removes constant columns, i.e. columns holding exactly 1 distinct
    (non-missing) value across all rows.

    Args:
        df (pd.DataFrame): input dataframe
        verbose (bool): whether the output should be verbose

    Returns:
        pd.DataFrame: dataframe without the constant columns
    """
    distinct_counts = df.nunique()
    constant = distinct_counts[distinct_counts == 1].index.values
    reduced = df.drop(columns=constant)
    if verbose:
        print(
            """
            Dropped {} constant columns.
            Affected columns: {}
            """.format(
                len(constant), constant
            )
        )
    return reduced

drop_high_uq_cat_cols

drop_high_uq_cat_cols(
    df, cat_cols, uq_val_count, verbose=False
)

Returns dataframe without categorical columns that have too many unique values

Parameters:

  • df (pd.DataFrame) –

    input dataframe

  • cat_cols (list) –

    list of categorical columns

  • uq_val_count (int) –

    unique value count

  • verbose (bool) –

    whether the output should be verbose

Source code in churn_pred/preprocessing/preprocess_data.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def drop_high_uq_cat_cols(
    df: pd.DataFrame, cat_cols: list, uq_val_count: int, verbose: bool = False
) -> pd.DataFrame:
    """Removes the categorical columns that hold too many distinct values.

    Args:
        df (pd.DataFrame): input dataframe
        cat_cols (list): list of categorical columns
        uq_val_count (int): unique value count; a column is dropped only when
            it has strictly more distinct values
        verbose (bool): whether the output should be verbose

    Returns:
        pd.DataFrame: dataframe without the affected columns
    """
    distinct_counts = df[cat_cols].nunique()
    exceeds_limit = distinct_counts > uq_val_count
    crowded_cols = distinct_counts[exceeds_limit].index.values
    reduced = df.drop(columns=crowded_cols)
    if verbose:
        print(
            """
            Dropped {} high unique value columns.
            Affected columns: {}
            """.format(
                len(crowded_cols), crowded_cols
            )
        )
    return reduced

drop_highly_correlated_columns

drop_highly_correlated_columns(
    df, cont_cols, crosscorr_val=0.95, verbose=False
)

Returns dataframe without highly correlated columns, cross correlation is evaluated with crosscorr_val.

Parameters:

  • df (pd.DataFrame) –

    input dataframe

  • cont_cols (list) –

    list of columns to evaluate correlation for

  • crosscorr_val (float = 0.95) –

    threshold value of correlation

  • verbose (bool) –

    whether the output should be verbose

Source code in churn_pred/preprocessing/preprocess_data.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def drop_highly_correlated_columns(
    df: pd.DataFrame,
    cont_cols: list,
    crosscorr_val: float = 0.95,
    verbose: bool = False,
) -> pd.DataFrame:
    """Removes columns that are highly correlated with another column.

    The absolute correlation matrix of ``cont_cols`` is reduced to its strict
    upper triangle; a column is dropped when its row there contains a value
    above ``crosscorr_val``, i.e. when it is highly correlated with a column
    listed after it.

    Args:
        df (pd.DataFrame): input dataframe
        cont_cols (list): list of columns to evaluate correlation for
        crosscorr_val (float = 0.95): threshold value of correlation
        verbose (bool): whether the output should be verbose

    Returns:
        pd.DataFrame: dataframe without the affected columns
    """
    abs_corr = df[cont_cols].corr().abs()
    # strict upper triangle: every column pair once, diagonal excluded
    pair_mask = np.triu(np.ones(abs_corr.shape, dtype=bool), k=1)
    upper_only = abs_corr.where(pair_mask)
    has_strong_pair = (upper_only > crosscorr_val).any(axis=1)
    redundant = has_strong_pair[has_strong_pair].index.values
    reduced = df.drop(columns=redundant)
    if verbose:
        print(
            """
            Dropped {} highly correlated columns.
            Affected columns: {}
            """.format(
                len(redundant), redundant
            )
        )
    return reduced

nan_with_unknown_imputer

nan_with_unknown_imputer(
    df, columns, fill_token="unknown", verbose=False
)

Fills NAs with surrogate string, 'unknown' is default value used, it can be customized.

Parameters:

  • df (pd.DataFrame) –

    input dataframe

  • columns (List[str]) –

    list of columns that will be filled

  • fill_token (str = "unknown") –

    string used to replace NAs

  • verbose (bool) –

    whether the output should be verbose

Source code in churn_pred/preprocessing/preprocess_data.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def nan_with_unknown_imputer(
    df: pd.DataFrame,
    columns: List[str],
    fill_token: str = "unknown",
    verbose: bool = False,
) -> pd.DataFrame:
    """Fills NAs with surrogate string, 'unknown' is default value used, it can be customized.

    The listed columns are cast to object dtype before filling. The input
    dataframe is not modified.

    Args:
        df (pd.DataFrame): input dataframe
        columns (List[str]): list of columns that will be filled
        fill_token (str = "unknown"): string used to replace NAs
        verbose (bool): whether the output should be verbose

    Returns:
        pd.DataFrame: copy of ``df`` with NAs in ``columns`` replaced
    """
    dfc = df.copy()
    if verbose:
        # counted before filling, otherwise there is nothing left to report
        sum_nan_vals = dfc[columns].isna().sum()
        nans_cols = [(c, dfc[c].isna().sum()) for c in columns]
    for c in columns:
        dfc[c] = dfc[c].astype(object).fillna(fill_token)
    if verbose:
        print(
            """
            Imputed {} NaN values with {}.
            Affected columns (col, num_NaNs): {}
            """.format(
                sum_nan_vals, fill_token, nans_cols
            )
        )
    return dfc

nan_with_number_imputer

nan_with_number_imputer(
    df, columns, fill_number=-1.0, verbose=False
)

Fills NAs with surrogate float, -1 is default value used, it can be customized.

Parameters:

  • df (pd.DataFrame) –

    input dataframe

  • columns (List[str]) –

    list of columns that will be filled

  • fill_number (float = -1) –

    number used to replace NAs. Defaults to -1.0.

  • verbose (bool) –

    whether the output should be verbose

Source code in churn_pred/preprocessing/preprocess_data.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def nan_with_number_imputer(
    df: pd.DataFrame,
    columns: List[str],
    fill_number: float = -1.0,
    verbose: bool = False,
) -> pd.DataFrame:
    """Fills NAs with surrogate float, -1 is default value used, it can be customized.

    The listed columns are cast to float before filling. The input dataframe
    is not modified.

    Args:
        df (pd.DataFrame): input dataframe
        columns (List[str]): list of columns that will be filled
        fill_number (float = -1): number used to replace NAs. Defaults to -1.0.
        verbose (bool): whether the output should be verbose

    Returns:
        pd.DataFrame: copy of ``df`` with NAs in ``columns`` replaced
    """
    if verbose:
        # counted before filling, otherwise there is nothing left to report
        sum_nan_vals = df[columns].isna().sum()
        nans_cols = [(c, df[c].isna().sum()) for c in columns]
    dfc = df.copy()
    dfc[columns] = dfc[columns].apply(lambda x: x.astype(float).fillna(fill_number))
    if verbose:
        print(
            """
            Imputed {} NaN values with {}.
            Affected columns (col, num_NaNs): {}
            """.format(
                sum_nan_vals, fill_number, nans_cols
            )
        )
    return dfc

nuq_in_list_col

nuq_in_list_col(dfs)

Returns pd.Series with the number of unique values in the lists.

Parameters:

  • dfs (pd.Series) –

    input pandas series containing string list values

Source code in churn_pred/preprocessing/preprocess_data.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def nuq_in_list_col(dfs: pd.Series) -> pd.Series:
    """Returns pd.Series with the number of unique values in the lists.

    String values have their square brackets removed and are split on ", ";
    the resulting items are passed to `_nunique`. Non-string values are
    passed through unchanged.

    Args:
        dfs (pd.Series): input pandas series containing string list values

    Returns:
        pd.Series: new series with the per-row results, `dfs` is not modified
    """
    # Series.apply is not in-place: its result must be returned, otherwise the
    # caller just gets the untouched input back.
    return dfs.apply(
        lambda string: (
            _nunique(string.replace("[", "").replace("]", "").split(", "))
            if isinstance(string, str)
            else string
        )
    )

most_frequent_in_list_col

most_frequent_in_list_col(dfs)

Returns pd.Series with the most frequent values in the lists.

Parameters:

  • dfs (pd.Series) –

    input pandas series containing string list values

Source code in churn_pred/preprocessing/preprocess_data.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def most_frequent_in_list_col(dfs: pd.Series) -> pd.Series:
    """Returns pd.Series with the most frequent values in the lists.

    String values have their square brackets removed and are split on ", ";
    the resulting items are passed to `_most_frequent`. Non-string values are
    passed through unchanged.

    Args:
        dfs (pd.Series): input pandas series containing string list values

    Returns:
        pd.Series: new series with the per-row results, `dfs` is not modified
    """
    # Series.apply is not in-place: its result must be returned, otherwise the
    # caller just gets the untouched input back.
    return dfs.apply(
        lambda string: (
            _most_frequent(string.replace("[", "").replace("]", "").split(", "))
            if isinstance(string, str)
            else string
        )
    )

replace_rare_categories_with_str_other

replace_rare_categories_with_str_other(
    df,
    categorical_cols,
    quantile=0.05,
    surrogate_value="other",
    verbose=False,
)

Replaces rare category value with surrogate string.

Parameters:

  • df (pd.DataFrame) –

    input dataframe

  • categorical_cols (List[str]) –

    list of columns in dataframe to process.

  • quantile (float = 0.05) –

    determines what values are considered as rare

  • surrogate_value (str = "other") –

    string used to replace rare values

Returns:

  • Tuple[pd.DataFrame, Dict]

    Tuple[pd.DataFrame, Dict]: New dataframe and a dict. with mapping between orig. and surrogate values.

Source code in churn_pred/preprocessing/preprocess_data.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def replace_rare_categories_with_str_other(
    df: pd.DataFrame,
    categorical_cols: List[str],
    quantile: float = 0.05,
    surrogate_value: str = "other",
    verbose: bool = False,
) -> Tuple[pd.DataFrame, Dict]:
    """Swaps rare category values for a single surrogate string.

    Args:
        df (pd.DataFrame): input dataframe
        categorical_cols (List[str]): list of columns in dataframe to process.
        quantile (float = 0.05): determines what values are considered as rare
        surrogate_value (str = "other"): string used to replace rare values
        verbose (bool): whether a summary of the replacements is printed

    Returns:
        Tuple[pd.DataFrame, Dict]:
            New dataframe and a dict. with mapping between orig. and surrogate values.

    Raises:
        ValueError: if `surrogate_value` already occurs in the processed columns
    """
    surrogate_taken = surrogate_value in df[categorical_cols].values
    if surrogate_taken:
        problem = (
            "Surrogate string - "
            + surrogate_value
            + " - is already present, choose another one."
        )
        raise ValueError(problem)

    # per-column replacement mapping, as produced by _replace_rare_dict
    per_col_result = df[categorical_cols].apply(
        _replace_rare_dict, args=[surrogate_value, quantile]
    )
    replace_dict_per_col = per_col_result.to_dict()
    dfc = df.copy().replace(replace_dict_per_col)

    if not verbose:
        return dfc, replace_dict_per_col

    sum_rare_cats = sum(len(mapping) for mapping in replace_dict_per_col.values())
    rare_cats_per_col = [
        (col, list(mapping.keys())) for col, mapping in replace_dict_per_col.items()
    ]
    summary_template = """
            Replaced {} rare categories (val_count < {}) with {}.
            Affected columns (List[(col, rare_cats)]): {}
            """
    print(
        summary_template.format(
            sum_rare_cats,
            quantile,
            surrogate_value,
            rare_cats_per_col,
        )
    )
    return dfc, replace_dict_per_col

preprocess_text

language_detection

language_detection(df, text_col, model_type='fasttext')
Source code in churn_pred/preprocessing/preprocess_text.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def language_detection(
    df: pd.DataFrame,
    text_col: str,
    model_type: Literal["roberta", "fasttext"] = "fasttext",
) -> pd.DataFrame:
    """Detects the language of every text in `text_col`.

    Two new columns are added to a copy of the input:
    `<text_col>_language` and `<text_col>_language_score`.

    Models:
    https://huggingface.co/papluca/xlm-roberta-base-language-detection
    https://spacy.io/universe/project/spacy_fastlang

    Args:
        df (pd.DataFrame): input dataset
        text_col (str): column name with text
        model_type (["roberta", "fasttext"]): language detection backend

    Returns:
        pd.DataFrame: copy of `df` extended with the language columns

    Raises:
        NotImplementedError: if `model_type` is not one of the supported backends
    """
    if model_type not in ("fasttext", "roberta"):
        raise NotImplementedError

    dfc = df.copy()
    texts = dfc[text_col].astype(str)

    if model_type == "fasttext":
        nlp = spacy.load("en_core_web_sm")
        nlp.add_pipe("language_detector")
        docs = texts.apply(nlp)
        dfc[text_col + "_language"] = docs.apply(lambda doc: doc._.language)
        dfc[text_col + "_language_score"] = docs.apply(
            lambda doc: doc._.language_score
        )
    else:
        pipe = pipeline(
            "text-classification", model="papluca/xlm-roberta-base-language-detection"
        )
        # Run the model once on the requested column (not a hard-coded one); the
        # pipeline yields one {"label": ..., "score": ...} dict per input text.
        res = pipe(texts.to_list())
        dfc[text_col + "_language"] = [r["label"] for r in res]
        dfc[text_col + "_language_score"] = [r["score"] for r in res]
    return dfc

text_cleaning

text_cleaning(df, text_col)

Returns dataframe with preprocessed/cleaned text column. Spacy-cleaner that uses spacy functionalities. https://spacy.io/universe/project/spacy-cleaner

Note

The spacy-cleaner library does not do much. Future task - develop own cleaner

Parameters:

  • df (pd.DataFrame) –

    input dataset

  • text_col (str) –

    column name with text

Source code in churn_pred/preprocessing/preprocess_text.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def text_cleaning(df: pd.DataFrame, text_col: str) -> pd.DataFrame:
    """
    Cleans the text column with spacy-cleaner and returns a new dataframe.
    https://spacy.io/universe/project/spacy-cleaner

    The cleaner is built on the "en_core_web_sm" spacy model and configured
    with the stopword remover, punctuation replacer and lemma mutator.

    Note:
        The spacy-cleaner library does not do much.
        Future task - develop own cleaner

    Args:
        df (pd.DataFrame): input dataset
        text_col (str): column name with text

    Returns:
        pd.DataFrame: copy of `df` whose `text_col` holds the cleaned texts
    """
    nlp = spacy.load("en_core_web_sm")
    processing_steps = (
        removers.remove_stopword_token,
        replacers.replace_punctuation_token,
        mutators.mutate_lemma_token,
    )
    cleaner = spacy_cleaner.Cleaner(nlp, *processing_steps)

    cleaned = df.copy()
    raw_texts = cleaned[text_col].astype(str)
    cleaned[text_col] = cleaner.clean(raw_texts)
    return cleaned

sentiment_analysis

sentiment_analysis(df, text_col, sentiment_depth=3)

Returns dataframe with new columns that hold the sentiment analysis of the text_col.

initial idea: Inspired by https://www.nature.com/articles/s41598-024-60210-7 I also used the 3 most popular models with voting: 1. cardiffnlp/twitter-roberta-base-sentiment-latest 2. nlptown/bert-base-multilingual-uncased-sentiment 3. mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis 4. lxyuan/distilbert-base-multilingual-cased-sentiments-student 5. finiteautomata/bertweet-base-sentiment-analysis

issues: 1. 3(pos, neutral, neg) vs 5(1-5 stars) sentiments; models 1,3 vs 2 2. maximum sequence length models 4-5

final idea: Either 3 sentiments (model 1) or 5 stars (model 2)

Parameters:

  • df (pd.DataFrame) –

    input dataset

  • text_col (str) –

    column name with text

  • sentiment_depth ([3, 5]) –

    depth of sentiment analysis

Source code in churn_pred/preprocessing/preprocess_text.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def sentiment_analysis(
    df: pd.DataFrame, text_col: str, sentiment_depth: Literal[3, 5] = 3
) -> pd.DataFrame:
    """
    Returns dataframe with new columns that hold the sentiment analysis of the
    text_col: `<text_col>_sentiment` and `<text_col>_sentiment_score`.

    initial idea:
    Inspired by https://www.nature.com/articles/s41598-024-60210-7 I also used
    the 3 most popular models with voting:
    1. cardiffnlp/twitter-roberta-base-sentiment-latest
    2. nlptown/bert-base-multilingual-uncased-sentiment
    3. mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis
    4. lxyuan/distilbert-base-multilingual-cased-sentiments-student
    5. finiteautomata/bertweet-base-sentiment-analysis

    issues:
    1. 3(pos, neutral, neg) vs 5(1-5 stars) sentiments; models 1,3 vs 2
    2. maximum sequence length models 4-5

    final idea:
    Either 3 sentiments (model 1) or 5 stars (model 2)

    Args:
        df (pd.DataFrame): input dataset
        text_col (str): column name with text
        sentiment_depth ([3, 5]): depth of sentiment analysis

    Returns:
        pd.DataFrame: copy of `df` with the two sentiment columns appended

    Raises:
        NotImplementedError: if `sentiment_depth` is neither 3 nor 5
    """
    model_per_depth = {
        3: "cardiffnlp/twitter-roberta-base-sentiment-latest",
        5: "nlptown/bert-base-multilingual-uncased-sentiment",
    }
    if sentiment_depth not in model_per_depth:
        raise NotImplementedError

    dfc = df.copy()
    pipe = pipeline(
        "text-classification",
        model=model_per_depth[sentiment_depth],
        device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    )
    res = pipe(dfc[text_col].astype(str).to_list())
    # Reuse the caller's index: concat(axis=1) aligns on index labels, so a
    # default RangeIndex here would misalign rows for any filtered/re-indexed df.
    res = pd.DataFrame(res, index=dfc.index).rename(
        columns={
            "label": text_col + "_sentiment",
            "score": text_col + "_sentiment_score",
        }
    )
    return pd.concat([dfc, res], axis=1)

preprocess

PreprocessData

PreprocessData(
    target_col, id_cols, cat_cols=None, cont_cols=None
)

Object to preprocess the dataset.

Parameters:

  • target_col (str) –

    target column name

  • id_cols (List[str]) –

    id columns

  • cat_cols (Optional[List[str]]) –

    list of categorical column names

  • cont_cols (Optional[List[str]]) –

    list of continuous column names

Source code in churn_pred/preprocessing/preprocess.py
28
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(
    self,
    target_col: str,
    id_cols: List[str],
    cat_cols: Optional[List[str]] = None,
    cont_cols: Optional[List[str]] = None,
) -> None:
    """Store the column configuration; no data is processed here.

    Args:
        target_col (str): target column name
        id_cols (List[str]): id columns
        cat_cols (Optional[List[str]]): list of categorical column names
        cont_cols (Optional[List[str]]): list of continuous column names
    """
    self.target_col = target_col
    self.id_cols = id_cols
    self.cat_cols = cat_cols
    self.cont_cols = cont_cols

    # set to True at the end of fit_transform; transform refuses to run before that
    self.is_fitted = False

fit_transform

fit_transform(df)

Fit preprocessor and transform dataset in training step.

Source code in churn_pred/preprocessing/preprocess.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
    """Fit preprocessor and transform dataset in training step.

    Steps, in order: drop id and constant columns, resolve the categorical and
    continuous column lists (user-supplied or inferred), impute missing values,
    label-encode the categorical columns, adjust dtypes and re-attach the id
    columns.

    Args:
        df (pd.DataFrame): training dataset, must contain the id and target columns

    Returns:
        pd.DataFrame: preprocessed dataset with the id columns appended
    """

    dfc = df.drop(columns=self.id_cols).copy()

    dfc = dfc.pipe(drop_constant_cols)

    if self.cat_cols is not None:
        # Restrict the user-supplied list to columns still present in dfc.
        # Only done when a list was given, so that the default (None) reaches
        # the inference branch below instead of being passed to intsec.
        self.cat_cols = intsec(list(dfc), self.cat_cols)
    else:
        self.cat_cols = self._infere_cat_cols(dfc)
        dfc_cat_cols = drop_high_nan_cols(dfc[self.cat_cols])
        dfc = dfc.drop(columns=self.cat_cols)
        dfc = pd.concat([dfc, dfc_cat_cols], axis=1)
        self.cat_cols = dfc_cat_cols.columns.values.tolist()

    if self.cont_cols is None:
        self.init_cont_cols = self._infere_cont_cols(dfc, self.cat_cols)
        dfc_init_cont_cols = drop_high_nan_cols(dfc[self.init_cont_cols])
        dfc = dfc.drop(columns=self.init_cont_cols)
        dfc = pd.concat([dfc, dfc_init_cont_cols], axis=1)
        self.init_cont_cols = dfc_init_cont_cols.columns.values.tolist()

        dfc = nan_with_number_imputer(dfc, self.init_cont_cols, -9999.0)

        (
            dfc,
            self.final_cont_cols,
        ) = self._drop_highly_corr_cols_and_get_final_cont_cols(
            dfc, self.init_cont_cols
        )

    else:
        # Keep the caller's column order (de-duplicated) instead of going through
        # a set, whose iteration order for strings changes between interpreter
        # runs and would make the output column order irreproducible.
        self.final_cont_cols = [
            col for col in dict.fromkeys(self.cont_cols) if col in dfc.columns
        ]

        dfc = nan_with_number_imputer(dfc, self.final_cont_cols, -9999.0)

    dfc = nan_with_unknown_imputer(dfc, self.cat_cols)

    dfc = dfc[self.cat_cols + self.final_cont_cols + [self.target_col]]

    self.label_encoder = LabelEncoder(self.cat_cols)
    dfc_le = self.label_encoder.fit_transform(dfc)

    dfc_le = self._change_int_float_types(dfc_le)

    # id columns were dropped at the top; put them back from the original input
    dfc_le[self.id_cols] = df[self.id_cols]

    self.is_fitted = True

    return dfc_le

transform

transform(df)

Transform dataset in inference step.

Source code in churn_pred/preprocessing/preprocess.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
    """Transform dataset in inference step.

    Reuses the column lists and the label encoder stored by `fit_transform`.
    The target column is kept when present in `df` and skipped otherwise.

    Args:
        df (pd.DataFrame): dataset to transform, must contain the id columns

    Returns:
        pd.DataFrame: preprocessed dataset with the id columns appended

    Raises:
        NotFittedError: if the instance has not been fitted yet
    """

    if not self.is_fitted:
        raise NotFittedError(
            """This instance of 'PreprocessData' has not been fitted yet.
            Please, run 'fit' first"""
        )

    dfc = df.drop(columns=self.id_cols).copy()

    # added as mlflow inference is receiving all the data as objects
    # NOTE(review): on pandas versions where astype(str) turns missing values
    # into the literal string "nan", nan_with_unknown_imputer below would no
    # longer see them as missing - confirm this matches the training path.
    dfc[self.cat_cols] = dfc[self.cat_cols].astype(str)
    dfc[self.final_cont_cols] = dfc[self.final_cont_cols].astype(float)

    # the target column is optional at inference time: fall back to features only
    try:
        dfc = dfc[self.cat_cols + self.final_cont_cols + [self.target_col]]
    except KeyError:
        dfc = dfc[self.cat_cols + self.final_cont_cols]

    # dfc = dfc.replace(self.replace_rare_categories_dict)

    dfc = nan_with_unknown_imputer(dfc, self.cat_cols)

    # same surrogate value (-9999.0) as used in fit_transform
    dfc = nan_with_number_imputer(
        dfc,
        list(set(self.final_cont_cols)),
        -9999.0,  # noqa
    )

    dfc_le = self.label_encoder.transform(dfc)

    dfc_le = self._change_int_float_types(dfc_le)

    # id columns were dropped at the top; put them back from the original input
    dfc_le[self.id_cols] = df[self.id_cols]

    return dfc_le

fit

fit(df)

Just to keep familiar naming convention with sklearn.

Source code in churn_pred/preprocessing/preprocess.py
134
135
136
def fit(self, df: pd.DataFrame) -> pd.DataFrame:
    """sklearn-like alias: delegates to fit_transform and returns its result."""
    transformed = self.fit_transform(df)
    return transformed

scaler

scaler_mapper

scaler_mapper(
    cont_cols, cat_cols, id_cols, scaler_mapper_def=None
)

Function that maps scaler functions to appropriate columns.

By default does not assign any scaler to continuous, categorical or identifier columns. The scalers must be set in scaler_mapper_def. Use sklearn scalers. Only columns defined in mapper object will be present in the transformed dataset.

Parameters:

  • cont_cols (list) –

    list of continuous feature columns in the dataset

  • cat_cols (list) –

    list of categorical feature columns in the dataset

  • id_cols (list) –

    identifier columns

  • scaler_mapper_def (dict) –

    optional dictionary that contains keys ['cont_cols', 'cat_cols', 'id_cols'] with their corresponding scalers (defined by names, not instantiated) from sklearn library

Returns:

  • scaler( DataFrameMapper ) –

    scaler object mapping sklearn scalers to columns in pandas dataframe

Source code in churn_pred/preprocessing/scaler.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def scaler_mapper(
    cont_cols: List[str],
    cat_cols: List[str],
    id_cols: List[str],
    scaler_mapper_def: Optional[dict] = None,
) -> DataFrameMapper:
    """Builds a DataFrameMapper that assigns scalers to column groups.

    Without `scaler_mapper_def` no scaler is assigned to the continuous,
    categorical or identifier columns. Scalers are taken from
    `scaler_mapper_def`, use sklearn scalers. Only columns defined in the
    mapper object will be present in the transformed dataset.

    Args:
        cont_cols (list): list of continuous feature columns in the dataset
        cat_cols (list): list of categorical feature columns in the dataset
        id_cols (list): identifier columns
        scaler_mapper_def (dict): optional dictionary that contains keys
            ['cont_cols', 'cat_cols', 'id_cols'] with their corresponding
            scalers (defined by names, not instantiated) from sklearn library
    Returns:
        scaler (DataFrameMapper): scaler object mapping sklearn scalers to columns in
            pandas dataframe
    """
    column_groups = (
        ("cont_cols", cont_cols),
        ("cat_cols", cat_cols),
        ("id_cols", id_cols),
    )

    feature_defs = []
    for group_key, group_cols in column_groups:
        # with a definition every group key is looked up; without one no scaler is set
        scaler_cls = scaler_mapper_def[group_key] if scaler_mapper_def else None
        # each column is wrapped in its own single-element list
        wrapped_cols = [[col] for col in group_cols]
        feature_defs = feature_defs + gen_features(
            columns=wrapped_cols, classes=[scaler_cls]
        )

    return DataFrameMapper(feature_defs, df_out=True)