- POST /v1/messages endpoint with full quota enforcement and auth
- Accepts x-api-key and anthropic-auth-token headers (for Claude Code)
- Transforms Anthropic request/response format ↔ Ollama /api/chat
- Streaming support via Anthropic SSE format
- Tool use support (request and response transformation)
- ANTHROPIC_DEFAULT_MODEL env var for model selection without admin UI
- BACKEND_API_KEY env var for forwarding auth to upstream proxies
- Fix: SQLite path is always resolved relative to the location of database.py
- start.sh and start_claude.sh load .env relative to the script location
470 lines
22 KiB
Python
470 lines
22 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
import secrets
|
|
import time
|
|
from logging.handlers import RotatingFileHandler
|
|
from pathlib import Path
|
|
|
|
from fastapi import FastAPI, HTTPException, Depends, Request
|
|
from fastapi.responses import JSONResponse, StreamingResponse
|
|
from sqlalchemy.orm import Session
|
|
from database import get_db
|
|
import crud
|
|
import httpx
|
|
|
|
# Both log files go into the directory part of LOG_FILE (default "logs/usage.log");
# the file names themselves are fixed: "usage.log" and "error.log".
_log_dir = Path(os.getenv("LOG_FILE", "logs/usage.log")).parent
_log_dir.mkdir(parents=True, exist_ok=True)

_fmt = logging.Formatter("%(asctime)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S")


def _attach_handler(logger_name: str, handler: logging.Handler, level: int) -> logging.Logger:
    """Format *handler* with the shared ``_fmt`` and attach it to logger *logger_name*.

    The logger is set to *level* and ``propagate`` is disabled, so its records are
    not passed up to ancestor loggers' handlers.
    """
    handler.setFormatter(_fmt)
    logger = logging.getLogger(logger_name)
    logger.setLevel(level)
    logger.addHandler(handler)
    logger.propagate = False
    return logger


# Usage log: rotates at 8 KB per file, keeps 3 backups.
_usage_handler = RotatingFileHandler(
    str(_log_dir / "usage.log"), maxBytes=8192, backupCount=3, encoding="utf-8"
)
usage_log = _attach_handler("proxy.usage", _usage_handler, logging.INFO)

# Error log: rotates at 64 KB per file, keeps 5 backups.
_error_handler = RotatingFileHandler(
    str(_log_dir / "error.log"), maxBytes=65536, backupCount=5, encoding="utf-8"
)
error_log = _attach_handler("proxy.error", _error_handler, logging.ERROR)
|
|
|
|
def _content_to_str(content) -> str:
|
|
"""Normalize OpenAI content: string or array of content parts → plain string."""
|
|
if isinstance(content, list):
|
|
return " ".join(
|
|
part.get("text", "") if isinstance(part, dict) else str(part)
|
|
for part in content
|
|
)
|
|
return content or ""
|
|
|
|
|
|
def _last_user_msg(messages: list, max_len: int = 120) -> str:
|
|
for msg in reversed(messages):
|
|
if msg.get("role") == "user":
|
|
text = _content_to_str(msg.get("content")).replace("\n", " ").strip()
|
|
return text[:max_len] + ("…" if len(text) > max_len else "")
|
|
return ""
|
|
|
|
async def require_api_key(request: Request, db: Session = Depends(get_db)):
    """Authenticate the caller and store its key id/name on ``request.state``.

    The key is taken from the first usable source, in this order:

    1. ``Authorization: Bearer <key>`` (the scheme is matched case-insensitively,
       as RFC 7235 requires; surrounding whitespace is ignored)
    2. ``Authorization: sk-...`` (a raw key without a scheme)
    3. ``x-api-key: <key>``
    4. ``anthropic-auth-token: <key>``

    Raises:
        HTTPException(401): no key was supplied, or ``crud.verify_api_key``
            rejected it.
    """
    auth_header = request.headers.get("Authorization", "").strip()
    scheme, _, credentials = auth_header.partition(" ")
    # An empty "Bearer" credential falls through to the other headers.
    bearer_key = credentials.strip() if scheme.lower() == "bearer" else ""

    if bearer_key:
        api_key = bearer_key
    elif auth_header.startswith("sk-"):
        api_key = auth_header
    else:
        api_key = (
            request.headers.get("x-api-key", "").strip()
            or request.headers.get("anthropic-auth-token", "").strip()
        )

    if not api_key:
        raise HTTPException(status_code=401, detail="Invalid or missing API key")

    db_key = crud.verify_api_key(db, api_key)
    if not db_key:
        raise HTTPException(status_code=401, detail="Invalid API key")

    # Read later by the route handlers for quota accounting and logging.
    request.state.api_key_id = db_key.id
    request.state.api_key_name = db_key.name
|
|
|
|
# Application instance. require_api_key is an app-wide dependency, so every route
# below requires a valid API key.
app = FastAPI(
    title="Ollama Proxy",
    dependencies=[Depends(require_api_key)],
)
|
|
|
|
@app.on_event("startup")
def apply_env_settings():
    """Copy environment-provided settings into the settings store at startup.

    Currently only ``OLLAMA_URL`` is handled. When it is set to a non-empty value,
    it is written to the ``ollama_url`` setting via ``crud.set_setting`` on every
    startup. The session is always committed and closed.
    """
    session = next(get_db())
    try:
        env_url = os.getenv("OLLAMA_URL")
        if env_url:
            crud.set_setting(session, "ollama_url", env_url)
        session.commit()
    finally:
        session.close()
|
|
|
|
@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
    """Log any uncaught exception with its traceback and reply with a generic 500.

    Exception details go to the error log only. The client receives a fixed
    ``{"error": {"message": ..., "type": "server_error"}}`` body.
    """
    error_log.error(
        "Unhandled exception | %s %s | %s: %s",
        request.method,
        request.url.path,
        type(exc).__name__,
        exc,
        exc_info=exc,
    )
    payload = {"error": {"message": "Internal server error", "type": "server_error"}}
    return JSONResponse(status_code=500, content=payload)
|
|
|
|
def _backend_headers() -> dict:
|
|
key = os.getenv("BACKEND_API_KEY")
|
|
return {"Authorization": f"Bearer {key}"} if key else {}
|
|
|
|
|
|
async def proxy_request(url: str, method: str = "GET", json_data: dict = None):
    """Send a single request to the upstream backend and return the ``httpx.Response``.

    Each call opens a fresh ``httpx.AsyncClient`` with a 300 s timeout and attaches
    the headers from ``_backend_headers()``. *json_data*, if given, is sent as the
    JSON body. The response is returned after the client is closed, and transport
    errors propagate to the caller.
    """
    headers = _backend_headers()
    async with httpx.AsyncClient(timeout=300.0) as client:
        return await client.request(method=method, url=url, json=json_data, headers=headers)
|
|
|
|
@app.post("/api/generate")
async def generate(request: Request, db: Session = Depends(get_db)):
    """Proxy Ollama's ``POST /api/generate`` with quota enforcement and usage logging.

    The handler:

    1. Applies the ``force_model`` setting, if set, and requires a model (422).
    2. Charges the estimated prompt tokens plus one request against the caller's
       quota (429 when exhausted).
    3. Forwards the request to the backend.
    4. Logs the backend-reported token counts.
    5. Relays the backend's JSON reply and status code.

    The backend is always called with ``"stream": false``. The reply is decoded
    with a single ``response.json()`` call, and Ollama answers with a
    newline-delimited JSON stream unless ``stream`` is false. Forwarding the
    client's body unchanged therefore made requests that omit ``stream`` (or set it
    to true) fail with a 500. Such clients now receive one final JSON object.
    """
    ollama_url = crud.get_setting(db, "ollama_url", os.getenv("OLLAMA_URL", "http://localhost:11434"))
    body = await request.json()
    force_model = crud.get_setting(db, "force_model") or None
    if force_model:
        body = {**body, "model": force_model}
    if not body.get("model"):
        raise HTTPException(status_code=422, detail="Field 'model' is required")

    # `or ""` also covers an explicit JSON null, which would break `.replace()` below.
    prompt = body.get("prompt") or ""
    prompt_tokens = crud.count_tokens(prompt)

    if not crud.check_and_increment_quota(db, request.state.api_key_id, tokens=prompt_tokens, requests=1):
        raise HTTPException(status_code=429, detail="Quota exceeded")

    prompt_preview = prompt.replace("\n", " ").strip()[:120]
    usage_log.info('%s | /api/generate | %s | ~%d tokens | "%s"',
                   request.state.api_key_name, body.get("model", "?"), prompt_tokens, prompt_preview)

    # Force a single JSON reply so `response.json()` can parse it (see docstring).
    upstream_body = {**body, "stream": False}
    start = time.monotonic()
    try:
        response = await proxy_request(f"{ollama_url}/api/generate", method="POST", json_data=upstream_body)
        resp_json = response.json()
        usage_log.info('%s | /api/generate | %s | actual ↑%d ↓%d tokens | %.1fs',
                       request.state.api_key_name, body.get("model", "?"),
                       resp_json.get("prompt_eval_count", 0), resp_json.get("eval_count", 0),
                       time.monotonic() - start)
        return JSONResponse(content=resp_json, status_code=response.status_code)
    except Exception as exc:
        error_log.error("Proxy error | %s | /api/generate | %s | %s: %s",
                        request.state.api_key_name, body.get("model", "?"), type(exc).__name__, exc, exc_info=exc)
        raise
|
|
|
|
@app.post("/api/chat")
async def chat(request: Request, db: Session = Depends(get_db)):
    """Proxy Ollama's ``POST /api/chat`` with quota enforcement and usage logging.

    The handler:

    1. Applies the ``force_model`` setting, if set, and requires a model (422).
    2. Charges the estimated tokens of all message contents plus one request
       against the caller's quota (429 when exhausted).
    3. Forwards the request to the backend.
    4. Logs the backend-reported token counts.
    5. Relays the backend's JSON reply and status code.

    The backend is always called with ``"stream": false``. The reply is decoded
    with a single ``response.json()`` call, and Ollama answers with a
    newline-delimited JSON stream unless ``stream`` is false. Forwarding the
    client's body unchanged therefore made requests that omit ``stream`` (or set it
    to true) fail with a 500. Such clients now receive one final JSON object.
    """
    ollama_url = crud.get_setting(db, "ollama_url", os.getenv("OLLAMA_URL", "http://localhost:11434"))
    body = await request.json()
    force_model = crud.get_setting(db, "force_model") or None
    if force_model:
        body = {**body, "model": force_model}
    if not body.get("model"):
        raise HTTPException(status_code=422, detail="Field 'model' is required")

    # `or []` also covers an explicit JSON null, which is not iterable.
    messages = body.get("messages") or []
    prompt_tokens = sum(crud.count_tokens(_content_to_str(msg.get("content"))) for msg in messages)

    if not crud.check_and_increment_quota(db, request.state.api_key_id, tokens=prompt_tokens, requests=1):
        raise HTTPException(status_code=429, detail="Quota exceeded")

    usage_log.info('%s | /api/chat | %s | ~%d tokens | "%s"',
                   request.state.api_key_name, body.get("model", "?"), prompt_tokens, _last_user_msg(messages))

    # Force a single JSON reply so `response.json()` can parse it (see docstring).
    upstream_body = {**body, "stream": False}
    start = time.monotonic()
    try:
        response = await proxy_request(f"{ollama_url}/api/chat", method="POST", json_data=upstream_body)
        resp_json = response.json()
        usage_log.info('%s | /api/chat | %s | actual ↑%d ↓%d tokens | %.1fs',
                       request.state.api_key_name, body.get("model", "?"),
                       resp_json.get("prompt_eval_count", 0), resp_json.get("eval_count", 0),
                       time.monotonic() - start)
        return JSONResponse(content=resp_json, status_code=response.status_code)
    except Exception as exc:
        error_log.error("Proxy error | %s | /api/chat | %s | %s: %s",
                        request.state.api_key_name, body.get("model", "?"), type(exc).__name__, exc, exc_info=exc)
        raise
|
|
|
|
@app.get("/api/tags")
async def list_models(db: Session = Depends(get_db)):
    """Relay the backend's ``/api/tags`` response (status code and JSON body) unchanged."""
    base_url = crud.get_setting(db, "ollama_url", os.getenv("OLLAMA_URL", "http://localhost:11434"))
    tags_response = await proxy_request(f"{base_url}/api/tags", method="GET")
    payload = tags_response.json()
    return JSONResponse(content=payload, status_code=tags_response.status_code)
|
|
|
|
@app.get("/version")
async def version():
    """Report this proxy's own version from ``APP_VERSION`` ("dev" when unset)."""
    app_version = os.getenv("APP_VERSION", "dev")
    return {"version": app_version}
|
|
|
|
@app.get("/api/ps")
async def running_models(db: Session = Depends(get_db)):
    """Relay the backend's ``/api/ps`` response (status code and JSON body) unchanged."""
    backend = crud.get_setting(db, "ollama_url", os.getenv("OLLAMA_URL", "http://localhost:11434"))
    reply = await proxy_request(f"{backend}/api/ps", method="GET")
    return JSONResponse(content=reply.json(), status_code=reply.status_code)
|
|
|
|
@app.get("/api/version")
@app.get("/api/versions")
async def versions(db: Session = Depends(get_db)):
    """Relay the backend's version information (status code and JSON body).

    Ollama documents this endpoint as ``GET /api/version`` (singular). The previous
    code forwarded to ``/api/versions``, which is not part of Ollama's API. The
    proxy now exposes ``/api/version`` as well and keeps ``/api/versions`` as an
    alias for existing callers; both forward to the upstream ``/api/version``.
    """
    ollama_url = crud.get_setting(db, "ollama_url", os.getenv("OLLAMA_URL", "http://localhost:11434"))
    response = await proxy_request(f"{ollama_url}/api/version", method="GET")
    return JSONResponse(content=response.json(), status_code=response.status_code)
|
|
|
|
|
|
# --- Anthropic Messages API compatibility layer ---
|
|
|
|
def _anthropic_content_to_str(content) -> str:
|
|
"""Flatten Anthropic content (string or block array) to a plain string."""
|
|
if isinstance(content, str):
|
|
return content
|
|
if isinstance(content, list):
|
|
parts = []
|
|
for block in content:
|
|
if not isinstance(block, dict):
|
|
continue
|
|
if block.get("type") == "text":
|
|
parts.append(block.get("text", ""))
|
|
elif block.get("type") == "tool_result":
|
|
raw = block.get("content", "")
|
|
if isinstance(raw, list):
|
|
raw = " ".join(r.get("text", "") for r in raw if isinstance(r, dict) and r.get("type") == "text")
|
|
parts.append(str(raw))
|
|
return " ".join(parts)
|
|
return str(content) if content else ""
|
|
|
|
|
|
def _anthropic_messages_to_ollama(messages: list, system: str = None) -> list:
|
|
"""Transform Anthropic messages array to Ollama /api/chat format."""
|
|
result = []
|
|
if system:
|
|
result.append({"role": "system", "content": system})
|
|
for msg in messages:
|
|
role = msg.get("role")
|
|
content = msg.get("content")
|
|
if role == "assistant" and isinstance(content, list):
|
|
text = " ".join(b.get("text", "") for b in content if isinstance(b, dict) and b.get("type") == "text")
|
|
tool_calls = [
|
|
{"function": {"name": b["name"], "arguments": b.get("input", {})}}
|
|
for b in content if isinstance(b, dict) and b.get("type") == "tool_use"
|
|
]
|
|
entry = {"role": "assistant", "content": text}
|
|
if tool_calls:
|
|
entry["tool_calls"] = tool_calls
|
|
result.append(entry)
|
|
elif role == "user" and isinstance(content, list):
|
|
text_parts = []
|
|
for block in content:
|
|
if not isinstance(block, dict):
|
|
continue
|
|
if block.get("type") == "tool_result":
|
|
if text_parts:
|
|
result.append({"role": "user", "content": " ".join(text_parts)})
|
|
text_parts = []
|
|
raw = block.get("content", "")
|
|
if isinstance(raw, list):
|
|
raw = " ".join(r.get("text", "") for r in raw if isinstance(r, dict) and r.get("type") == "text")
|
|
result.append({"role": "tool", "content": str(raw)})
|
|
elif block.get("type") == "text":
|
|
text_parts.append(block.get("text", ""))
|
|
if text_parts:
|
|
result.append({"role": "user", "content": " ".join(text_parts)})
|
|
else:
|
|
result.append({"role": role, "content": _anthropic_content_to_str(content)})
|
|
return result
|
|
|
|
|
|
def _anthropic_tools_to_ollama(tools: list) -> list:
|
|
"""Transform Anthropic tools to Ollama/OpenAI function format."""
|
|
return [
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": t["name"],
|
|
"description": t.get("description", ""),
|
|
"parameters": t.get("input_schema", {}),
|
|
},
|
|
}
|
|
for t in tools
|
|
]
|
|
|
|
|
|
def _ollama_to_anthropic_response(ollama_resp: dict, model_name: str, msg_id: str) -> dict:
|
|
"""Transform an Ollama /api/chat response to Anthropic Messages API format."""
|
|
msg = ollama_resp.get("message", {})
|
|
text = msg.get("content", "")
|
|
tool_calls = msg.get("tool_calls") or []
|
|
|
|
content_blocks = []
|
|
if text:
|
|
content_blocks.append({"type": "text", "text": text})
|
|
|
|
stop_reason = "end_turn"
|
|
for i, tc in enumerate(tool_calls):
|
|
stop_reason = "tool_use"
|
|
fn = tc.get("function", {})
|
|
args = fn.get("arguments", {})
|
|
if isinstance(args, str):
|
|
try:
|
|
args = json.loads(args)
|
|
except json.JSONDecodeError:
|
|
args = {}
|
|
content_blocks.append({
|
|
"type": "tool_use",
|
|
"id": f"toolu_{msg_id}_{i}",
|
|
"name": fn.get("name", ""),
|
|
"input": args,
|
|
})
|
|
|
|
return {
|
|
"id": f"msg_{msg_id}",
|
|
"type": "message",
|
|
"role": "assistant",
|
|
"content": content_blocks,
|
|
"model": model_name,
|
|
"stop_reason": stop_reason,
|
|
"stop_sequence": None,
|
|
"usage": {
|
|
"input_tokens": ollama_resp.get("prompt_eval_count", 0),
|
|
"output_tokens": ollama_resp.get("eval_count", 0),
|
|
},
|
|
}
|
|
|
|
|
|
def _sse_event(event: str, payload: dict) -> str:
    """Serialize one server-sent event: ``event:`` line, JSON ``data:`` line, blank line."""
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"


# HTTP status -> Anthropic error "type"; any unlisted status maps to "api_error".
_ANTHROPIC_ERROR_TYPES = {
    400: "invalid_request_error",
    401: "authentication_error",
    403: "permission_error",
    404: "not_found_error",
    413: "request_too_large",
    429: "rate_limit_error",
}


def _anthropic_error(status_code: int, message: str) -> dict:
    """Build an Anthropic-style error body ``{"type": "error", "error": {...}}``."""
    error_type = _ANTHROPIC_ERROR_TYPES.get(status_code, "api_error")
    return {"type": "error", "error": {"type": error_type, "message": message}}


def _upstream_error_message(response) -> str:
    """Best-effort description of a failed backend response.

    Uses the ``error`` field of a JSON body when present: either a plain value, or
    the ``message`` of an error object. Otherwise it uses the first 500 characters
    of the raw body, falling back to the bare HTTP status.
    """
    fallback = f"Upstream returned HTTP {response.status_code}"
    try:
        data = response.json()
    except ValueError:  # not JSON; json.JSONDecodeError subclasses ValueError
        return response.text.strip()[:500] or fallback
    err = data.get("error") if isinstance(data, dict) else None
    if isinstance(err, dict):
        return str(err.get("message") or fallback)
    return str(err) if err else fallback


def _anthropic_stream_events(result: dict):
    """Yield the Anthropic SSE events that replay a complete Messages *result*.

    Each content block is sent whole:

    - a text block as one ``text_delta``;
    - a tool_use block as one ``input_json_delta`` carrying its full JSON-encoded
      input.

    A reply without any content still gets one empty text block, as the earlier
    implementation always emitted.
    """
    usage = result["usage"]
    yield _sse_event("message_start", {
        "type": "message_start",
        "message": {
            "id": result["id"],
            "type": "message",
            "role": "assistant",
            "content": [],
            "model": result["model"],
            "stop_reason": None,
            "stop_sequence": None,
            "usage": {"input_tokens": usage["input_tokens"], "output_tokens": 0},
        },
    })
    yield _sse_event("ping", {"type": "ping"})

    for index, block in enumerate(result["content"] or [{"type": "text", "text": ""}]):
        if block["type"] == "tool_use":
            opening = {"type": "tool_use", "id": block["id"], "name": block["name"], "input": {}}
            delta = {"type": "input_json_delta", "partial_json": json.dumps(block["input"])}
        else:
            opening = {"type": "text", "text": ""}
            text = block.get("text", "")
            delta = {"type": "text_delta", "text": text} if text else None
        yield _sse_event("content_block_start",
                         {"type": "content_block_start", "index": index, "content_block": opening})
        if delta is not None:
            yield _sse_event("content_block_delta",
                             {"type": "content_block_delta", "index": index, "delta": delta})
        yield _sse_event("content_block_stop", {"type": "content_block_stop", "index": index})

    yield _sse_event("message_delta", {
        "type": "message_delta",
        "delta": {"stop_reason": result["stop_reason"], "stop_sequence": None},
        "usage": {"output_tokens": usage["output_tokens"]},
    })
    yield _sse_event("message_stop", {"type": "message_stop"})


@app.post("/v1/messages")
async def anthropic_messages(request: Request, db: Session = Depends(get_db)):
    """Serve the Anthropic Messages API (``POST /v1/messages``) via the backend's ``/api/chat``.

    Model selection: the ``force_model`` setting, else ``ANTHROPIC_DEFAULT_MODEL``,
    else the request's ``model`` (422 if none). The estimated prompt tokens (system
    prompt plus all messages) and one request are charged against the caller's
    quota before the backend is called (429 when exhausted).

    The backend is always called non-streaming. For ``"stream": true`` the finished
    reply is replayed as Anthropic SSE events. Changes from the earlier version:

    - Streamed replies now carry ``tool_use`` blocks and the real ``stop_reason``.
      Previously tool calls were dropped from streams and ``end_turn`` was always
      sent.
    - Backend HTTP errors (status >= 400) become Anthropic error bodies instead of
      an empty assistant message. Non-streaming requests get the backend's status;
      streams get an SSE ``error`` event, because the 200 status has already been
      sent.
    - Transport failures during a stream also end it with an ``error`` event,
      rather than an abruptly closed connection.
    """
    ollama_url = crud.get_setting(db, "ollama_url", os.getenv("OLLAMA_URL", "http://localhost:11434"))
    body = await request.json()

    force_model = crud.get_setting(db, "force_model") or None
    model_name = force_model or os.getenv("ANTHROPIC_DEFAULT_MODEL") or body.get("model")
    if not model_name:
        raise HTTPException(status_code=422, detail="Field 'model' is required")

    anthropic_msgs = body.get("messages") or []
    system = body.get("system")
    system_str = _anthropic_content_to_str(system) if system else ""
    all_text = " ".join([system_str, *(_anthropic_content_to_str(m.get("content")) for m in anthropic_msgs)])
    prompt_tokens = crud.count_tokens(all_text)

    if not crud.check_and_increment_quota(db, request.state.api_key_id, tokens=prompt_tokens, requests=1):
        raise HTTPException(status_code=429, detail="Quota exceeded")

    ollama_messages = _anthropic_messages_to_ollama(anthropic_msgs, system=system_str)
    # The backend is always called non-streaming and this proxy builds the SSE
    # itself, because upstream proxies (e.g. the production proxy) only expose
    # /api/chat in non-streaming mode.
    ollama_body: dict = {"model": model_name, "messages": ollama_messages, "stream": False}
    if tools := body.get("tools"):
        ollama_body["tools"] = _anthropic_tools_to_ollama(tools)

    msg_id = secrets.token_hex(12)
    target = f"{ollama_url}/api/chat"
    key_name = request.state.api_key_name

    usage_log.info('%s | /v1/messages | %s | ~%d tokens | "%s"',
                   key_name, model_name, prompt_tokens, _last_user_msg(ollama_messages))
    start = time.monotonic()

    async def call_backend():
        """Call the backend once and return ``(status_code, payload)``.

        On success the payload is the converted Anthropic message, and actual usage
        is logged. On a backend HTTP error it is an Anthropic error body. Transport
        and JSON-decoding errors propagate.
        """
        response = await proxy_request(target, method="POST", json_data=ollama_body)
        if response.status_code >= 400:
            message = _upstream_error_message(response)
            error_log.error("Upstream error | %s | /v1/messages | %s | HTTP %d: %s",
                            key_name, model_name, response.status_code, message)
            return response.status_code, _anthropic_error(response.status_code, message)
        result = _ollama_to_anthropic_response(response.json(), model_name, msg_id)
        usage_log.info('%s | /v1/messages | %s | actual ↑%d ↓%d tokens | %.1fs',
                       key_name, model_name,
                       result["usage"]["input_tokens"], result["usage"]["output_tokens"],
                       time.monotonic() - start)
        return response.status_code, result

    if body.get("stream"):
        async def generate():
            try:
                status_code, payload = await call_backend()
            except Exception as exc:
                error_log.error("Stream error | %s | /v1/messages | %s | %s: %s",
                                key_name, model_name, type(exc).__name__, exc, exc_info=exc)
                status_code, payload = 500, _anthropic_error(500, "Upstream request failed")
            if status_code >= 400:
                # Headers (HTTP 200) are already sent; report the failure in-band.
                yield _sse_event("error", payload)
                return
            for event in _anthropic_stream_events(payload):
                yield event

        return StreamingResponse(
            generate(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )

    try:
        status_code, payload = await call_backend()
    except Exception as exc:
        error_log.error("Proxy error | %s | /v1/messages | %s | %s: %s",
                        key_name, model_name, type(exc).__name__, exc, exc_info=exc)
        raise
    return JSONResponse(content=payload, status_code=status_code)
|
|
|
|
@app.get("/v1/models")
async def list_openai_models(db: Session = Depends(get_db)):
    """Relay the backend's OpenAI-compatible ``/v1/models`` response unchanged."""
    default_url = os.getenv("OLLAMA_URL", "http://localhost:11434")
    models_url = f'{crud.get_setting(db, "ollama_url", default_url)}/v1/models'
    upstream = await proxy_request(models_url, method="GET")
    return JSONResponse(content=upstream.json(), status_code=upstream.status_code)
|
|
|
|
def _record_sse_usage(raw_line: bytes, usage_tokens: dict) -> None:
    """Copy token counts from one OpenAI SSE ``data:`` line into *usage_tokens*.

    Skips non-data lines, the ``[DONE]`` sentinel, unparsable JSON, and chunks
    without a non-empty ``usage`` object. This accounting feeds the usage log only,
    so it is best-effort.
    """
    line = raw_line.decode("utf-8", errors="ignore").strip()
    if not line.startswith("data:"):
        return
    payload = line[len("data:"):].strip()
    if not payload or payload == "[DONE]":
        return
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        return
    usage = data.get("usage") if isinstance(data, dict) else None
    if isinstance(usage, dict) and usage:
        usage_tokens["prompt"] = usage.get("prompt_tokens", 0)
        usage_tokens["completion"] = usage.get("completion_tokens", 0)


@app.post("/v1/chat/completions")
async def openai_chat_completions(request: Request, db: Session = Depends(get_db)):
    """Proxy the backend's OpenAI-compatible ``/v1/chat/completions`` endpoint.

    The handler applies ``force_model``, requires a model (422), and charges the
    estimated prompt tokens plus one request against the caller's quota (429 when
    exhausted). It logs usage before and after the backend call.

    Streaming requests are piped through byte-for-byte. ``stream_options.include_usage``
    is forced on so that the backend reports token counts, which are read from the
    SSE ``data:`` lines for the usage log. Lines are reassembled across network
    chunks before parsing; previously a usage line split between two chunks was
    silently dropped and logged as 0 tokens.
    """
    ollama_url = crud.get_setting(db, "ollama_url", os.getenv("OLLAMA_URL", "http://localhost:11434"))

    body = await request.json()
    force_model = crud.get_setting(db, "force_model") or None
    if force_model:
        body = {**body, "model": force_model}
    if not body.get("model"):
        raise HTTPException(status_code=422, detail="Field 'model' is required")
    # `or []` also covers an explicit JSON null, which is not iterable.
    messages = body.get("messages") or []
    prompt_tokens = sum(crud.count_tokens(_content_to_str(msg.get("content"))) for msg in messages)

    if not crud.check_and_increment_quota(db, request.state.api_key_id, tokens=prompt_tokens, requests=1):
        raise HTTPException(status_code=429, detail="Quota exceeded")

    model_name = body["model"]

    usage_log.info('%s | /v1/chat/completions | %s | ~%d tokens | "%s"',
                   request.state.api_key_name, model_name, prompt_tokens, _last_user_msg(messages))

    target = f"{ollama_url}/v1/chat/completions"

    if body.get("stream"):
        existing_opts = body.get("stream_options") or {}
        stream_body = {**body, "stream_options": {**existing_opts, "include_usage": True}}
        start = time.monotonic()
        usage_tokens = {"prompt": 0, "completion": 0}

        async def generate():
            pending = b""
            try:
                async with httpx.AsyncClient(timeout=300.0) as client:
                    async with client.stream("POST", target, json=stream_body, headers=_backend_headers()) as resp:
                        async for chunk in resp.aiter_bytes():
                            # SSE lines can be split across network chunks: parse only
                            # complete lines and carry the incomplete tail forward.
                            pending += chunk
                            *complete_lines, pending = pending.split(b"\n")
                            for raw_line in complete_lines:
                                _record_sse_usage(raw_line, usage_tokens)
                            yield chunk
                # A final line without a trailing newline still counts.
                if pending:
                    _record_sse_usage(pending, usage_tokens)
            except Exception as exc:
                error_log.error("Stream error | %s | /v1/chat/completions | %s | %s: %s",
                                request.state.api_key_name, model_name, type(exc).__name__, exc, exc_info=exc)
                raise
            finally:
                usage_log.info('%s | /v1/chat/completions | %s | actual ↑%d ↓%d tokens | %.1fs',
                               request.state.api_key_name, model_name,
                               usage_tokens["prompt"], usage_tokens["completion"],
                               time.monotonic() - start)

        return StreamingResponse(
            generate(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )

    start = time.monotonic()
    try:
        response = await proxy_request(target, method="POST", json_data=body)
        resp_json = response.json()
        # `or {}` guards against an explicit `"usage": null` in the reply.
        usage = resp_json.get("usage") or {}
        usage_log.info('%s | /v1/chat/completions | %s | actual ↑%d ↓%d tokens | %.1fs',
                       request.state.api_key_name, model_name,
                       usage.get("prompt_tokens", 0), usage.get("completion_tokens", 0),
                       time.monotonic() - start)
        return JSONResponse(content=resp_json, status_code=response.status_code)
    except Exception as exc:
        error_log.error("Proxy error | %s | /v1/chat/completions | %s | %s: %s",
                        request.state.api_key_name, model_name, type(exc).__name__, exc, exc_info=exc)
        raise
|