llmproxy/backend/admin.py
Oliver Hofmann 562f6ecd9c Init
2026-04-27 18:54:27 +02:00

113 lines
3.7 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from database import get_db
import crud, schemas
from models import User, APIKey, Quota
# Admin API application object; every route in this module registers on it.
app = FastAPI(title="Ollama Proxy Admin API")

# CORS is left fully open: any origin, method and header is accepted, and
# credentialed requests are allowed.
# NOTE(review): FastAPI's CORS documentation advises against combining
# wildcard origins with allow_credentials=True -- confirm this is intended
# for an admin-only API before deploying beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
)
async def require_admin_auth(request: Request, db: Session = Depends(get_db)):
    """Reject the request unless its API key belongs to the ``admin`` user.

    The key is read from the ``Authorization`` header, either as
    ``Bearer <key>`` or as a bare value starting with ``sk-``.  On success
    the resolved user and the database session are stored on
    ``request.state`` as ``user`` and ``db``.

    Args:
        request: Incoming request whose headers carry the API key.
        db: Database session.  It is injected through ``Depends(get_db)``
            instead of ``next(get_db())`` so that FastAPI drives ``get_db``
            to completion (running any cleanup that follows its ``yield``)
            and so that an endpoint depending on ``get_db`` in the same
            request receives this same session.

    Raises:
        HTTPException: 401 when the header is missing, malformed, carries an
            empty key, or ``crud.verify_api_key`` returns a falsy result;
            403 when the key's user does not exist or is not named
            ``admin``.
    """
    auth_header = request.headers.get("Authorization", "")
    if auth_header.startswith("Bearer "):
        # Strip only the leading scheme: str.replace would also delete any
        # later "Bearer " occurrence inside the token itself.
        api_key = auth_header.removeprefix("Bearer ").strip()
    elif auth_header.startswith("sk-"):
        api_key = auth_header
    else:
        api_key = ""
    if not api_key:
        raise HTTPException(status_code=401, detail="Invalid or missing API key")

    db_key = crud.verify_api_key(db, api_key)
    if not db_key:
        raise HTTPException(status_code=401, detail="Invalid API key")

    # Admin rights are tied to the literal username "admin".
    db_user = db.query(User).filter(User.id == db_key.user_id).first()
    if not db_user or db_user.username != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")

    request.state.user = db_user
    request.state.db = db
@app.get("/api/users", response_model=list[schemas.User])
async def read_users(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    _ = Depends(require_admin_auth)
):
    """Return one page of users (admin only).

    Args:
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).
        db: Database session provided by ``get_db``.
        _: Placeholder for the admin-auth dependency; its value is unused.

    Returns:
        The list of ``User`` rows in the requested window, ordered by id.
    """
    # OFFSET/LIMIT without ORDER BY leaves row order up to the database, so
    # consecutive pages could overlap or skip rows; order by the primary key
    # to make pagination stable.
    return db.query(User).order_by(User.id).offset(skip).limit(limit).all()
@app.post("/api/users", response_model=schemas.User)
async def create_user(
    user: schemas.UserCreate,
    db: Session = Depends(get_db),
    _ = Depends(require_admin_auth)
):
    """Create a user after checking username and email are not taken (admin only).

    Args:
        user: Payload carrying ``username``, ``email`` and ``password``.
        db: Database session provided by ``get_db``.
        _: Placeholder for the admin-auth dependency; its value is unused.

    Returns:
        Whatever ``crud.create_user`` returns for the new user.

    Raises:
        HTTPException: 400 when the username or the email already exists.
    """
    # The username lookup runs first, so a duplicate username wins over a
    # duplicate email when both collide.
    if crud.get_user_by_username(db, username=user.username):
        raise HTTPException(status_code=400, detail="Username already registered")

    if crud.get_user_by_email(db, email=user.email):
        raise HTTPException(status_code=400, detail="Email already registered")

    return crud.create_user(
        db=db,
        username=user.username,
        email=user.email,
        password=user.password,
    )
@app.post("/api/api-keys", response_model=schemas.APIKey)
async def create_api_key(
    api_key: schemas.APIKeyCreate,
    db: Session = Depends(get_db),
    _ = Depends(require_admin_auth)
):
    """Issue a new API key for an existing user (admin only).

    Args:
        api_key: Payload carrying the owning ``user_id`` and the key ``name``.
        db: Database session provided by ``get_db``.
        _: Placeholder for the admin-auth dependency; its value is unused.

    Returns:
        Whatever ``crud.create_api_key`` returns for the new key.

    Raises:
        HTTPException: 404 when no user has the given ``user_id``.
    """
    owner_query = db.query(User).filter(User.id == api_key.user_id)
    if not owner_query.first():
        raise HTTPException(status_code=404, detail="User not found")

    return crud.create_api_key(
        db=db,
        user_id=api_key.user_id,
        name=api_key.name,
    )
@app.get("/api/api-keys", response_model=list[schemas.APIKey])
async def read_api_keys(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    _ = Depends(require_admin_auth)
):
    """Return one page of API keys (admin only).

    Args:
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).
        db: Database session provided by ``get_db``.
        _: Placeholder for the admin-auth dependency; its value is unused.

    Returns:
        The list of ``APIKey`` rows in the requested window, ordered by id.
    """
    # OFFSET/LIMIT without ORDER BY leaves row order up to the database, so
    # consecutive pages could overlap or skip rows; order by the primary key
    # to make pagination stable.
    return db.query(APIKey).order_by(APIKey.id).offset(skip).limit(limit).all()
@app.put("/api/api-keys/{api_key_id}/deactivate")
async def deactivate_api_key(
    api_key_id: int,
    db: Session = Depends(get_db),
    _ = Depends(require_admin_auth)
):
    """Mark an API key as inactive (admin only).

    Args:
        api_key_id: Primary key of the API key to deactivate.
        db: Database session provided by ``get_db``.
        _: Placeholder for the admin-auth dependency; its value is unused.

    Returns:
        A confirmation message dict.

    Raises:
        HTTPException: 404 when no API key has the given id.
    """
    key_row = db.query(APIKey).filter(APIKey.id == api_key_id).first()
    if key_row is None:
        raise HTTPException(status_code=404, detail="API key not found")

    # The row is kept; only its active flag is cleared and persisted.
    key_row.is_active = False
    db.commit()

    return {"message": "API key deactivated"}
@app.put("/api/quotas/{user_id}", response_model=schemas.Quota)
async def update_quota(
    user_id: int,
    quota: schemas.QuotaCreate,
    db: Session = Depends(get_db),
    _ = Depends(require_admin_auth)
):
    """Apply a partial update to a user's quota row (admin only).

    Args:
        user_id: Id of the user whose quota is updated.
        quota: Payload; only the fields the client actually set are applied.
        db: Database session provided by ``get_db``.
        _: Placeholder for the admin-auth dependency; its value is unused.

    Returns:
        The quota row after commit and refresh.

    Raises:
        HTTPException: 404 when the user has no quota row.
    """
    quota_row = db.query(Quota).filter(Quota.user_id == user_id).first()
    if quota_row is None:
        raise HTTPException(status_code=404, detail="Quota not found")

    # exclude_unset leaves fields the client omitted out of the update, so
    # they keep their stored values.
    changes = quota.dict(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(quota_row, field_name, new_value)

    db.commit()
    db.refresh(quota_row)
    return quota_row