diff --git a/predict_pipeline/predict_sample.py b/predict_pipeline/predict_sample.py index 65b51db..8502f68 100644 --- a/predict_pipeline/predict_sample.py +++ b/predict_pipeline/predict_sample.py @@ -1,235 +1,253 @@ -# Imports -import pandas as pd -import json -from pathlib import Path -import numpy as np -import sys -import yaml -import pickle -#sys.path.append('/home/edgekit/MSY_FS/fahrsimulator_msy2526_ai/tools') -sys.path.append(r"c:\\repo\\Fahrsimulator_MSY2526_AI\\tools") -import db_helpers -import joblib - -def _load_serialized(path: Path): - suffix = path.suffix.lower() - if suffix == ".pkl": - with path.open("rb") as f: - return pickle.load(f) - if suffix == ".joblib": - return joblib.load(path) - raise ValueError(f"Unsupported file format: {suffix}. Use .pkl or .joblib.") - -def getLastEntryFromSQLite(path, table_name, key="_Id"): - conn, cursor = db_helpers.connect_db(path) - try: - row_df = db_helpers.get_data_from_table( - conn=conn, - table_name=table_name, - order_by={key: "DESC"}, - limit=1, - ) - finally: - db_helpers.disconnect_db(conn, cursor, commit=False) - - if row_df.empty: - return pd.Series(dtype="object") - - return row_df.iloc[0] - -def callModel(sample, model_path): - if callable(sample): - raise TypeError( - f"Invalid sample type: got callable `{getattr(sample, '__name__', type(sample).__name__)}`. " - "Expected numpy array / pandas row." - ) - - model_path = Path(model_path) - if not model_path.is_absolute(): - model_path = Path.cwd() / model_path - model_path = model_path.resolve() - - suffix = model_path.suffix.lower() - if suffix in {".pkl", ".joblib"}: - model = _load_serialized(model_path) - elif suffix == ".keras": - import tensorflow as tf - model = tf.keras.models.load_model(model_path) - else: - raise ValueError(f"Unsupported model format: {suffix}. Use .pkl, .joblib, or .keras.") - - x = np.asarray(sample, dtype=np.float32) - if x.ndim == 1: - x = x.reshape(1, -1) - - if suffix == ".keras": - x_full = x - # Future model (35 features): keep this call when your new model is active. - # prediction = model.predict(x_full[:, :35], verbose=0) - prediction = model.predict(x_full[:, :20], verbose=0) - - else: - if hasattr(model, "predict"): - prediction = model.predict(x[:,:20]) - elif callable(model): - prediction = model(x[:,:20]) - else: - raise TypeError("Loaded model has no .predict(...) and is not callable.") - - prediction = np.asarray(prediction) - if prediction.size == 1: - return prediction.item() - return prediction.squeeze() - -def buildMessage(valid, result: np.int32, config_file_path, sample=None): - with Path(config_file_path).open("r", encoding="utf-8") as f: - cfg = yaml.safe_load(f) - - mqtt_cfg = cfg.get("mqtt", {}) - result_key = mqtt_cfg.get("publish_format", {}).get("result_key", "prediction") - - sample_id = None - if isinstance(sample, pd.Series): - sample_id = sample.get("_Id", sample.get("_id")) - elif isinstance(sample, dict): - sample_id = sample.get("_Id", sample.get("_id")) - - message = { - "valid": bool(valid), - "_id": sample_id, - result_key: np.asarray(result).tolist() if isinstance(result, np.ndarray) else result, - } - return message - -def sendMessage(config_file_path, message): - with Path(config_file_path).open("r", encoding="utf-8") as f: - cfg = yaml.safe_load(f) - - mqtt_cfg = cfg.get("mqtt", {}) - topic = mqtt_cfg.get("topic", "ml/predictions") - - payload = json.dumps(message, ensure_ascii=False) - print(payload) - - # Later: publish via MQTT using config parameters above. - # Example (kept commented intentionally): - # import paho.mqtt.client as mqtt - # client = mqtt.Client(client_id=mqtt_cfg.get("client_id", "predictor-01")) - # if "username" in mqtt_cfg and mqtt_cfg.get("username"): - # client.username_pw_set(mqtt_cfg["username"], mqtt_cfg.get("password")) - # client.connect(mqtt_cfg.get("host", "localhost"), int(mqtt_cfg.get("port", 1883)), 60) - # client.publish( - # topic=topic, - # payload=payload, - # qos=int(mqtt_cfg.get("qos", 1)), - # retain=bool(mqtt_cfg.get("retain", False)), - # ) - # client.disconnect() - return - -def replace_nan(sample, config_file_path: Path): - with config_file_path.open("r", encoding="utf-8") as f: - cfg = yaml.safe_load(f) - - fallback_list = cfg.get("fallback", []) - fallback_map = {} - for item in fallback_list: - if isinstance(item, dict): - fallback_map.update(item) - - if sample.empty: - return False, sample - - nan_ratio = sample.isna().mean() - valid = nan_ratio <= 0.5 - - if valid and fallback_map: - sample = sample.fillna(value=fallback_map) - - - return valid, sample - -def sample_to_numpy(sample, drop_cols=("_Id", "start_time")): - if isinstance(sample, pd.Series): - sample = sample.drop(labels=list(drop_cols), errors="ignore") - return sample.to_numpy() - - if isinstance(sample, pd.DataFrame): - sample = sample.drop(columns=list(drop_cols), errors="ignore") - return sample.to_numpy() - - return np.asarray(sample) - -def scale_sample(sample, use_scaling=False, scaler_path=None): - if not use_scaling or scaler_path is None: - return sample - scaler_path = Path(scaler_path) - if not scaler_path.is_absolute(): - scaler_path = Path.cwd() / scaler_path - scaler_path = scaler_path.resolve() - normalizer = _load_serialized(scaler_path) - - # normalizer format from model_training/tools/scaler.py: - # {"scalers": {...}, "method": "...", "scope": "..."} - scalers = normalizer.get("scalers", {}) if isinstance(normalizer, dict) else {} - scope = normalizer.get("scope", "global") if isinstance(normalizer, dict) else "global" - if scope == "global": - scaler = scalers.get("global") - else: - scaler = scalers.get("global", next(iter(scalers.values()), None)) - - # Optional fallback if the stored object is already a raw scaler. - if scaler is None and hasattr(normalizer, "transform"): - scaler = normalizer - if scaler is None or not hasattr(scaler, "transform"): - return sample - - df = sample.to_frame().T if isinstance(sample, pd.Series) else sample.copy() - feature_names = getattr(scaler, "feature_names_in_", None) - if feature_names is None: - return sample - - # Keep columns not in the normalizer unchanged. - cols_to_scale = [c for c in df.columns if c in set(feature_names)] - if cols_to_scale: - df.loc[:, cols_to_scale] = scaler.transform(df.loc[:, cols_to_scale]) - - return df.iloc[0] if isinstance(sample, pd.Series) else df - -def main(): - config_file_path = Path("predict_pipeline/config.yaml") - with config_file_path.open("r", encoding="utf-8") as f: - cfg = yaml.safe_load(f) - - database_path = cfg["database"]["path"] - table_name = cfg["database"]["table"] - row_key = cfg["database"]["key"] - - - sample = getLastEntryFromSQLite(database_path, table_name, row_key) - valid, sample = replace_nan(sample, config_file_path=config_file_path) - - if not valid: - print("Sample invalid: more than 50% NaN.") - message = buildMessage(valid, None, config_file_path, sample=sample) - sendMessage(config_file_path, message) - return - - model_path = cfg["model"]["path"] - scaler_path = cfg["scaler"]["path"] - use_scaling = cfg["scaler"]["use_scaling"] - - sample = scale_sample(sample, use_scaling=use_scaling, scaler_path=scaler_path) - sample_np = sample_to_numpy(sample) - - prediction = callModel(model_path=model_path, sample=sample_np) - - message = buildMessage(valid, prediction, config_file_path, sample=sample) - sendMessage(config_file_path, message) - - -if __name__ == "__main__": - main() - - - +# Imports +import pandas as pd +import json +from pathlib import Path +import numpy as np +import sys +import yaml +import pickle +sys.path.append('/home/edgekit/MSY_FS/fahrsimulator_msy2526_ai/tools') +# sys.path.append(r"c:\\repo\\Fahrsimulator_MSY2526_AI\\tools") +import db_helpers +import joblib + +def _load_serialized(path: Path): + suffix = path.suffix.lower() + if suffix == ".pkl": + with path.open("rb") as f: + return pickle.load(f) + if suffix == ".joblib": + return joblib.load(path) + raise ValueError(f"Unsupported file format: {suffix}. Use .pkl or .joblib.") + +def getLastEntryFromSQLite(path, table_name, key="_Id"): + conn, cursor = db_helpers.connect_db(path) + try: + row_df = db_helpers.get_data_from_table( + conn=conn, + table_name=table_name, + order_by={key: "DESC"}, + limit=1, + ) + finally: + db_helpers.disconnect_db(conn, cursor, commit=False) + + if row_df.empty: + return pd.Series(dtype="object") + + return row_df.iloc[0] + +def callModel(sample, model_path): + if callable(sample): + raise TypeError( + f"Invalid sample type: got callable `{getattr(sample, '__name__', type(sample).__name__)}`. " + "Expected numpy array / pandas row." + ) + + model_path = Path(model_path) + if not model_path.is_absolute(): + model_path = Path.cwd() / model_path + model_path = model_path.resolve() + + suffix = model_path.suffix.lower() + if suffix in {".pkl", ".joblib"}: + model = _load_serialized(model_path) + # elif suffix == ".keras": + # import tensorflow as tf + # model = tf.keras.models.load_model(model_path) + # else: + # raise ValueError(f"Unsupported model format: {suffix}. Use .pkl, .joblib, or .keras.") + + x = np.asarray(sample, dtype=np.float32) + if x.ndim == 1: + x = x.reshape(1, -1) + + if suffix == ".keras": + x_full = x + # Future model (35 features): keep this call when your new model is active. + # prediction = model.predict(x_full[:, :35], verbose=0) + prediction = model.predict(x_full[:, :20], verbose=0) + + else: + if hasattr(model, "predict"): + prediction = model.predict(x[:,:20]) + elif callable(model): + prediction = model(x[:,:20]) + else: + raise TypeError("Loaded model has no .predict(...) and is not callable.") + + prediction = np.asarray(prediction) + if prediction.size == 1: + return prediction.item() + return prediction.squeeze() + +def buildMessage(valid, result: np.int32, config_file_path, sample=None): + with Path(config_file_path).open("r", encoding="utf-8") as f: + cfg = yaml.safe_load(f) + + mqtt_cfg = cfg.get("mqtt", {}) + result_key = mqtt_cfg.get("publish_format", {}).get("result_key", "prediction") + + sample_id = None + if isinstance(sample, pd.Series): + sample_id = sample.get("_Id", sample.get("_id")) + elif isinstance(sample, dict): + sample_id = sample.get("_Id", sample.get("_id")) + + message = { + "valid": bool(valid), + "_id": sample_id, + result_key: np.asarray(result).tolist() if isinstance(result, np.ndarray) else result, + } + return message + +def convert_int64(obj): + if isinstance(obj, np.int64): + return int(obj) + # If the object is a dictionary or list, recursively convert its values + elif isinstance(obj, dict): + return {key: convert_int64(value) for key, value in obj.items()} + elif isinstance(obj, list): + return [convert_int64(item) for item in obj] + return obj + +def sendMessage(config_file_path, message): + # Load the configuration + with Path(config_file_path).open("r", encoding="utf-8") as f: + cfg = yaml.safe_load(f) + + # Get MQTT configuration + mqtt_cfg = cfg.get("mqtt", {}) + topic = mqtt_cfg.get("topic", "ml/predictions") + + # Convert message to ensure no np.int64 values remain + message = convert_int64(message) + + # Serialize the message to JSON + payload = json.dumps(message, ensure_ascii=False) + print(payload) + + # Later: publish via MQTT using config parameters above. + # Example (kept commented intentionally): + # import paho.mqtt.client as mqtt + # client = mqtt.Client(client_id=mqtt_cfg.get("client_id", "predictor-01")) + # if "username" in mqtt_cfg and mqtt_cfg.get("username"): + # client.username_pw_set(mqtt_cfg["username"], mqtt_cfg.get("password")) + # client.connect(mqtt_cfg.get("host", "localhost"), int(mqtt_cfg.get("port", 1883)), 60) + # client.publish( + # topic=topic, + # payload=payload, + # qos=int(mqtt_cfg.get("qos", 1)), + # retain=bool(mqtt_cfg.get("retain", False)), + # ) + # client.disconnect() + return + +def replace_nan(sample, config_file_path: Path): + with config_file_path.open("r", encoding="utf-8") as f: + cfg = yaml.safe_load(f) + + fallback_list = cfg.get("fallback", []) + fallback_map = {} + for item in fallback_list: + if isinstance(item, dict): + fallback_map.update(item) + + if sample.empty: + return False, sample + + nan_ratio = sample.isna().mean() + valid = nan_ratio <= 0.5 + + if valid and fallback_map: + sample = sample.fillna(value=fallback_map) + + + return valid, sample + +def sample_to_numpy(sample, drop_cols=("_Id", "start_time")): + if isinstance(sample, pd.Series): + sample = sample.drop(labels=list(drop_cols), errors="ignore") + return sample.to_numpy() + + if isinstance(sample, pd.DataFrame): + sample = sample.drop(columns=list(drop_cols), errors="ignore") + return sample.to_numpy() + + return np.asarray(sample) + +def scale_sample(sample, use_scaling=False, scaler_path=None): + if not use_scaling or scaler_path is None: + return sample + scaler_path = Path(scaler_path) + if not scaler_path.is_absolute(): + scaler_path = Path.cwd() / scaler_path + scaler_path = scaler_path.resolve() + normalizer = _load_serialized(scaler_path) + + # normalizer format from model_training/tools/scaler.py: + # {"scalers": {...}, "method": "...", "scope": "..."} + scalers = normalizer.get("scalers", {}) if isinstance(normalizer, dict) else {} + scope = normalizer.get("scope", "global") if isinstance(normalizer, dict) else "global" + if scope == "global": + scaler = scalers.get("global") + else: + scaler = scalers.get("global", next(iter(scalers.values()), None)) + + # Optional fallback if the stored object is already a raw scaler. + if scaler is None and hasattr(normalizer, "transform"): + scaler = normalizer + if scaler is None or not hasattr(scaler, "transform"): + return sample + + df = sample.to_frame().T if isinstance(sample, pd.Series) else sample.copy() + feature_names = getattr(scaler, "feature_names_in_", None) + if feature_names is None: + return sample + + # Keep columns not in the normalizer unchanged. + cols_to_scale = [c for c in df.columns if c in set(feature_names)] + if cols_to_scale: + df.loc[:, cols_to_scale] = scaler.transform(df.loc[:, cols_to_scale]) + + return df.iloc[0] if isinstance(sample, pd.Series) else df + +def main(): + pd.set_option('future.no_silent_downcasting', True) # kann ggf raus + + config_file_path = Path("/home/edgekit/MSY_FS/fahrsimulator_msy2526_ai/predict_pipeline/config.yaml") + with config_file_path.open("r", encoding="utf-8") as f: + cfg = yaml.safe_load(f) + + database_path = cfg["database"]["path"] + table_name = cfg["database"]["table"] + row_key = cfg["database"]["key"] + + + sample = getLastEntryFromSQLite(database_path, table_name, row_key) + valid, sample = replace_nan(sample, config_file_path=config_file_path) + + if not valid: + print("Sample invalid: more than 50% NaN.") + message = buildMessage(valid, None, config_file_path, sample=sample) + sendMessage(config_file_path, message) + return + + model_path = cfg["model"]["path"] + scaler_path = cfg["scaler"]["path"] + use_scaling = cfg["scaler"]["use_scaling"] + + sample = scale_sample(sample, use_scaling=use_scaling, scaler_path=scaler_path) + sample_np = sample_to_numpy(sample) + + prediction = callModel(model_path=model_path, sample=sample_np) + + message = buildMessage(valid, prediction, config_file_path, sample=sample) + sendMessage(config_file_path, message) + + +if __name__ == "__main__": + main() + + +