Utils

df_row_to_influxdb_point(row)

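Convert a DataFrame row to an InfluxDB Point. The point is written to the "prediction" measurement, with the row's vehicle_id as a tag, its class as the "value" field, and its timestamp as the point time.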

Source code in inference_model/utils.py
def df_row_to_influxdb_point(row) -> Point:
    """Convert a DataFrame row to an InfluxDB Point."""
    # TODO: the vehicle_id and timestamp column names should be specified in a config file
    return (
        Point("prediction")
        .tag("vehicle_id", row["vehicle_id"])
        .field("value", row["class"])
        .time(row["timestamp"])
    )
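
The TODO above asks for the column names to come from configuration. A minimal sketch of one way to do that follows. `PointColumns` and `extract_point_parts` are hypothetical names, not part of `inference_model/utils.py`, and the sketch stops short of building the `Point` itself so that it runs with the standard library only.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Mapping


@dataclass(frozen=True)
class PointColumns:
    """Hypothetical config: which row columns feed which part of the point."""

    tag: str = "vehicle_id"
    field: str = "class"
    time: str = "timestamp"


def extract_point_parts(
    row: Mapping[str, Any], cols: PointColumns = PointColumns()
) -> dict:
    """Pick the tag, field, and time values out of a row by configured name."""
    return {
        "tag": (cols.tag, row[cols.tag]),
        "field": ("value", row[cols.field]),
        "time": row[cols.time],
    }


# A plain dict stands in for a DataFrame row; both support row["column"].
row = {
    "vehicle_id": "bus-7",
    "class": 2,
    "timestamp": datetime(2024, 1, 1, tzinfo=timezone.utc),
}
parts = extract_point_parts(row)
```

The returned pairs map directly onto the `.tag(...)`, `.field(...)`, and `.time(...)` calls in the source, so the column names would only need to change in one place.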

dill_dump(file_loc, content)

Helper function that opens a file, dumps content into it with dill, and closes it again. Without the explicit close, Python emits a warning that the file remains open.

Parameters:

Name      Type         Description                                            Default
file_loc  str or Path  location of the file                                   required
content   object       data that will be saved to dill, usually a dictionary  required

Source code in inference_model/utils.py
def dill_dump(file_loc: Union[str, Path], content: object):
    """Open a file, dump content into it with dill, and close it again.

    Using a context manager avoids Python's warning that the file remains open.

    Args:
        file_loc (Union[str, Path]): location of the file
        content (object): data that will be saved to dill, usually a dictionary
    """
    with open(file_loc, "wb") as f:
        dill.dump(content, f)

dill_load(file_loc)

Helper function that opens a dill file, loads its content, and closes the file again. Without the explicit close, Python emits a warning that the file remains open.

Parameters:

Name      Type         Description           Default
file_loc  str or Path  location of the file  required

Source code in inference_model/utils.py
def dill_load(file_loc: Union[str, Path]) -> Any:
    """Open a dill file, load its content, and close the file again.

    Using a context manager avoids Python's warning that the file remains open.

    Args:
        file_loc (Union[str, Path]): location of the file
    Returns:
        content (Any): content of the dill file, usually a dictionary
    """
    with open(file_loc, "rb") as f:
        content = dill.load(f)
    return content
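
A round trip through both helpers might look like the sketch below. The file name and dictionary contents are made up for illustration, and the fallback to the standard-library `pickle` (which exposes the same `dump`/`load` interface) is an assumption added only so the example still runs where dill is not installed.

```python
import tempfile
from pathlib import Path
from typing import Any, Union

try:
    import dill
except ImportError:
    # Assumption for this sketch only: pickle shares dump/load with dill.
    import pickle as dill


def dill_dump(file_loc: Union[str, Path], content: object) -> None:
    # The context manager closes the file even if dumping fails.
    with open(file_loc, "wb") as f:
        dill.dump(content, f)


def dill_load(file_loc: Union[str, Path]) -> Any:
    with open(file_loc, "rb") as f:
        return dill.load(f)


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "artifacts.dill"  # hypothetical file name
    original = {"features": ["speed", "rpm"], "threshold": 0.5}
    dill_dump(path, original)
    restored = dill_load(path)
```

The restored object is an equal copy of the original, not the same object, so later changes to one do not affect the other.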

handle_redis_message(message)

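Handle Redis channel messages:

1. decode messages from the Redis channel
2. make a prediction
3. convert pandas rows to InfluxDB points
4. write the points into InfluxDB

The source shown here implements step 1 only: it decodes the message and returns the resulting DataFrame.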

Source code in inference_model/utils.py
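from io import StringIO  # required by the read_json call below


def handle_redis_message(message):
    """Handle Redis channel messages:
    1. decode messages from Redis channel
    2. make a prediction
    3. convert pandas rows to influxdb points
    4. write into the influxdb
    """
    # Convert the message data (bytes) to string
    json_data = message["data"].decode("utf-8")

    # Convert JSON string to DataFrame. Wrapping the string in StringIO avoids
    # the deprecation of literal JSON input to read_json (pandas 2.1 and later).
    df = pd.read_json(StringIO(json_data), orient="split")
    return df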
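
To make the expected input concrete, the sketch below builds a message in the shape a Redis pub/sub subscriber receives and decodes it with the standard-library `json` module instead of pandas. The channel name and the columns are invented for illustration. The "split" orientation is the layout pandas produces with `DataFrame.to_json(orient="split")`: separate `columns`, `index`, and `data` entries.

```python
import json

# Payload in pandas' "split" orientation (hypothetical columns and values).
payload = {
    "columns": ["vehicle_id", "timestamp", "speed"],
    "index": [0, 1],
    "data": [
        ["bus-7", "2024-01-01T00:00:00Z", 41.5],
        ["bus-9", "2024-01-01T00:00:05Z", 38.0],
    ],
}

# Pub/sub messages arrive as a dict whose "data" entry holds raw bytes.
message = {
    "type": "message",
    "pattern": None,
    "channel": b"telemetry",  # hypothetical channel name
    "data": json.dumps(payload).encode("utf-8"),
}

# Same first step as handle_redis_message: bytes to string.
json_data = message["data"].decode("utf-8")

# Rebuild one record per row by pairing column names with row values.
decoded = json.loads(json_data)
rows = [dict(zip(decoded["columns"], values)) for values in decoded["data"]]
```

A publisher that sends any other orientation (for example a list of records) would not match the `orient="split"` argument used in the source.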

intsec(list1, list2)

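Simple intersection of two lists. Because the intersection is computed on sets, duplicates are dropped and the order of the result is not guaranteed.

Parameters:

Name   Type  Description  Default
list1  list  first list   required
list2  list  second list  required

Returns: list, the intersection of the two lists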

Source code in inference_model/utils.py
def intsec(list1: list, list2: list) -> list:
    """Simple intersection of two lists.

    Duplicates are dropped and the order of the result is not guaranteed.

    Args:
        list1 (list): first list
        list2 (list): second list
    Returns:
        list (list): intersection of the lists
    """
    return list(set.intersection(set(list1), set(list2)))
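
Since the result goes through sets, callers that need a stable order have to sort it or use a different approach. The sketch below shows `intsec` next to a hypothetical order-preserving variant, `intsec_ordered`, which is not part of the module; the column names are made up.

```python
def intsec(list1: list, list2: list) -> list:
    return list(set.intersection(set(list1), set(list2)))


def intsec_ordered(list1: list, list2: list) -> list:
    """Hypothetical variant: keep the order of list1 and drop duplicates."""
    members = set(list2)
    seen = set()
    result = []
    for item in list1:
        if item in members and item not in seen:
            seen.add(item)
            result.append(item)
    return result


available = ["speed", "rpm", "speed", "temp"]
expected = ["temp", "speed", "load"]

unordered = intsec(available, expected)       # order may vary between runs
ordered = intsec_ordered(available, expected)  # follows the order of `available`
```

Both functions require hashable items, because both rely on sets for membership checks.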