"""Authenticated, quota-enforcing proxy in front of an Ollama-style backend.

Exposes three API flavours and forwards them to the configured backend:

* native Ollama routes (``/api/generate``, ``/api/chat``, ``/api/tags``, ...),
* an Anthropic Messages compatibility layer (``/v1/messages``),
* OpenAI-compatible routes (``/v1/models``, ``/v1/chat/completions``).

Every route requires an API key (see ``require_api_key``). Prompt sizes are
charged against the key's quota via ``crud.check_and_increment_quota`` before
the backend is contacted.
"""
import json
import logging
import os
import secrets
import time
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Iterator, Optional

import httpx
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from sqlalchemy.orm import Session

from database import get_db
import crud

# Fallback backend address when neither the DB setting nor OLLAMA_URL is set.
_DEFAULT_OLLAMA_URL = "http://localhost:11434"
# Seconds to wait for the backend; generation can take minutes.
_BACKEND_TIMEOUT = 300.0

# Only the *directory* of LOG_FILE is used; the file names below are fixed.
_log_dir = Path(os.getenv("LOG_FILE", "logs/usage.log")).parent
_log_dir.mkdir(parents=True, exist_ok=True)

_fmt = logging.Formatter("%(asctime)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S")

# Rotating usage log (8 KB per file, 3 backups)
_usage_handler = RotatingFileHandler(
    str(_log_dir / "usage.log"), maxBytes=8192, backupCount=3, encoding="utf-8"
)
_usage_handler.setFormatter(_fmt)
usage_log = logging.getLogger("proxy.usage")
usage_log.setLevel(logging.INFO)
usage_log.addHandler(_usage_handler)
usage_log.propagate = False

# Rotating error log (64 KB per file, 5 backups)
_error_handler = RotatingFileHandler(
    str(_log_dir / "error.log"), maxBytes=65536, backupCount=5, encoding="utf-8"
)
_error_handler.setFormatter(_fmt)
error_log = logging.getLogger("proxy.error")
error_log.setLevel(logging.ERROR)
error_log.addHandler(_error_handler)
error_log.propagate = False


def _content_to_str(content) -> str:
    """Normalize OpenAI content (string or array of content parts) to a plain string.

    Always returns ``str``: ``None``/empty values become ``""``, list parts are
    joined with single spaces (dict parts contribute their ``"text"`` value),
    and any other truthy value is passed through ``str()``.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return " ".join(
            # ``text`` may be absent or null (e.g. image parts) -> empty string.
            str(part.get("text") or "") if isinstance(part, dict) else str(part)
            for part in content
        )
    return str(content) if content else ""


def _last_user_msg(messages: list, max_len: int = 120) -> str:
    """Return a single-line preview of the most recent ``user`` message.

    The text is truncated to ``max_len`` characters with a trailing ``…`` when
    shortened. Returns ``""`` when there is no user message.
    """
    for msg in reversed(messages):
        if isinstance(msg, dict) and msg.get("role") == "user":
            text = _content_to_str(msg.get("content")).replace("\n", " ").strip()
            return text[:max_len] + ("…" if len(text) > max_len else "")
    return ""


def _as_int(value) -> int:
    """Coerce a backend-reported counter to ``int``; anything unusable becomes 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError):
        return 0


async def require_api_key(request: Request, db: Session = Depends(get_db)):
    """Authenticate the request and stash the key's id/name on ``request.state``.

    The key is taken from the first matching source, in this order:
    ``Authorization: Bearer <key>``, a raw ``Authorization: sk-...`` value,
    ``x-api-key``, then ``anthropic-auth-token``.

    Raises:
        HTTPException: 401 when no key is supplied or ``crud.verify_api_key``
            rejects it.
    """
    auth_header = request.headers.get("Authorization", "")
    x_api_key = request.headers.get("x-api-key", "")
    auth_token = request.headers.get("anthropic-auth-token", "")

    if auth_header.startswith("Bearer "):
        api_key = auth_header[7:]
    elif auth_header.startswith("sk-"):
        api_key = auth_header
    elif x_api_key:
        api_key = x_api_key
    elif auth_token:
        api_key = auth_token
    else:
        raise HTTPException(status_code=401, detail="Invalid or missing API key")

    db_key = crud.verify_api_key(db, api_key)
    if not db_key:
        raise HTTPException(status_code=401, detail="Invalid API key")

    request.state.api_key_id = db_key.id
    request.state.api_key_name = db_key.name


app = FastAPI(title="Ollama Proxy", dependencies=[Depends(require_api_key)])


@app.on_event("startup")
def apply_env_settings():
    """Copy env-configured values into the DB settings at startup.

    Currently only ``OLLAMA_URL`` is written (as the ``ollama_url`` setting).
    """
    db = next(get_db())
    try:
        if url := os.getenv("OLLAMA_URL"):
            crud.set_setting(db, "ollama_url", url)
            db.commit()
    finally:
        db.close()


@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
    """Log any unhandled exception with traceback and answer with a generic 500."""
    error_log.error(
        "Unhandled exception | %s %s | %s: %s",
        request.method, request.url.path, type(exc).__name__, exc,
        exc_info=exc,
    )
    return JSONResponse(
        status_code=500,
        content={"error": {"message": "Internal server error", "type": "server_error"}},
    )


def _backend_headers() -> dict:
    """Return the auth header for the backend, or ``{}`` if BACKEND_API_KEY is unset."""
    key = os.getenv("BACKEND_API_KEY")
    return {"Authorization": f"Bearer {key}"} if key else {}


async def proxy_request(url: str, method: str = "GET", json_data: Optional[dict] = None):
    """Send one non-streaming request to the backend and return the httpx response."""
    async with httpx.AsyncClient(timeout=_BACKEND_TIMEOUT) as client:
        response = await client.request(
            method=method, url=url, json=json_data, headers=_backend_headers()
        )
        return response


# --- Shared request helpers ---

def _ollama_url(db: Session) -> str:
    """Resolve the backend base URL: DB setting, then OLLAMA_URL, then localhost."""
    return crud.get_setting(db, "ollama_url", os.getenv("OLLAMA_URL", _DEFAULT_OLLAMA_URL))


async def _read_json_object(request: Request) -> dict:
    """Parse the request body as a JSON object.

    Raises:
        HTTPException: 400 when the body is not valid JSON or not an object.
            (Previously these surfaced as unhandled 500 errors.)
    """
    try:
        body = await request.json()
    except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")
    return body


def _with_required_model(db: Session, body: dict) -> dict:
    """Apply the ``force_model`` setting (if any) and require a model name.

    Returns a new dict when the model is overridden; the input is not mutated.

    Raises:
        HTTPException: 422 when no model is set after applying the override.
    """
    force_model = crud.get_setting(db, "force_model") or None
    if force_model:
        body = {**body, "model": force_model}
    if not body.get("model"):
        raise HTTPException(status_code=422, detail="Field 'model' is required")
    return body


def _message_list(body: dict) -> list:
    """Return ``body["messages"]`` as a list of dicts (``[]`` when absent/null).

    Raises:
        HTTPException: 422 when the field is present but is not a list of objects.
    """
    messages = body.get("messages") or []
    if not isinstance(messages, list) or not all(isinstance(m, dict) for m in messages):
        raise HTTPException(status_code=422, detail="Field 'messages' must be a list of objects")
    return messages


def _charge_quota(db: Session, request: Request, prompt_tokens: int) -> None:
    """Charge one request plus ``prompt_tokens`` to the caller's key.

    Raises:
        HTTPException: 429 when ``crud.check_and_increment_quota`` refuses.
    """
    if not crud.check_and_increment_quota(
        db, request.state.api_key_id, tokens=prompt_tokens, requests=1
    ):
        raise HTTPException(status_code=429, detail="Quota exceeded")


async def _forward_get(db: Session, path: str) -> JSONResponse:
    """Forward a GET to ``path`` on the backend and relay JSON body and status."""
    response = await proxy_request(f"{_ollama_url(db)}{path}", method="GET")
    return JSONResponse(content=response.json(), status_code=response.status_code)


async def _forward_ollama_json(request: Request, endpoint: str, url: str, body: dict) -> JSONResponse:
    """POST ``body`` to a native Ollama route and relay the single JSON reply.

    Logs the backend-reported token counts and elapsed time to the usage log;
    failures are written to the error log and re-raised.
    """
    model = body.get("model", "?")
    start = time.monotonic()
    try:
        response = await proxy_request(url, method="POST", json_data=body)
        resp_json = response.json()
    except Exception as exc:
        error_log.error(
            "Proxy error | %s | %s | %s | %s: %s",
            request.state.api_key_name, endpoint, model, type(exc).__name__, exc,
            exc_info=exc,
        )
        raise
    counts = resp_json if isinstance(resp_json, dict) else {}
    usage_log.info(
        '%s | %s | %s | actual ↑%d ↓%d tokens | %.1fs',
        request.state.api_key_name, endpoint, model,
        _as_int(counts.get("prompt_eval_count")), _as_int(counts.get("eval_count")),
        time.monotonic() - start,
    )
    return JSONResponse(content=resp_json, status_code=response.status_code)


# --- Native Ollama routes ---

@app.post("/api/generate")
async def generate(request: Request, db: Session = Depends(get_db)):
    """Proxy ``/api/generate`` as a single non-streaming JSON exchange."""
    ollama_url = _ollama_url(db)
    body = _with_required_model(db, await _read_json_object(request))
    # Ollama streams NDJSON unless ``stream`` is false; this route relays one
    # JSON document, so a streamed reply could not be parsed.
    body = {**body, "stream": False}

    prompt = _content_to_str(body.get("prompt"))  # tolerate a null/missing prompt
    prompt_tokens = crud.count_tokens(prompt)
    _charge_quota(db, request, prompt_tokens)

    prompt_preview = prompt.replace("\n", " ").strip()[:120]
    usage_log.info(
        '%s | /api/generate | %s | ~%d tokens | "%s"',
        request.state.api_key_name, body.get("model", "?"), prompt_tokens, prompt_preview,
    )
    return await _forward_ollama_json(request, "/api/generate", f"{ollama_url}/api/generate", body)


@app.post("/api/chat")
async def chat(request: Request, db: Session = Depends(get_db)):
    """Proxy ``/api/chat`` as a single non-streaming JSON exchange."""
    ollama_url = _ollama_url(db)
    body = _with_required_model(db, await _read_json_object(request))
    # Same reason as /api/generate: the reply is parsed as one JSON document.
    body = {**body, "stream": False}

    messages = _message_list(body)
    prompt_tokens = sum(crud.count_tokens(_content_to_str(msg.get("content"))) for msg in messages)
    _charge_quota(db, request, prompt_tokens)

    usage_log.info(
        '%s | /api/chat | %s | ~%d tokens | "%s"',
        request.state.api_key_name, body.get("model", "?"), prompt_tokens,
        _last_user_msg(messages),
    )
    return await _forward_ollama_json(request, "/api/chat", f"{ollama_url}/api/chat", body)


@app.get("/api/tags")
async def list_models(db: Session = Depends(get_db)):
    """Relay the backend's ``/api/tags`` response."""
    return await _forward_get(db, "/api/tags")


@app.get("/version")
async def version():
    """Report this proxy's own version (APP_VERSION, default ``dev``)."""
    return {"version": os.getenv("APP_VERSION", "dev")}


@app.get("/api/ps")
async def running_models(db: Session = Depends(get_db)):
    """Relay the backend's ``/api/ps`` response."""
    return await _forward_get(db, "/api/ps")


@app.get("/api/versions")
async def versions(db: Session = Depends(get_db)):
    """Relay the backend's ``/api/versions`` response."""
    # NOTE(review): stock Ollama exposes /api/version (singular) — confirm the
    # backend really serves the plural path before changing it.
    return await _forward_get(db, "/api/versions")


# --- Anthropic Messages API compatibility layer ---

def _tool_result_text(block: dict) -> str:
    """Flatten a ``tool_result`` block's content (string or text-block list) to text."""
    raw = block.get("content", "")
    if isinstance(raw, list):
        raw = " ".join(
            r.get("text", "") for r in raw if isinstance(r, dict) and r.get("type") == "text"
        )
    return str(raw)


def _anthropic_content_to_str(content) -> str:
    """Flatten Anthropic content (string or block array) to a plain string.

    Only ``text`` and ``tool_result`` blocks contribute; other block types and
    non-dict entries are ignored.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for block in content:
            if not isinstance(block, dict):
                continue
            if block.get("type") == "text":
                parts.append(block.get("text", ""))
            elif block.get("type") == "tool_result":
                parts.append(_tool_result_text(block))
        return " ".join(parts)
    return str(content) if content else ""


def _anthropic_messages_to_ollama(messages: list, system: Optional[str] = None) -> list:
    """Transform an Anthropic messages array to Ollama ``/api/chat`` format.

    * ``system`` (when non-empty) becomes a leading ``system`` message.
    * Assistant block lists become one message with joined text plus
      ``tool_calls`` built from ``tool_use`` blocks.
    * User block lists are split in order: each ``tool_result`` becomes a
      ``tool`` message, and runs of text between them become ``user`` messages.
    * Anything else is flattened with ``_anthropic_content_to_str``.
    """
    result = []
    if system:
        result.append({"role": "system", "content": system})

    for msg in messages:
        role = msg.get("role")
        content = msg.get("content")

        if role == "assistant" and isinstance(content, list):
            text = " ".join(
                b.get("text", "") for b in content
                if isinstance(b, dict) and b.get("type") == "text"
            )
            tool_calls = [
                {"function": {"name": b["name"], "arguments": b.get("input", {})}}
                for b in content
                if isinstance(b, dict) and b.get("type") == "tool_use"
            ]
            entry = {"role": "assistant", "content": text}
            if tool_calls:
                entry["tool_calls"] = tool_calls
            result.append(entry)

        elif role == "user" and isinstance(content, list):
            text_parts = []
            for block in content:
                if not isinstance(block, dict):
                    continue
                if block.get("type") == "tool_result":
                    # Flush text seen so far to keep the original ordering.
                    if text_parts:
                        result.append({"role": "user", "content": " ".join(text_parts)})
                        text_parts = []
                    result.append({"role": "tool", "content": _tool_result_text(block)})
                elif block.get("type") == "text":
                    text_parts.append(block.get("text", ""))
            if text_parts:
                result.append({"role": "user", "content": " ".join(text_parts)})

        else:
            result.append({"role": role, "content": _anthropic_content_to_str(content)})

    return result


def _anthropic_tools_to_ollama(tools: list) -> list:
    """Transform Anthropic tool definitions to the Ollama/OpenAI function format."""
    return [
        {
            "type": "function",
            "function": {
                "name": t["name"],
                "description": t.get("description", ""),
                "parameters": t.get("input_schema", {}),
            },
        }
        for t in tools
    ]


def _ollama_to_anthropic_response(ollama_resp: dict, model_name: str, msg_id: str) -> dict:
    """Transform an Ollama ``/api/chat`` response to Anthropic Messages format.

    Text content becomes one ``text`` block; each tool call becomes a
    ``tool_use`` block with id ``toolu_<msg_id>_<i>``. ``stop_reason`` is
    ``tool_use`` when any tool call is present, otherwise ``end_turn``.
    String-encoded tool arguments are JSON-decoded (``{}`` if undecodable).
    """
    if not isinstance(ollama_resp, dict):
        ollama_resp = {}  # e.g. a non-object error payload
    msg = ollama_resp.get("message") or {}
    text = msg.get("content") or ""
    tool_calls = msg.get("tool_calls") or []

    content_blocks = []
    if text:
        content_blocks.append({"type": "text", "text": text})

    stop_reason = "end_turn"
    for i, tc in enumerate(tool_calls):
        stop_reason = "tool_use"
        fn = tc.get("function") or {}
        args = fn.get("arguments", {})
        if isinstance(args, str):
            try:
                args = json.loads(args)
            except json.JSONDecodeError:
                args = {}
        content_blocks.append({
            "type": "tool_use",
            "id": f"toolu_{msg_id}_{i}",
            "name": fn.get("name", ""),
            "input": args,
        })

    return {
        "id": f"msg_{msg_id}",
        "type": "message",
        "role": "assistant",
        "content": content_blocks,
        "model": model_name,
        "stop_reason": stop_reason,
        "stop_sequence": None,
        "usage": {
            "input_tokens": _as_int(ollama_resp.get("prompt_eval_count")),
            "output_tokens": _as_int(ollama_resp.get("eval_count")),
        },
    }


def _sse(event: str, payload: dict) -> str:
    """Format one server-sent event frame."""
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"


def _anthropic_sse_frames(message: dict) -> Iterator[str]:
    """Replay a complete Anthropic message as a Messages-API SSE event sequence.

    ``message`` is the dict produced by ``_ollama_to_anthropic_response``.
    A text block is always emitted at index 0 (possibly empty), followed by one
    ``tool_use`` block per tool call whose input is sent as a single
    ``input_json_delta``.
    """
    blocks = message["content"]
    usage = message["usage"]
    text = "".join(b["text"] for b in blocks if b["type"] == "text")
    tool_blocks = [b for b in blocks if b["type"] == "tool_use"]

    yield _sse("message_start", {
        "type": "message_start",
        "message": {
            **message,
            "content": [],
            "stop_reason": None,
            "usage": {"input_tokens": usage["input_tokens"], "output_tokens": 0},
        },
    })
    yield _sse("content_block_start", {
        "type": "content_block_start", "index": 0,
        "content_block": {"type": "text", "text": ""},
    })
    yield _sse("ping", {"type": "ping"})
    if text:
        yield _sse("content_block_delta", {
            "type": "content_block_delta", "index": 0,
            "delta": {"type": "text_delta", "text": text},
        })
    yield _sse("content_block_stop", {"type": "content_block_stop", "index": 0})

    for index, block in enumerate(tool_blocks, start=1):
        yield _sse("content_block_start", {
            "type": "content_block_start", "index": index,
            "content_block": {
                "type": "tool_use", "id": block["id"], "name": block["name"], "input": {},
            },
        })
        yield _sse("content_block_delta", {
            "type": "content_block_delta", "index": index,
            "delta": {"type": "input_json_delta", "partial_json": json.dumps(block["input"])},
        })
        yield _sse("content_block_stop", {"type": "content_block_stop", "index": index})

    yield _sse("message_delta", {
        "type": "message_delta",
        "delta": {"stop_reason": message["stop_reason"], "stop_sequence": None},
        "usage": {"output_tokens": usage["output_tokens"]},
    })
    yield _sse("message_stop", {"type": "message_stop"})


@app.post("/v1/messages")
async def anthropic_messages(request: Request, db: Session = Depends(get_db)):
    """Serve the Anthropic Messages API on top of the backend's ``/api/chat``.

    Model precedence: ``force_model`` setting, then ANTHROPIC_DEFAULT_MODEL,
    then the request's ``model``. With ``stream`` set, the reply is delivered as
    an SSE event sequence synthesized from the complete backend answer.
    """
    ollama_url = _ollama_url(db)
    body = await _read_json_object(request)

    force_model = crud.get_setting(db, "force_model") or None
    model_name = force_model or os.getenv("ANTHROPIC_DEFAULT_MODEL") or body.get("model")
    if not model_name:
        raise HTTPException(status_code=422, detail="Field 'model' is required")

    anthropic_msgs = _message_list(body)
    system = body.get("system")
    system_str = _anthropic_content_to_str(system) if system else ""

    all_text = system_str + " ".join(
        _anthropic_content_to_str(m.get("content")) for m in anthropic_msgs
    )
    prompt_tokens = crud.count_tokens(all_text)
    _charge_quota(db, request, prompt_tokens)

    ollama_messages = _anthropic_messages_to_ollama(anthropic_msgs, system=system_str)
    # The backend is always called non-streaming: upstream proxies (e.g. the
    # production proxy) expose /api/chat only in non-streaming mode, so this
    # proxy builds the SSE stream itself when the client asks for one.
    ollama_body: dict = {"model": model_name, "messages": ollama_messages, "stream": False}
    if tools := body.get("tools"):
        ollama_body["tools"] = _anthropic_tools_to_ollama(tools)

    msg_id = secrets.token_hex(12)
    target = f"{ollama_url}/api/chat"

    usage_log.info(
        '%s | /v1/messages | %s | ~%d tokens | "%s"',
        request.state.api_key_name, model_name, prompt_tokens,
        _last_user_msg(ollama_messages),
    )
    start = time.monotonic()

    if body.get("stream"):

        async def event_stream():
            try:
                response = await proxy_request(target, method="POST", json_data=ollama_body)
                ollama_resp = response.json()
            except Exception as exc:
                error_log.error(
                    "Stream error | %s | /v1/messages | %s | %s: %s",
                    request.state.api_key_name, model_name, type(exc).__name__, exc,
                    exc_info=exc,
                )
                raise

            # Build the message once so streamed and non-streamed replies
            # agree, including tool_use blocks and stop_reason.
            message = _ollama_to_anthropic_response(ollama_resp, model_name, msg_id)
            # Logged before emitting so a client disconnect cannot lose it.
            usage_log.info(
                '%s | /v1/messages | %s | actual ↑%d ↓%d tokens | %.1fs',
                request.state.api_key_name, model_name,
                message["usage"]["input_tokens"], message["usage"]["output_tokens"],
                time.monotonic() - start,
            )
            for frame in _anthropic_sse_frames(message):
                yield frame

        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )

    try:
        response = await proxy_request(target, method="POST", json_data=ollama_body)
        result = _ollama_to_anthropic_response(response.json(), model_name, msg_id)
        usage_log.info(
            '%s | /v1/messages | %s | actual ↑%d ↓%d tokens | %.1fs',
            request.state.api_key_name, model_name,
            result["usage"]["input_tokens"], result["usage"]["output_tokens"],
            time.monotonic() - start,
        )
        return JSONResponse(content=result, status_code=response.status_code)
    except Exception as exc:
        error_log.error(
            "Proxy error | %s | /v1/messages | %s | %s: %s",
            request.state.api_key_name, model_name, type(exc).__name__, exc,
            exc_info=exc,
        )
        raise


# --- OpenAI-compatible routes ---

@app.get("/v1/models")
async def list_openai_models(db: Session = Depends(get_db)):
    """Relay the backend's ``/v1/models`` response."""
    return await _forward_get(db, "/v1/models")


def _record_stream_usage(raw_line: bytes, usage_tokens: dict) -> None:
    """Update ``usage_tokens`` from one complete SSE line, if it carries usage.

    Lines that are not ``data: `` lines, contain ``[DONE]``, or do not hold a
    JSON object with a ``usage`` member are ignored — usage capture is
    best-effort and must never disturb the stream being relayed.
    """
    line = raw_line.decode("utf-8", errors="ignore").strip()
    if not line.startswith("data: ") or "[DONE]" in line:
        return
    try:
        data = json.loads(line[6:])
    except ValueError:
        return
    usage = data.get("usage") if isinstance(data, dict) else None
    if isinstance(usage, dict) and usage:
        usage_tokens["prompt"] = _as_int(usage.get("prompt_tokens"))
        usage_tokens["completion"] = _as_int(usage.get("completion_tokens"))


@app.post("/v1/chat/completions")
async def openai_chat_completions(request: Request, db: Session = Depends(get_db)):
    """Proxy OpenAI-style chat completions, streaming or not.

    In streaming mode the backend bytes are relayed unchanged;
    ``stream_options.include_usage`` is forced on so the final token counts can
    be written to the usage log.
    """
    ollama_url = _ollama_url(db)
    body = _with_required_model(db, await _read_json_object(request))

    messages = _message_list(body)
    prompt_tokens = sum(crud.count_tokens(_content_to_str(msg.get("content"))) for msg in messages)
    _charge_quota(db, request, prompt_tokens)

    model_name = body["model"]
    usage_log.info(
        '%s | /v1/chat/completions | %s | ~%d tokens | "%s"',
        request.state.api_key_name, model_name, prompt_tokens, _last_user_msg(messages),
    )
    target = f"{ollama_url}/v1/chat/completions"

    if body.get("stream"):
        existing_opts = body.get("stream_options") or {}
        stream_body = {**body, "stream_options": {**existing_opts, "include_usage": True}}
        start = time.monotonic()
        usage_tokens = {"prompt": 0, "completion": 0}

        async def event_stream():
            # Network chunks do not align with SSE lines, so keep the trailing
            # partial line and only parse lines once they are complete.
            pending = b""
            try:
                async with httpx.AsyncClient(timeout=_BACKEND_TIMEOUT) as client:
                    async with client.stream(
                        "POST", target, json=stream_body, headers=_backend_headers()
                    ) as resp:
                        async for chunk in resp.aiter_bytes():
                            pending += chunk
                            *complete, pending = pending.split(b"\n")
                            for raw_line in complete:
                                _record_stream_usage(raw_line, usage_tokens)
                            yield chunk
                # A final line without a trailing newline.
                _record_stream_usage(pending, usage_tokens)
            except Exception as exc:
                error_log.error(
                    "Stream error | %s | /v1/chat/completions | %s | %s: %s",
                    request.state.api_key_name, model_name, type(exc).__name__, exc,
                    exc_info=exc,
                )
                raise
            finally:
                usage_log.info(
                    '%s | /v1/chat/completions | %s | actual ↑%d ↓%d tokens | %.1fs',
                    request.state.api_key_name, model_name,
                    usage_tokens["prompt"], usage_tokens["completion"],
                    time.monotonic() - start,
                )

        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )

    start = time.monotonic()
    try:
        response = await proxy_request(target, method="POST", json_data=body)
        resp_json = response.json()
        usage = (resp_json.get("usage") if isinstance(resp_json, dict) else None) or {}
        usage_log.info(
            '%s | /v1/chat/completions | %s | actual ↑%d ↓%d tokens | %.1fs',
            request.state.api_key_name, model_name,
            _as_int(usage.get("prompt_tokens")), _as_int(usage.get("completion_tokens")),
            time.monotonic() - start,
        )
        return JSONResponse(content=resp_json, status_code=response.status_code)
    except Exception as exc:
        error_log.error(
            "Proxy error | %s | /v1/chat/completions | %s | %s: %s",
            request.state.api_key_name, model_name, type(exc).__name__, exc,
            exc_info=exc,
        )
        raise