# Imports
import json
import pickle
import sys
from pathlib import Path

import joblib
import numpy as np
import pandas as pd
import yaml

# NOTE(review): hard-coded deployment location of the project's shared `tools`
# directory (provides `db_helpers`); adjust when running on another machine.
sys.path.append('/home/edgekit/MSY_FS/fahrsimulator_msy2526_ai/tools')
# sys.path.append(r"c:\\repo\\Fahrsimulator_MSY2526_AI\\tools")
import db_helpers  # noqa: E402  (requires the sys.path entry above)

DEFAULT_CONFIG_PATH = Path("/home/edgekit/MSY_FS/fahrsimulator_msy2526_ai/predict_pipeline/config.yaml")
# Number of leading feature columns the currently deployed model consumes.
DEFAULT_N_FEATURES = 20
# Samples with a larger share of missing values are rejected.
MAX_NAN_RATIO = 0.5


def _load_config(config_file_path):
    """Read and parse the YAML config at *config_file_path* (empty file -> {})."""
    with Path(config_file_path).open("r", encoding="utf-8") as f:
        return yaml.safe_load(f) or {}


def _resolve_path(path):
    """Return *path* as an absolute, resolved Path; relative paths use the CWD."""
    path = Path(path)
    if not path.is_absolute():
        path = Path.cwd() / path
    return path.resolve()


def _load_serialized(path: Path):
    """Load an object stored with pickle (``.pkl``) or joblib (``.joblib``).

    Security: both formats can execute arbitrary code while loading, so this
    must only be used on trusted, locally produced artifacts.

    Raises:
        ValueError: If the file suffix is neither ``.pkl`` nor ``.joblib``.
    """
    suffix = path.suffix.lower()
    if suffix == ".pkl":
        with path.open("rb") as f:
            return pickle.load(f)
    if suffix == ".joblib":
        return joblib.load(path)
    raise ValueError(f"Unsupported file format: {suffix}. Use .pkl or .joblib.")


def getLastEntryFromSQLite(path, table_name, key="_Id"):
    """Fetch the row of *table_name* with the largest *key* value.

    The query (order by *key* DESC, limit 1) is delegated to ``db_helpers``;
    the connection is always closed without committing.

    Returns:
        The first row of the result, or an empty object-dtype Series when the
        query returned no rows.
    """
    conn, cursor = db_helpers.connect_db(path)
    try:
        row_df = db_helpers.get_data_from_table(
            conn=conn,
            table_name=table_name,
            order_by={key: "DESC"},
            limit=1,
        )
    finally:
        db_helpers.disconnect_db(conn, cursor, commit=False)
    if row_df.empty:
        return pd.Series(dtype="object")
    return row_df.iloc[0]


def callModel(sample, model_path, n_features=DEFAULT_N_FEATURES):
    """Load the model at *model_path* and run it on *sample*.

    Args:
        sample: A 1-D feature vector or 2-D batch convertible to float32.
        model_path: Path to a ``.pkl`` / ``.joblib`` model; relative paths are
            resolved against the current working directory.
        n_features: Number of leading feature columns passed to the model.

    Returns:
        A Python scalar when the model yields exactly one value, otherwise
        the squeezed numpy array of predictions.

    Raises:
        TypeError: If *sample* is callable, or the loaded model neither has a
            ``predict`` method nor is callable.
        ValueError: If the model file format is not supported.
    """
    if callable(sample):
        raise TypeError(
            f"Invalid sample type: got callable `{getattr(sample, '__name__', type(sample).__name__)}`. "
            "Expected numpy array / pandas row."
        )

    model_path = _resolve_path(model_path)
    suffix = model_path.suffix.lower()
    # Keras models (tf.keras.models.load_model + model.predict(x, verbose=0),
    # planned 35-feature model) are not supported yet; add a branch here once
    # TensorFlow is available. Fail loudly instead of using an unbound model.
    if suffix not in {".pkl", ".joblib"}:
        raise ValueError(f"Unsupported model format: {suffix}. Use .pkl or .joblib.")
    model = _load_serialized(model_path)

    x = np.asarray(sample, dtype=np.float32)
    if x.ndim == 1:
        x = x.reshape(1, -1)  # single sample -> batch of one
    features = x[:, :n_features]

    if hasattr(model, "predict"):
        prediction = model.predict(features)
    elif callable(model):
        prediction = model(features)
    else:
        raise TypeError("Loaded model has no .predict(...) and is not callable.")

    prediction = np.asarray(prediction)
    if prediction.size == 1:
        return prediction.item()
    return prediction.squeeze()


def buildMessage(valid, result, config_file_path, sample=None):
    """Assemble the outgoing prediction message.

    The dict contains ``"valid"`` (as bool), ``"_id"`` (the sample's ``_Id``
    or ``_id`` field when *sample* is a Series/dict, else None) and the
    configured ``mqtt.publish_format.result_key`` (default ``"prediction"``)
    holding *result*; numpy arrays are converted to nested lists.
    """
    mqtt_cfg = _load_config(config_file_path).get("mqtt") or {}
    result_key = (mqtt_cfg.get("publish_format") or {}).get("result_key", "prediction")

    sample_id = None
    if isinstance(sample, (pd.Series, dict)):
        sample_id = sample.get("_Id", sample.get("_id"))

    return {
        "valid": bool(valid),
        "_id": sample_id,
        result_key: result.tolist() if isinstance(result, np.ndarray) else result,
    }


def convert_int64(obj):
    """Recursively turn numpy values into JSON-serializable Python objects.

    Despite its historical name this converts every numpy scalar type
    (``np.int64``, ``np.int32``, ``np.float32``, ``np.bool_``, ...) via
    ``.item()`` and numpy arrays via ``.tolist()``, recursing into dicts,
    lists and tuples (tuples become lists, as JSON renders them anyway).
    """
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return convert_int64(obj.tolist())
    if isinstance(obj, dict):
        return {key: convert_int64(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [convert_int64(item) for item in obj]
    return obj


def sendMessage(config_file_path, message):
    """Serialize *message* to JSON and emit it (currently printed to stdout).

    MQTT publishing is not implemented yet; the ``mqtt`` config section is
    read so the publish step below can be enabled later.
    """
    mqtt_cfg = _load_config(config_file_path).get("mqtt") or {}
    topic = mqtt_cfg.get("topic", "ml/predictions")  # noqa: F841  used by the MQTT step below

    # Convert numpy values so json.dumps does not reject them.
    payload = json.dumps(convert_int64(message), ensure_ascii=False)
    print(payload)

    # Later: publish via MQTT using config parameters above.
    # Example (kept commented intentionally):
    # import paho.mqtt.client as mqtt
    # client = mqtt.Client(client_id=mqtt_cfg.get("client_id", "predictor-01"))
    # if "username" in mqtt_cfg and mqtt_cfg.get("username"):
    #     client.username_pw_set(mqtt_cfg["username"], mqtt_cfg.get("password"))
    # client.connect(mqtt_cfg.get("host", "localhost"), int(mqtt_cfg.get("port", 1883)), 60)
    # client.publish(
    #     topic=topic,
    #     payload=payload,
    #     qos=int(mqtt_cfg.get("qos", 1)),
    #     retain=bool(mqtt_cfg.get("retain", False)),
    # )
    # client.disconnect()
    return


def replace_nan(sample, config_file_path: Path, max_nan_ratio=MAX_NAN_RATIO):
    """Validate *sample* by its share of missing values and fill the gaps.

    The config's ``fallback`` entry is a list of mappings (``column: value``)
    that are merged into one fill map; non-mapping entries are ignored.

    Returns:
        ``(valid, sample)``. *valid* is False for an empty sample or when more
        than *max_nan_ratio* of its values are NaN. Fallback values are only
        filled in for valid samples.
    """
    fallback_map = {}
    for item in _load_config(config_file_path).get("fallback") or []:
        if isinstance(item, dict):
            fallback_map.update(item)

    if sample.empty:
        return False, sample

    valid = sample.isna().mean() <= max_nan_ratio
    if valid and fallback_map:
        sample = sample.fillna(value=fallback_map)
    return valid, sample


def sample_to_numpy(sample, drop_cols=("_Id", "start_time")):
    """Convert a pandas row/frame to numpy, dropping non-feature columns.

    Labels in *drop_cols* that are absent are ignored; inputs that are not
    pandas objects go through ``np.asarray`` unchanged.
    """
    if isinstance(sample, pd.Series):
        sample = sample.drop(labels=list(drop_cols), errors="ignore")
        return sample.to_numpy()
    if isinstance(sample, pd.DataFrame):
        sample = sample.drop(columns=list(drop_cols), errors="ignore")
        return sample.to_numpy()
    return np.asarray(sample)


def scale_sample(sample, use_scaling=False, scaler_path=None):
    """Apply the stored feature scaler to *sample* when scaling is enabled.

    The file at *scaler_path* is either the dict written by
    ``model_training/tools/scaler.py`` (``{"scalers": {...}, "method": ...,
    "scope": ...}``) or a raw object with ``transform``. Only columns listed in
    the scaler's ``feature_names_in_`` are transformed; all other columns are
    kept unchanged. *sample* is returned untouched when scaling is disabled,
    no usable scaler is found, or the scaler does not record feature names.

    Returns:
        A Series for Series input, otherwise a scaled DataFrame copy.
    """
    if not use_scaling or scaler_path is None:
        return sample

    normalizer = _load_serialized(_resolve_path(scaler_path))

    scalers, scope = {}, "global"
    if isinstance(normalizer, dict):
        scalers = normalizer.get("scalers") or {}
        scope = normalizer.get("scope", "global")
    if scope == "global":
        scaler = scalers.get("global")
    else:
        # NOTE(review): non-global scopes use the "global" scaler if present,
        # otherwise whichever scaler was stored first - confirm this is intended.
        scaler = scalers.get("global", next(iter(scalers.values()), None))
    # Optional fallback if the stored object is already a raw scaler.
    if scaler is None and hasattr(normalizer, "transform"):
        scaler = normalizer
    if scaler is None or not hasattr(scaler, "transform"):
        return sample

    feature_names = getattr(scaler, "feature_names_in_", None)
    if feature_names is None:
        return sample

    is_series = isinstance(sample, pd.Series)
    df = sample.to_frame().T if is_series else sample.copy()
    # Follow the scaler's column order: sklearn rejects inputs whose feature
    # names are ordered differently from those seen during fit.
    present = set(df.columns)
    cols_to_scale = [c for c in feature_names if c in present]
    if cols_to_scale:
        df.loc[:, cols_to_scale] = scaler.transform(df.loc[:, cols_to_scale])
    return df.iloc[0] if is_series else df


def main(config_file_path=DEFAULT_CONFIG_PATH):
    """Run one prediction cycle.

    Fetch the latest DB row, validate and impute it, optionally scale it,
    run the model, and emit the result message. Invalid samples still emit
    a message with ``valid=False`` and no prediction.
    """
    # Opt in to pandas' future no-silent-downcasting behaviour; can possibly
    # be removed. pandas < 2.2 does not know this option and raises
    # OptionError (a KeyError subclass), which is safe to ignore here.
    try:
        pd.set_option('future.no_silent_downcasting', True)
    except KeyError:
        pass

    config_file_path = Path(config_file_path)
    cfg = _load_config(config_file_path)

    db_cfg = cfg["database"]
    # NOTE(review): sample_to_numpy drops only "_Id"/"start_time"; if
    # database.key names a different column it is fed to the model as a feature.
    sample = getLastEntryFromSQLite(db_cfg["path"], db_cfg["table"], db_cfg["key"])

    valid, sample = replace_nan(sample, config_file_path=config_file_path)
    if not valid:
        reason = "table is empty" if sample.empty else "more than 50% NaN"
        print(f"Sample invalid: {reason}.")
        message = buildMessage(valid, None, config_file_path, sample=sample)
        sendMessage(config_file_path, message)
        return

    model_path = cfg["model"]["path"]
    scaler_cfg = cfg.get("scaler") or {}
    sample = scale_sample(
        sample,
        use_scaling=scaler_cfg.get("use_scaling", False),
        scaler_path=scaler_cfg.get("path"),
    )

    prediction = callModel(model_path=model_path, sample=sample_to_numpy(sample))
    message = buildMessage(valid, prediction, config_file_path, sample=sample)
    sendMessage(config_file_path, message)


if __name__ == "__main__":
    main()