suhec

label_encoder

LabelEncoder

LabelEncoder(columns_to_encode=None)

Bases: object

Label-encodes categorical values for multiple columns at once

ℹ️ NOTE: Shamelessly copied from https://github.com/jrzaurin/pytorch-widedeep

ℹ️ NOTE: LabelEncoder reserves 0 for unseen new categories. This is convenient when defining the embedding layers, since we can just set padding_idx to 0.

Parameters:

  • columns_to_encode (list, Optional, default = None) –

    List of strings containing the names of the columns to encode. If None, all columns of type object in the dataframe will be label-encoded.

Attributes:

  • encoding_dict (Dict) –

    Dictionary containing the encoding mappings in the following format, e.g.:
    {'colname1': {'cat1': 1, 'cat2': 2, ...}, 'colname2': {'cat1': 1, 'cat2': 2, ...}, ...}

  • inverse_encoding_dict (Dict) –

    Dictionary containing the inverse encoding mappings in the following format, e.g.:
    {'colname1': {1: 'cat1', 2: 'cat2', ...}, 'colname2': {1: 'cat1', 2: 'cat2', ...}, ...}
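
A hedged sketch of why reserving 0 is convenient for embedding layers: the encoder never assigns 0, so an embedding sized with one extra row can use padding_idx=0 for padding and unseen categories. The column name, embedding size, and the torch usage below are illustrative assumptions, not part of this module; adjust the import path to your install.

import pandas as pd
import torch.nn as nn
from suhec.label_encoder import LabelEncoder  # adjust the import path to your install

df = pd.DataFrame({"city": ["oslo", "paris", "oslo", "rome"]})
encoder = LabelEncoder(columns_to_encode=["city"])
df_enc = encoder.fit_transform(df)           # codes start at 1; 0 is never assigned

n_seen = len(encoder.encoding_dict["city"])  # 3 seen categories
embedding = nn.Embedding(
    num_embeddings=n_seen + 1,  # +1 keeps row 0 free for padding/unseen categories
    embedding_dim=8,
    padding_idx=0,              # unseen categories (encoded as 0) map to a zero vector
)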

Source code in suhec/label_encoder.py
def __init__(
    self,
    columns_to_encode: Optional[List[str]] = None,
):
    self.columns_to_encode = columns_to_encode

fit

fit(df)

Creates encoding attributes

Returns:

  • LabelEncoder (LabelEncoder) –

    LabelEncoder fitted object

Source code in suhec/label_encoder.py
def fit(self, df: pd.DataFrame) -> "LabelEncoder":
    """Creates encoding attributes

    Returns:
        LabelEncoder: `LabelEncoder` fitted object
    """

    df_inp = df.copy()

    if self.columns_to_encode is None:
        self.columns_to_encode = list(
            df_inp.select_dtypes(include=["object"]).columns
        )
    else:
        # sanity check to make sure all categorical columns are in an adequate
        # format
        for col in self.columns_to_encode:
            df_inp[col] = df_inp[col].astype("O")

    unique_column_vals = dict()
    for c in self.columns_to_encode:
        unique_column_vals[c] = df_inp[c].unique()

    self.encoding_dict = dict()

    # leave 0 for padding/"unseen" categories
    idx = 1
    for k, v in unique_column_vals.items():
        self.encoding_dict[k] = {
            o: i + idx for i, o in enumerate(unique_column_vals[k])
        }
        idx = 1

    self.inverse_encoding_dict = dict()
    for c in self.encoding_dict:
        self.inverse_encoding_dict[c] = {
            v: k for k, v in self.encoding_dict[c].items()
        }
        self.inverse_encoding_dict[c][0] = "unseen"

    return self

transform

transform(df)

Label-encodes the categories in columns_to_encode

Returns:

  • pd.DataFrame –

    label-encoded dataframe

Source code in suhec/label_encoder.py
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
    """Label Encoded the categories in `columns_to_encode`

    Returns:
        pd.DataFrame: label-encoded dataframe
    """
    try:
        self.encoding_dict
    except AttributeError:
        raise NotFittedError(
            "This LabelEncoder instance is not fitted yet. "
            "Call 'fit' with appropriate arguments before using this LabelEncoder."
        )

    df_inp = df.copy()
    # sanity check to make sure all categorical columns are in an adequate
    # format
    for col in self.columns_to_encode:  # type: ignore
        df_inp[col] = df_inp[col].astype("O")

    for k, v in self.encoding_dict.items():
        df_inp[k] = df_inp[k].apply(lambda x: v[x] if x in v.keys() else 0)

    return df_inp
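
A hedged sketch of the unseen-category behaviour implemented above: anything not seen during fit falls back to 0. The column values are placeholders; adjust the import path to your install.

import pandas as pd
from suhec.label_encoder import LabelEncoder  # adjust the import path to your install

train = pd.DataFrame({"col2": ["me", "you", "him"]})
test = pd.DataFrame({"col2": ["you", "her"]})   # 'her' was never seen during fit

encoder = LabelEncoder(["col2"])
encoder.fit(train)
print(encoder.transform(test)["col2"].tolist())  # [2, 0] -- unseen 'her' becomes 0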

fit_transform

fit_transform(df)

Combines fit and transform

Returns:

  • pd.DataFrame –

    label-encoded dataframe

Examples:

>>> import pandas as pd
>>> from data_preparation.label_encoder import LabelEncoder
>>> df = pd.DataFrame({'col1': [1,2,3], 'col2': ['me', 'you', 'him']})
>>> columns_to_encode = ['col2']
>>> encoder = LabelEncoder(columns_to_encode)
>>> encoder.fit_transform(df)
   col1  col2
0     1     1
1     2     2
2     3     3
>>> encoder.encoding_dict
{'col2': {'me': 1, 'you': 2, 'him': 3}}
Source code in suhec/label_encoder.py
def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
    """Combines `fit` and `transform`

    Returns:
        pd.DataFrame: label-encoded dataframe

    Examples:
        >>> import pandas as pd
        >>> from data_preparation.label_encoder import LabelEncoder
        >>> df = pd.DataFrame({'col1': [1,2,3], 'col2': ['me', 'you', 'him']})
        >>> columns_to_encode = ['col2']
        >>> encoder = LabelEncoder(columns_to_encode)
        >>> encoder.fit_transform(df)
           col1  col2
        0     1     1
        1     2     2
        2     3     3
        >>> encoder.encoding_dict
        {'col2': {'me': 1, 'you': 2, 'him': 3}}
    """
    return self.fit(df).transform(df)

inverse_transform

inverse_transform(df)

Returns the original categories

Returns:

  • pd.DataFrame –

    dataframe with the original categories restored

Examples:

>>> import pandas as pd
>>> from data_preparation.label_encoder import LabelEncoder
>>> df = pd.DataFrame({'col1': [1,2,3], 'col2': ['me', 'you', 'him']})
>>> columns_to_encode = ['col2']
>>> encoder = LabelEncoder(columns_to_encode)
>>> df_enc = encoder.fit_transform(df)
>>> encoder.inverse_transform(df_enc)
   col1 col2
0     1   me
1     2  you
2     3  him
Source code in suhec/label_encoder.py
def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
    """Returns the original categories

    Returns:
        pd.DataFrame: dataframe with the original categories restored

    Examples:
        >>> import pandas as pd
        >>> from data_preparation.label_encoder import LabelEncoder
        >>> df = pd.DataFrame({'col1': [1,2,3], 'col2': ['me', 'you', 'him']})
        >>> columns_to_encode = ['col2']
        >>> encoder = LabelEncoder(columns_to_encode)
        >>> df_enc = encoder.fit_transform(df)
        >>> encoder.inverse_transform(df_enc)
           col1 col2
        0     1   me
        1     2  you
        2     3  him
    """
    for k, v in self.inverse_encoding_dict.items():
        df[k] = df[k].apply(lambda x: v[x])
    return df

surname_classification

create_optimizer

create_optimizer(model, learning_rate=5e-05, eps=1e-08)

Creates an optimizer for the BERT model.

Parameters:

  • model: The BERT model.
  • learning_rate: Learning rate for the optimizer.
  • eps: Epsilon for the AdamW optimizer.

Returns:

  • An AdamW optimizer.
Source code in suhec/surname_classification.py
def create_optimizer(model, learning_rate=5e-5, eps=1e-8):
    """
    Creates an optimizer for the BERT model.
    Args:
    - model: The BERT model.
    - learning_rate: Learning rate for the optimizer.
    - eps: Epsilon for the AdamW optimizer.

    Returns:
    - An AdamW optimizer.
    """
    # List of model parameters
    param_optimizer = list(model.named_parameters())

    # We will apply weight decay to all parameters except bias and layer normalization terms
    no_decay = ["bias", "LayerNorm.weight"]
    optimizer_grouped_parameters = [
        {
            "params": [
                p for n, p in param_optimizer if not any(nd in n for nd in no_decay)
            ],
            "weight_decay": 0.01,
        },
        {
            "params": [
                p for n, p in param_optimizer if any(nd in n for nd in no_decay)
            ],
            "weight_decay": 0.0,
        },
    ]

    # Create the optimizer
    optimizer = AdamW(optimizer_grouped_parameters, lr=learning_rate, eps=eps)

    return optimizer
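
A hedged usage sketch; the Hugging Face model class, checkpoint name, and label count are assumptions rather than part of this module, which only relies on model.named_parameters():

from transformers import BertForSequenceClassification
from suhec.surname_classification import create_optimizer

# Placeholder checkpoint and label count; any model exposing named_parameters() works.
model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=18)
optimizer = create_optimizer(model, learning_rate=5e-5, eps=1e-8)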

train_model

train_model(
    model,
    train_dataloader,
    validation_dataloader,
    optimizer,
    scheduler,
    device,
    num_epochs=10,
    specific_epoch_to_defreeze=5,
)

Trains and evaluates the BERT model.

Parameters:

  • model: The BERT model for classification.
  • train_dataloader: DataLoader for the training data.
  • validation_dataloader: DataLoader for the validation data.
  • optimizer: Optimizer for training.
  • scheduler: Learning rate scheduler, stepped after every batch.
  • device: Device to train on (e.g., 'cuda', 'cpu').
  • num_epochs: Number of training epochs.
  • specific_epoch_to_defreeze: Epoch at which all BERT layers are unfrozen.

Returns:

  • The trained model.
Source code in suhec/surname_classification.py
def train_model(
    model,
    train_dataloader,
    validation_dataloader,
    optimizer,
    scheduler,
    device,
    num_epochs=10,
    specific_epoch_to_defreeze=5,
):
    """
    Trains and evaluates the BERT model.
    Args:
    - model: The BERT model for classification.
    - train_dataloader: DataLoader for the training data.
    - validation_dataloader: DataLoader for the validation data.
    - optimizer: Optimizer for training.
    - device: Device to train on (e.g., 'cuda', 'cpu').
    - num_epochs: Number of training epochs.

    Returns:
    - The trained model.
    """
    model.to(device)

    for epoch in range(num_epochs):
        if (
            epoch == specific_epoch_to_defreeze
        ):  # Replace with the epoch number you choose
            _defreeze_all_bert_layers(model)

        model.train()
        total_train_loss = 0
        progress_bar = tqdm(
            train_dataloader,
            desc=f"Epoch {epoch + 1}/{num_epochs}",
            leave=False,
            disable=False,
        )

        for batch in progress_bar:
            b_input_ids, b_input_mask, b_labels = batch
            b_input_ids = b_input_ids.to(device)
            b_input_mask = b_input_mask.to(device)
            b_labels = b_labels.to(device)

            model.zero_grad()

            outputs = model(
                b_input_ids,
                token_type_ids=None,
                attention_mask=b_input_mask,
                labels=b_labels,
            )

            loss = outputs.loss
            total_train_loss += loss.item()
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            scheduler.step()

            progress_bar.set_postfix(
                {"training_loss": "{:.3f}".format(loss.item() / len(batch))}
            )

        avg_train_loss = total_train_loss / len(train_dataloader)
        val_loss, val_accuracy = evaluate_model(model, validation_dataloader, device)

        print(f"\nEpoch {epoch + 1}/{num_epochs}")
        print(f"Training Loss: {avg_train_loss:.3f}")
        print(
            f"Validation Loss: {val_loss:.3f}, Validation Accuracy: {val_accuracy:.3f}"
        )

    print("Training complete")
    return model
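
A hedged end-to-end wiring sketch. The tokenizer, the (input_ids, attention_mask, labels) batch layout, get_linear_schedule_with_warmup, and all names and hyperparameters below are assumptions consistent with how train_model unpacks its batches, not prescribed by this module.

import torch
from torch.utils.data import DataLoader, TensorDataset
from transformers import (
    BertForSequenceClassification,
    BertTokenizer,
    get_linear_schedule_with_warmup,
)
from suhec.surname_classification import create_optimizer, train_model

# Toy placeholder data: surnames and integer nationality labels.
surnames = ["Nakamura", "Smith", "Rossi", "Dubois"]
labels = torch.tensor([0, 1, 2, 3])

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
enc = tokenizer(surnames, padding="max_length", truncation=True, max_length=16, return_tensors="pt")
dataset = TensorDataset(enc["input_ids"], enc["attention_mask"], labels)
train_loader = DataLoader(dataset, batch_size=2, shuffle=True)
val_loader = DataLoader(dataset, batch_size=2)

model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=4)
optimizer = create_optimizer(model)

num_epochs = 2
scheduler = get_linear_schedule_with_warmup(
    optimizer, num_warmup_steps=0, num_training_steps=len(train_loader) * num_epochs
)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = train_model(
    model, train_loader, val_loader, optimizer, scheduler, device,
    num_epochs=num_epochs, specific_epoch_to_defreeze=1,
)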

evaluate_model

evaluate_model(model, dataloader, device)

Evaluates the BERT model.

Parameters:

  • model: The trained BERT model.
  • dataloader: DataLoader for the validation or test data.
  • device: Device for evaluation (e.g., 'cuda', 'cpu').

Returns:

  • Average loss and accuracy of the model on the given data.
Source code in suhec/surname_classification.py
def evaluate_model(model, dataloader, device):
    """
    Evaluates the BERT model.
    Args:
    - model: The trained BERT model.
    - dataloader: DataLoader for the validation or test data.
    - device: Device for evaluation (e.g., 'cuda', 'cpu').

    Returns:
    - Average loss and accuracy of the model on the given data.
    """
    model.eval()
    model.to(device)
    total_loss, total_accuracy = 0, 0

    for batch in dataloader:
        b_input_ids, b_input_mask, b_labels = batch
        b_input_ids = b_input_ids.to(device)
        b_input_mask = b_input_mask.to(device)
        b_labels = b_labels.to(device)

        with torch.no_grad():
            outputs = model(
                b_input_ids,
                token_type_ids=None,
                attention_mask=b_input_mask,
                labels=b_labels,
            )

        logits = outputs.logits
        loss = outputs.loss
        total_loss += loss.item()

        logits = logits.detach().cpu().numpy()
        label_ids = b_labels.to("cpu").numpy()
        total_accuracy += _flat_accuracy(logits, label_ids)

    avg_loss = total_loss / len(dataloader)
    avg_accuracy = total_accuracy / len(dataloader)
    return avg_loss, avg_accuracy
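
Continuing the hedged training sketch above, evaluating on a held-out dataloader of (input_ids, attention_mask, labels) batches might look like this; model, val_loader, and device are the placeholders from that sketch:

from suhec.surname_classification import evaluate_model

val_loss, val_accuracy = evaluate_model(model, val_loader, device)
print(f"validation loss={val_loss:.3f}, accuracy={val_accuracy:.3f}")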

predict_nationality

predict_nationality(
    model, surnames, tokenizer, inverse_label_dict, device
)

Predicts and decodes the nationalities of given surnames.

Parameters:

  • model: The trained BERT model.
  • surnames: List of surnames to predict.
  • tokenizer: The tokenizer used for the BERT model.
  • inverse_label_dict: Dictionary for converting numeric labels back to nationalities.
  • device: Device for prediction (e.g., 'cuda', 'cpu').

Returns:

  • Decoded predictions for each surname.
Source code in suhec/surname_classification.py
def predict_nationality(model, surnames, tokenizer, inverse_label_dict, device):
    """
    Predicts and decodes the nationalities of given surnames.
    Args:
    - model: The trained BERT model.
    - surnames: List of surnames to predict.
    - tokenizer: The tokenizer used for BERT model.
    - inverse_label_dict: Dictionary for converting numeric labels back to nationalities.
    - device: Device for prediction (e.g., 'cuda', 'cpu').

    Returns:
    - Decoded predictions for each surname.
    """
    model.eval()
    model.to(device)

    predictions = []
    progress_bar = tqdm(
        surnames, desc="Predicting Nationalities", leave=False, disable=False
    )

    with torch.no_grad():
        for surname in progress_bar:
            encoded_surname = tokenizer.encode_plus(
                surname,
                add_special_tokens=True,
                max_length=128,
                pad_to_max_length=True,
                return_attention_mask=True,
                return_tensors="pt",
            )

            input_ids = encoded_surname["input_ids"].to(device)
            attention_mask = encoded_surname["attention_mask"].to(device)

            output = model(input_ids, attention_mask=attention_mask)
            logits = output.logits
            probs = torch.nn.functional.softmax(logits, dim=1)
            predicted_label = torch.argmax(probs, dim=1).cpu().numpy()[0]

            decoded_label = inverse_label_dict[predicted_label]
            predictions.append((surname, decoded_label))

    return predictions
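
A hedged usage sketch; the tokenizer checkpoint and the contents of inverse_label_dict are placeholders, and model is assumed to be a trained classifier such as the one produced by train_model above:

import torch
from transformers import BertTokenizer
from suhec.surname_classification import predict_nationality

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
inverse_label_dict = {0: "japanese", 1: "english", 2: "italian", 3: "french"}  # placeholder mapping
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

predictions = predict_nationality(model, ["Nakamura", "Dubois"], tokenizer, inverse_label_dict, device)
print(predictions)  # e.g. [('Nakamura', 'japanese'), ('Dubois', 'french')]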

utils

get_kepler_pod_stats

get_kepler_pod_stats(
    to_timestamp,
    from_timestamp,
    prometheus_url="http://prometheus-kube-prometheus-prometheus.monitoring:9090",
    container_namespace="jupyterhub",
    pod_name="jupyter-5uperpalo",
)

Function to query Kepler power consumption data of a specific pod in Kubernetes.

https://sustainable-computing.io/design/kepler-energy-sources/

https://github.com/sustainable-computing-io/kepler/blob/1c397ff00b72b5cb1585d0de2cd495c73d88f07a/grafana-dashboards/Kepler-Exporter.json#L299

https://prometheus.io/docs/prometheus/latest/querying/basics/#time-durations

[metric for metric in prom.all_metrics() if "kepler" in metric]

Parameters:

  • to_timestamp (float) –

    'to' timestamp

  • from_timestamp (float) –

    'from' timestamp

  • prometheus_url (str) –

    Prometheus service url

  • container_namespace (str) –

    Kubernetes pod namespace name

  • pod_name (str) –

    Kubernetes pod name

Returns:

  • metrics (dict) –

    Kepler metrics of the power consumption of pod in Kubernetes

Source code in suhec/utils.py
def get_kepler_pod_stats(
    to_timestamp: float,
    from_timestamp: float,
    prometheus_url: str = "http://prometheus-kube-prometheus-prometheus.monitoring:9090",
    container_namespace: str = "jupyterhub",
    pod_name: str = "jupyter-5uperpalo",
) -> dict:
    """Function to query Kepler power consumption data of specific pod in Kubernetes.

    # https://sustainable-computing.io/design/kepler-energy-sources/
    # https://github.com/sustainable-computing-io/kepler/blob/1c397ff00b72b5cb1585d0de2cd495c73d88f07a/grafana-dashboards/Kepler-Exporter.json#L299
    # https://prometheus.io/docs/prometheus/latest/querying/basics/#time-durations
    # [metric for metric in prom.all_metrics() if "kepler" in metric]

    Args:
        to_timestamp (float): 'to' timestamp
        from_timestamp (float): 'from' timestamp
        prometheus_url (str): Prometheus service url
        container_namespace (str): Kubernetes pod namespace name
        pod_name (str): Kubernetes pod name
    Returns:
        metrics (dict): Kepler metrics of the power consumption of pod in Kubernetes
    """
    prom = PrometheusConnect(url=prometheus_url, disable_ssl=True)

    pod_name = f"'{pod_name}'"
    container_namespace = f"'{container_namespace}'"

    time_range_sec = str(int(to_timestamp - from_timestamp))
    container_sum_query = f"sum by (pod_name, container_namespace) (irate(kepler_container_joules_total{{container_namespace={container_namespace}, pod_name={pod_name}}}[{time_range_sec}s] @ {str(to_timestamp)}))"
    container_core_query = f"sum by (pod_name, container_namespace) (irate(kepler_container_core_joules_total{{container_namespace={container_namespace}, pod_name={pod_name}}}[{time_range_sec}s] @ {str(to_timestamp)}))"
    container_uncore_query = f"sum by (pod_name, container_namespace) (irate(kepler_container_uncore_joules_total{{container_namespace={container_namespace}, pod_name={pod_name}}}[{time_range_sec}s] @ {str(to_timestamp)}))"
    container_pkg_query = f"sum by (pod_name, container_namespace) (irate(kepler_container_package_joules_total{{container_namespace={container_namespace}, pod_name={pod_name}}}[{time_range_sec}s] @ {str(to_timestamp)}))"
    container_dram_query = f"sum by (pod_name, container_namespace) (irate(kepler_container_dram_joules_total{{container_namespace={container_namespace}, pod_name={pod_name}}}[{time_range_sec}s] @ {str(to_timestamp)}))"
    container_other_query = f"sum by (pod_name, container_namespace) (irate(kepler_container_other_joules_total{{container_namespace={container_namespace}, pod_name={pod_name}}}[{time_range_sec}s] @ {str(to_timestamp)}))"
    container_gpu_query = f"sum by (pod_name, container_namespace) (irate(kepler_container_gpu_joules_total{{container_namespace={container_namespace}, pod_name={pod_name}}}[{time_range_sec}s] @ {str(to_timestamp)}))"

    sum_data = prom.custom_query(query=container_sum_query)
    core_data = prom.custom_query(query=container_core_query)
    uncore_data = prom.custom_query(query=container_uncore_query)
    pkg_data = prom.custom_query(query=container_pkg_query)
    dram_data = prom.custom_query(query=container_dram_query)
    other_data = prom.custom_query(query=container_other_query)
    gpu_data = prom.custom_query(query=container_gpu_query)

    metrics = {
        "from": datetime.fromtimestamp(from_timestamp).strftime("%m/%d/%Y, %H:%M:%S"),
        "to": datetime.fromtimestamp(to_timestamp).strftime("%m/%d/%Y, %H:%M:%S"),
        "sum": float(sum_data[0]["value"][1]),
        "core": float(core_data[0]["value"][1]),
        "uncore": float(uncore_data[0]["value"][1]),
        "pkg": float(pkg_data[0]["value"][1]),
        "dram": float(dram_data[0]["value"][1]),
        "other": float(other_data[0]["value"][1]),
        "gpu": float(gpu_data[0]["value"][1]),
    }
    return metrics
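
A hedged usage sketch: wrap the workload you want to measure between two timestamps and query the pod's Kepler metrics afterwards. run_workload is a hypothetical placeholder, and the namespace/pod name shown are simply the function's own defaults.

import time
from suhec.utils import get_kepler_pod_stats

from_timestamp = time.time()
run_workload()  # hypothetical placeholder for the code whose power draw is measured
to_timestamp = time.time()

metrics = get_kepler_pod_stats(
    to_timestamp=to_timestamp,
    from_timestamp=from_timestamp,
    container_namespace="jupyterhub",
    pod_name="jupyter-5uperpalo",
)
print(metrics)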

intsec

intsec(list1, list2)

Simple intersection of two lists.

Parameters:

  • list1 (list) –

    list1

  • list2 (list) –

    list2

Returns:

  • list (list) –

    intersection of lists

Source code in suhec/utils.py
def intsec(list1: list, list2: list) -> list:
    """Simple intesection of two lists.
    Args:
        list1 (list): list1
        list2 (list): list2
    Returns:
        list (list): intersection of lists
    """
    return list(set.intersection(set(list1), set(list2)))
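
A tiny usage sketch; because sets are used, the order of the result is not guaranteed:

from suhec.utils import intsec

common = intsec(["a", "b", "c"], ["b", "c", "d"])
print(sorted(common))  # ['b', 'c']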

dill_load

dill_load(file_loc)

Helper function to open and close a dill file; otherwise Python warns that the file was left open

Parameters:

  • file_loc (str) –

    location of the file

Returns:

  • content (dict) –

    content of dill file, usually dictionary

Source code in suhec/utils.py
def dill_load(file_loc: Union[str, Path]) -> Any:
    """Helper function to open/close dill file,
    otherwise the python outputs warning that the file remains opened

    Args:
        file_loc (str): location of the file
    Returns:
        content (dict): content of dill file, usually dictionary
    """
    with open(file_loc, "rb") as f:
        content = dill.load(f)
    return content

dill_dump

dill_dump(file_loc, content)

Helper function to open and close a dill file and dump content into it; otherwise Python warns that the file was left open

Parameters:

  • file_loc (str) –

    location of the file

  • content (object) –

    data that will be saved to dill, usually dictionary

Source code in suhec/utils.py
def dill_dump(file_loc: Union[str, Path], content: object):
    """Helper function to open/close dill file and dump content into it,
    otherwise the python outputs warning that the file remains opened

    Args:
        file_loc (str): location of the file
        content (object): data that will be saved to dill, usually dictionary
    """
    with open(file_loc, "wb") as f:
        dill.dump(content, f)
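
A hedged round-trip sketch combining dill_dump and dill_load; the file name is a placeholder:

from suhec.utils import dill_dump, dill_load

data = {"encoding": {"me": 1, "you": 2}, "note": "any picklable object works"}
dill_dump("encodings.dill", data)   # placeholder file path
restored = dill_load("encodings.dill")
assert restored == data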