1721 lines
60 KiB
Python
1721 lines
60 KiB
Python
from fastapi import FastAPI, Depends, HTTPException, status, UploadFile, File
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from sqlalchemy.orm import Session, joinedload
|
|
from sqlalchemy import func, case
|
|
from typing import List, Optional
|
|
import os
|
|
import boto3
|
|
from botocore.client import Config
|
|
import uuid
|
|
# S3/MinIO configuration
|
|
S3_ENDPOINT = os.getenv('MINIO_ENDPOINT', 'http://localhost:9000')
|
|
S3_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY', 'minioadmin')
|
|
S3_SECRET_KEY = os.getenv('MINIO_SECRET_KEY', 'minioadmin')
|
|
S3_IMAGE_BUCKET = os.getenv('MINIO_IMAGE_BUCKET', 'images')
|
|
S3_PDF_BUCKET = os.getenv('MINIO_PDF_BUCKET', 'pdfs')
|
|
|
|
s3_client = boto3.client(
|
|
's3',
|
|
endpoint_url=S3_ENDPOINT,
|
|
aws_access_key_id=S3_ACCESS_KEY,
|
|
aws_secret_access_key=S3_SECRET_KEY,
|
|
config=Config(signature_version='s3v4'),
|
|
region_name='us-east-1'
|
|
)
|
|
import shutil
|
|
from datetime import datetime, timedelta
|
|
|
|
from app.core.database import engine, get_db, Base
|
|
from app.core.security import verify_password, get_password_hash, create_access_token, decode_access_token
|
|
from app import models, schemas
|
|
|
|
# Crear tablas
|
|
Base.metadata.create_all(bind=engine)
|
|
|
|
app = FastAPI(title="Checklist Inteligente API", version="1.0.0")
|
|
|
|
# CORS
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["http://localhost:5173", "http://localhost:3000"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
security = HTTPBearer()
|
|
|
|
# Dependency para obtener usuario actual
|
|
def get_current_user(
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
token = credentials.credentials
|
|
|
|
# Verificar si es un API token (comienza con "syntria_")
|
|
if token.startswith("syntria_"):
|
|
api_token = db.query(models.APIToken).filter(
|
|
models.APIToken.token == token,
|
|
models.APIToken.is_active == True
|
|
).first()
|
|
|
|
if not api_token:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="API Token inválido o inactivo"
|
|
)
|
|
|
|
# Actualizar último uso
|
|
api_token.last_used_at = datetime.utcnow()
|
|
db.commit()
|
|
|
|
# Obtener usuario
|
|
user = db.query(models.User).filter(models.User.id == api_token.user_id).first()
|
|
if not user or not user.is_active:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Usuario inválido o inactivo"
|
|
)
|
|
|
|
return user
|
|
|
|
# Si no es API token, es JWT token
|
|
payload = decode_access_token(token)
|
|
print(f"Token payload: {payload}") # Debug
|
|
if payload is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Token inválido o expirado"
|
|
)
|
|
|
|
user_id = int(payload.get("sub"))
|
|
print(f"Looking for user ID: {user_id}") # Debug
|
|
user = db.query(models.User).filter(models.User.id == user_id).first()
|
|
if user is None:
|
|
print(f"User not found with ID: {user_id}") # Debug
|
|
raise HTTPException(status_code=404, detail="Usuario no encontrado")
|
|
|
|
return user
|
|
|
|
|
|
# ============= AUTH ENDPOINTS =============
|
|
@app.post("/api/auth/register", response_model=schemas.User)
|
|
def register(user: schemas.UserCreate, db: Session = Depends(get_db)):
|
|
# Verificar si usuario existe
|
|
db_user = db.query(models.User).filter(models.User.username == user.username).first()
|
|
if db_user:
|
|
raise HTTPException(status_code=400, detail="Usuario ya existe")
|
|
|
|
# Crear usuario
|
|
hashed_password = get_password_hash(user.password)
|
|
db_user = models.User(
|
|
username=user.username,
|
|
email=user.email,
|
|
full_name=user.full_name,
|
|
role=user.role,
|
|
password_hash=hashed_password
|
|
)
|
|
db.add(db_user)
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
return db_user
|
|
|
|
|
|
@app.post("/api/auth/login", response_model=schemas.Token)
|
|
def login(user_login: schemas.UserLogin, db: Session = Depends(get_db)):
|
|
user = db.query(models.User).filter(models.User.username == user_login.username).first()
|
|
if not user or not verify_password(user_login.password, user.password_hash):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Usuario o contraseña incorrectos"
|
|
)
|
|
|
|
access_token = create_access_token(data={"sub": str(user.id), "role": user.role})
|
|
return {
|
|
"access_token": access_token,
|
|
"token_type": "bearer",
|
|
"user": user
|
|
}
|
|
|
|
|
|
@app.get("/api/auth/me", response_model=schemas.User)
|
|
def get_me(current_user: models.User = Depends(get_current_user)):
|
|
return current_user
|
|
|
|
|
|
# ============= USER ENDPOINTS =============
|
|
@app.get("/api/users", response_model=List[schemas.User])
|
|
def get_users(
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
active_only: bool = False,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
# Solo admin puede ver todos los usuarios
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="No tienes permisos para ver usuarios")
|
|
|
|
query = db.query(models.User)
|
|
if active_only:
|
|
query = query.filter(models.User.is_active == True)
|
|
|
|
return query.offset(skip).limit(limit).all()
|
|
|
|
|
|
@app.get("/api/users/{user_id}", response_model=schemas.User)
|
|
def get_user(
|
|
user_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
# Solo admin puede ver otros usuarios
|
|
if current_user.role != "admin" and current_user.id != user_id:
|
|
raise HTTPException(status_code=403, detail="No tienes permisos para ver este usuario")
|
|
|
|
user = db.query(models.User).filter(models.User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="Usuario no encontrado")
|
|
|
|
return user
|
|
|
|
|
|
@app.post("/api/users", response_model=schemas.User)
|
|
def create_user(
|
|
user: schemas.UserCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
# Solo admin puede crear usuarios
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="No tienes permisos para crear usuarios")
|
|
|
|
# Verificar si usuario existe
|
|
db_user = db.query(models.User).filter(models.User.username == user.username).first()
|
|
if db_user:
|
|
raise HTTPException(status_code=400, detail="Usuario ya existe")
|
|
|
|
# Verificar si email existe
|
|
if user.email:
|
|
db_email = db.query(models.User).filter(models.User.email == user.email).first()
|
|
if db_email:
|
|
raise HTTPException(status_code=400, detail="Email ya está en uso")
|
|
|
|
# Crear usuario
|
|
hashed_password = get_password_hash(user.password)
|
|
db_user = models.User(
|
|
username=user.username,
|
|
email=user.email,
|
|
full_name=user.full_name,
|
|
role=user.role,
|
|
password_hash=hashed_password,
|
|
is_active=True
|
|
)
|
|
db.add(db_user)
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
return db_user
|
|
|
|
|
|
@app.put("/api/users/{user_id}", response_model=schemas.User)
|
|
def update_user(
|
|
user_id: int,
|
|
user_update: schemas.UserUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
# Solo admin puede actualizar otros usuarios
|
|
if current_user.role != "admin" and current_user.id != user_id:
|
|
raise HTTPException(status_code=403, detail="No tienes permisos para actualizar este usuario")
|
|
|
|
db_user = db.query(models.User).filter(models.User.id == user_id).first()
|
|
if not db_user:
|
|
raise HTTPException(status_code=404, detail="Usuario no encontrado")
|
|
|
|
# Actualizar campos
|
|
if user_update.username is not None:
|
|
# Verificar si username está en uso
|
|
existing = db.query(models.User).filter(
|
|
models.User.username == user_update.username,
|
|
models.User.id != user_id
|
|
).first()
|
|
if existing:
|
|
raise HTTPException(status_code=400, detail="Nombre de usuario ya está en uso")
|
|
db_user.username = user_update.username
|
|
|
|
if user_update.email is not None:
|
|
# Verificar si email está en uso
|
|
existing = db.query(models.User).filter(
|
|
models.User.email == user_update.email,
|
|
models.User.id != user_id
|
|
).first()
|
|
if existing:
|
|
raise HTTPException(status_code=400, detail="Email ya está en uso")
|
|
db_user.email = user_update.email
|
|
|
|
if user_update.full_name is not None:
|
|
db_user.full_name = user_update.full_name
|
|
|
|
# Solo admin puede cambiar roles
|
|
if user_update.role is not None:
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="No tienes permisos para cambiar roles")
|
|
db_user.role = user_update.role
|
|
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
return db_user
|
|
|
|
|
|
@app.patch("/api/users/{user_id}/deactivate")
|
|
def deactivate_user(
|
|
user_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
# Solo admin puede inactivar usuarios
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="No tienes permisos para inactivar usuarios")
|
|
|
|
# No permitir auto-inactivación
|
|
if current_user.id == user_id:
|
|
raise HTTPException(status_code=400, detail="No puedes inactivar tu propio usuario")
|
|
|
|
db_user = db.query(models.User).filter(models.User.id == user_id).first()
|
|
if not db_user:
|
|
raise HTTPException(status_code=404, detail="Usuario no encontrado")
|
|
|
|
db_user.is_active = False
|
|
db.commit()
|
|
|
|
return {"message": "Usuario inactivado correctamente", "user_id": user_id}
|
|
|
|
|
|
@app.patch("/api/users/{user_id}/activate")
|
|
def activate_user(
|
|
user_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
# Solo admin puede activar usuarios
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="No tienes permisos para activar usuarios")
|
|
|
|
db_user = db.query(models.User).filter(models.User.id == user_id).first()
|
|
if not db_user:
|
|
raise HTTPException(status_code=404, detail="Usuario no encontrado")
|
|
|
|
db_user.is_active = True
|
|
db.commit()
|
|
|
|
return {"message": "Usuario activado correctamente", "user_id": user_id}
|
|
|
|
|
|
@app.patch("/api/users/{user_id}/password")
|
|
def change_user_password(
|
|
user_id: int,
|
|
password_update: schemas.AdminPasswordUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
# Solo admin puede cambiar contraseñas de otros usuarios
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="No tienes permisos para cambiar contraseñas")
|
|
|
|
db_user = db.query(models.User).filter(models.User.id == user_id).first()
|
|
if not db_user:
|
|
raise HTTPException(status_code=404, detail="Usuario no encontrado")
|
|
|
|
# Cambiar contraseña
|
|
db_user.password_hash = get_password_hash(password_update.new_password)
|
|
db.commit()
|
|
|
|
return {"message": "Contraseña actualizada correctamente"}
|
|
|
|
|
|
@app.patch("/api/users/me/password")
|
|
def change_my_password(
|
|
password_update: schemas.UserPasswordUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
# Verificar contraseña actual
|
|
if not verify_password(password_update.current_password, current_user.password_hash):
|
|
raise HTTPException(status_code=400, detail="Contraseña actual incorrecta")
|
|
|
|
# Cambiar contraseña
|
|
current_user.password_hash = get_password_hash(password_update.new_password)
|
|
db.commit()
|
|
|
|
return {"message": "Contraseña actualizada correctamente"}
|
|
|
|
|
|
@app.put("/api/users/me", response_model=schemas.User)
|
|
def update_my_profile(
|
|
user_update: schemas.UserUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
# Actualizar email
|
|
if user_update.email is not None:
|
|
# Verificar si email está en uso
|
|
existing = db.query(models.User).filter(
|
|
models.User.email == user_update.email,
|
|
models.User.id != current_user.id
|
|
).first()
|
|
if existing:
|
|
raise HTTPException(status_code=400, detail="Email ya está en uso")
|
|
current_user.email = user_update.email
|
|
|
|
# Actualizar nombre
|
|
if user_update.full_name is not None:
|
|
current_user.full_name = user_update.full_name
|
|
|
|
# No permitir cambio de rol desde perfil
|
|
db.commit()
|
|
db.refresh(current_user)
|
|
return current_user
|
|
|
|
|
|
# ============= API TOKENS ENDPOINTS =============
|
|
@app.get("/api/users/me/tokens", response_model=List[schemas.APIToken])
|
|
def get_my_tokens(
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
"""Obtener todos mis API tokens"""
|
|
tokens = db.query(models.APIToken).filter(
|
|
models.APIToken.user_id == current_user.id
|
|
).all()
|
|
return tokens
|
|
|
|
|
|
@app.post("/api/users/me/tokens", response_model=schemas.APITokenWithValue)
|
|
def create_my_token(
|
|
token_create: schemas.APITokenCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
"""Generar un nuevo API token"""
|
|
from app.core.security import generate_api_token
|
|
|
|
# Generar token único
|
|
token_value = generate_api_token()
|
|
|
|
# Crear registro
|
|
api_token = models.APIToken(
|
|
user_id=current_user.id,
|
|
token=token_value,
|
|
description=token_create.description,
|
|
is_active=True
|
|
)
|
|
|
|
db.add(api_token)
|
|
db.commit()
|
|
db.refresh(api_token)
|
|
|
|
# Retornar con el token completo (solo esta vez)
|
|
return schemas.APITokenWithValue(
|
|
id=api_token.id,
|
|
token=api_token.token,
|
|
description=api_token.description,
|
|
is_active=api_token.is_active,
|
|
last_used_at=api_token.last_used_at,
|
|
created_at=api_token.created_at
|
|
)
|
|
|
|
|
|
@app.delete("/api/users/me/tokens/{token_id}")
|
|
def delete_my_token(
|
|
token_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
"""Revocar uno de mis API tokens"""
|
|
api_token = db.query(models.APIToken).filter(
|
|
models.APIToken.id == token_id,
|
|
models.APIToken.user_id == current_user.id
|
|
).first()
|
|
|
|
if not api_token:
|
|
raise HTTPException(status_code=404, detail="Token no encontrado")
|
|
|
|
api_token.is_active = False
|
|
db.commit()
|
|
|
|
return {"message": "Token revocado correctamente", "token_id": token_id}
|
|
|
|
|
|
@app.get("/api/users/{user_id}/tokens", response_model=List[schemas.APIToken])
|
|
def get_user_tokens(
|
|
user_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
"""Obtener tokens de un usuario (solo admin)"""
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="No tienes permisos")
|
|
|
|
tokens = db.query(models.APIToken).filter(
|
|
models.APIToken.user_id == user_id
|
|
).all()
|
|
return tokens
|
|
|
|
|
|
@app.delete("/api/users/{user_id}/tokens/{token_id}")
|
|
def delete_user_token(
|
|
user_id: int,
|
|
token_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
"""Revocar token de un usuario (solo admin)"""
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="No tienes permisos")
|
|
|
|
api_token = db.query(models.APIToken).filter(
|
|
models.APIToken.id == token_id,
|
|
models.APIToken.user_id == user_id
|
|
).first()
|
|
|
|
if not api_token:
|
|
raise HTTPException(status_code=404, detail="Token no encontrado")
|
|
|
|
api_token.is_active = False
|
|
db.commit()
|
|
|
|
return {"message": "Token revocado correctamente", "token_id": token_id}
|
|
|
|
|
|
# ============= CHECKLIST ENDPOINTS =============
|
|
@app.get("/api/checklists", response_model=List[schemas.Checklist])
|
|
def get_checklists(
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
active_only: bool = False,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
query = db.query(models.Checklist)
|
|
if active_only:
|
|
query = query.filter(models.Checklist.is_active == True)
|
|
return query.offset(skip).limit(limit).all()
|
|
|
|
|
|
@app.get("/api/checklists/{checklist_id}", response_model=schemas.ChecklistWithQuestions)
|
|
def get_checklist(checklist_id: int, db: Session = Depends(get_db)):
|
|
checklist = db.query(models.Checklist).options(
|
|
joinedload(models.Checklist.questions)
|
|
).filter(models.Checklist.id == checklist_id).first()
|
|
|
|
if not checklist:
|
|
raise HTTPException(status_code=404, detail="Checklist no encontrado")
|
|
|
|
return checklist
|
|
|
|
|
|
@app.post("/api/checklists", response_model=schemas.Checklist)
|
|
def create_checklist(
|
|
checklist: schemas.ChecklistCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="No autorizado")
|
|
|
|
db_checklist = models.Checklist(**checklist.dict(), created_by=current_user.id)
|
|
db.add(db_checklist)
|
|
db.commit()
|
|
db.refresh(db_checklist)
|
|
return db_checklist
|
|
|
|
|
|
@app.put("/api/checklists/{checklist_id}", response_model=schemas.Checklist)
|
|
def update_checklist(
|
|
checklist_id: int,
|
|
checklist: schemas.ChecklistUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="No autorizado")
|
|
|
|
db_checklist = db.query(models.Checklist).filter(models.Checklist.id == checklist_id).first()
|
|
if not db_checklist:
|
|
raise HTTPException(status_code=404, detail="Checklist no encontrado")
|
|
|
|
for key, value in checklist.dict(exclude_unset=True).items():
|
|
setattr(db_checklist, key, value)
|
|
|
|
db.commit()
|
|
db.refresh(db_checklist)
|
|
return db_checklist
|
|
|
|
|
|
# ============= QUESTION ENDPOINTS =============
|
|
@app.post("/api/questions", response_model=schemas.Question)
|
|
def create_question(
|
|
question: schemas.QuestionCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="No autorizado")
|
|
|
|
db_question = models.Question(**question.dict())
|
|
db.add(db_question)
|
|
|
|
# Actualizar max_score del checklist
|
|
checklist = db.query(models.Checklist).filter(
|
|
models.Checklist.id == question.checklist_id
|
|
).first()
|
|
if checklist:
|
|
checklist.max_score += question.points
|
|
|
|
db.commit()
|
|
db.refresh(db_question)
|
|
return db_question
|
|
|
|
|
|
@app.put("/api/questions/{question_id}", response_model=schemas.Question)
|
|
def update_question(
|
|
question_id: int,
|
|
question: schemas.QuestionUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="No autorizado")
|
|
|
|
db_question = db.query(models.Question).filter(models.Question.id == question_id).first()
|
|
if not db_question:
|
|
raise HTTPException(status_code=404, detail="Pregunta no encontrada")
|
|
|
|
for key, value in question.dict(exclude_unset=True).items():
|
|
setattr(db_question, key, value)
|
|
|
|
db.commit()
|
|
db.refresh(db_question)
|
|
return db_question
|
|
|
|
|
|
@app.delete("/api/questions/{question_id}")
|
|
def delete_question(
|
|
question_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="No autorizado")
|
|
|
|
db_question = db.query(models.Question).filter(models.Question.id == question_id).first()
|
|
if not db_question:
|
|
raise HTTPException(status_code=404, detail="Pregunta no encontrada")
|
|
|
|
db.delete(db_question)
|
|
db.commit()
|
|
return {"message": "Pregunta eliminada"}
|
|
|
|
|
|
# ============= INSPECTION ENDPOINTS =============
|
|
@app.get("/api/inspections", response_model=List[schemas.Inspection])
|
|
def get_inspections(
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
vehicle_plate: str = None,
|
|
status: str = None,
|
|
show_inactive: bool = False,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
query = db.query(models.Inspection)
|
|
|
|
# Por defecto, solo mostrar inspecciones activas
|
|
if not show_inactive:
|
|
query = query.filter(models.Inspection.is_active == True)
|
|
|
|
# Mecánicos solo ven sus inspecciones
|
|
if current_user.role == "mechanic":
|
|
query = query.filter(models.Inspection.mechanic_id == current_user.id)
|
|
|
|
if vehicle_plate:
|
|
query = query.filter(models.Inspection.vehicle_plate.contains(vehicle_plate))
|
|
|
|
if status:
|
|
query = query.filter(models.Inspection.status == status)
|
|
|
|
return query.order_by(models.Inspection.created_at.desc()).offset(skip).limit(limit).all()
|
|
|
|
|
|
@app.get("/api/inspections/{inspection_id}", response_model=schemas.InspectionDetail)
|
|
def get_inspection(
|
|
inspection_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
inspection = db.query(models.Inspection).options(
|
|
joinedload(models.Inspection.checklist).joinedload(models.Checklist.questions),
|
|
joinedload(models.Inspection.mechanic),
|
|
joinedload(models.Inspection.answers).joinedload(models.Answer.question),
|
|
joinedload(models.Inspection.answers).joinedload(models.Answer.media_files)
|
|
).filter(models.Inspection.id == inspection_id).first()
|
|
|
|
if not inspection:
|
|
raise HTTPException(status_code=404, detail="Inspección no encontrada")
|
|
|
|
return inspection
|
|
|
|
|
|
@app.post("/api/inspections", response_model=schemas.Inspection)
|
|
def create_inspection(
|
|
inspection: schemas.InspectionCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
# Obtener max_score del checklist
|
|
checklist = db.query(models.Checklist).filter(
|
|
models.Checklist.id == inspection.checklist_id
|
|
).first()
|
|
|
|
if not checklist:
|
|
raise HTTPException(status_code=404, detail="Checklist no encontrado")
|
|
|
|
db_inspection = models.Inspection(
|
|
**inspection.dict(),
|
|
mechanic_id=current_user.id,
|
|
max_score=checklist.max_score
|
|
)
|
|
db.add(db_inspection)
|
|
db.commit()
|
|
db.refresh(db_inspection)
|
|
return db_inspection
|
|
|
|
|
|
@app.put("/api/inspections/{inspection_id}", response_model=schemas.Inspection)
|
|
def update_inspection(
|
|
inspection_id: int,
|
|
inspection: schemas.InspectionUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
db_inspection = db.query(models.Inspection).filter(
|
|
models.Inspection.id == inspection_id
|
|
).first()
|
|
|
|
if not db_inspection:
|
|
raise HTTPException(status_code=404, detail="Inspección no encontrada")
|
|
|
|
for key, value in inspection.dict(exclude_unset=True).items():
|
|
setattr(db_inspection, key, value)
|
|
|
|
db.commit()
|
|
db.refresh(db_inspection)
|
|
return db_inspection
|
|
|
|
|
|
@app.post("/api/inspections/{inspection_id}/complete", response_model=schemas.Inspection)
|
|
def complete_inspection(
|
|
inspection_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
inspection = db.query(models.Inspection).filter(
|
|
models.Inspection.id == inspection_id
|
|
).first()
|
|
|
|
if not inspection:
|
|
raise HTTPException(status_code=404, detail="Inspección no encontrada")
|
|
|
|
# Calcular score
|
|
answers = db.query(models.Answer).filter(models.Answer.inspection_id == inspection_id).all()
|
|
total_score = sum(a.points_earned for a in answers)
|
|
flagged_count = sum(1 for a in answers if a.is_flagged)
|
|
|
|
inspection.score = total_score
|
|
inspection.percentage = (total_score / inspection.max_score * 100) if inspection.max_score > 0 else 0
|
|
inspection.flagged_items_count = flagged_count
|
|
inspection.status = "completed"
|
|
inspection.completed_at = datetime.utcnow()
|
|
|
|
db.commit()
|
|
db.refresh(inspection)
|
|
return inspection
|
|
|
|
|
|
@app.patch("/api/inspections/{inspection_id}/deactivate")
|
|
def deactivate_inspection(
|
|
inspection_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
# Solo admin puede inactivar
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="No tienes permisos para inactivar inspecciones")
|
|
|
|
inspection = db.query(models.Inspection).filter(
|
|
models.Inspection.id == inspection_id
|
|
).first()
|
|
|
|
if not inspection:
|
|
raise HTTPException(status_code=404, detail="Inspección no encontrada")
|
|
|
|
inspection.is_active = False
|
|
inspection.status = "inactive"
|
|
|
|
db.commit()
|
|
db.refresh(inspection)
|
|
|
|
return {"message": "Inspección inactivada correctamente", "inspection_id": inspection_id}
|
|
|
|
|
|
# ============= ANSWER ENDPOINTS =============
|
|
@app.post("/api/answers", response_model=schemas.Answer)
|
|
def create_answer(
|
|
answer: schemas.AnswerCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
# Obtener la pregunta para saber los puntos
|
|
question = db.query(models.Question).filter(models.Question.id == answer.question_id).first()
|
|
|
|
if not question:
|
|
raise HTTPException(status_code=404, detail="Pregunta no encontrada")
|
|
|
|
# Calcular puntos según status
|
|
points_earned = 0
|
|
if answer.status == "ok":
|
|
points_earned = question.points
|
|
elif answer.status == "warning":
|
|
points_earned = int(question.points * 0.5)
|
|
|
|
db_answer = models.Answer(
|
|
**answer.dict(),
|
|
points_earned=points_earned
|
|
)
|
|
db.add(db_answer)
|
|
db.commit()
|
|
db.refresh(db_answer)
|
|
return db_answer
|
|
|
|
|
|
@app.put("/api/answers/{answer_id}", response_model=schemas.Answer)
|
|
def update_answer(
|
|
answer_id: int,
|
|
answer: schemas.AnswerUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
db_answer = db.query(models.Answer).filter(models.Answer.id == answer_id).first()
|
|
|
|
if not db_answer:
|
|
raise HTTPException(status_code=404, detail="Respuesta no encontrada")
|
|
|
|
# Recalcular puntos si cambió el status
|
|
if answer.status and answer.status != db_answer.status:
|
|
question = db.query(models.Question).filter(
|
|
models.Question.id == db_answer.question_id
|
|
).first()
|
|
|
|
if answer.status == "ok":
|
|
db_answer.points_earned = question.points
|
|
elif answer.status == "warning":
|
|
db_answer.points_earned = int(question.points * 0.5)
|
|
else:
|
|
db_answer.points_earned = 0
|
|
|
|
for key, value in answer.dict(exclude_unset=True).items():
|
|
setattr(db_answer, key, value)
|
|
|
|
db.commit()
|
|
db.refresh(db_answer)
|
|
return db_answer
|
|
|
|
|
|
# ============= MEDIA FILE ENDPOINTS =============
|
|
@app.post("/api/answers/{answer_id}/upload", response_model=schemas.MediaFile)
|
|
async def upload_photo(
|
|
answer_id: int,
|
|
file: UploadFile = File(...),
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
# Verificar que la respuesta existe
|
|
answer = db.query(models.Answer).filter(models.Answer.id == answer_id).first()
|
|
if not answer:
|
|
raise HTTPException(status_code=404, detail="Respuesta no encontrada")
|
|
|
|
# Subir imagen a S3/MinIO
|
|
file_extension = file.filename.split(".")[-1]
|
|
now = datetime.now()
|
|
folder = f"{now.year}/{now.month:02d}"
|
|
file_name = f"answer_{answer_id}_{uuid.uuid4().hex}.{file_extension}"
|
|
s3_key = f"{folder}/{file_name}"
|
|
s3_client.upload_fileobj(file.file, S3_IMAGE_BUCKET, s3_key, ExtraArgs={"ContentType": file.content_type})
|
|
# Generar URL pública (ajusta si usas presigned)
|
|
image_url = f"{S3_ENDPOINT}/{S3_IMAGE_BUCKET}/{s3_key}"
|
|
# Crear registro en BD
|
|
media_file = models.MediaFile(
|
|
answer_id=answer_id,
|
|
file_path=image_url,
|
|
file_type="image"
|
|
)
|
|
db.add(media_file)
|
|
db.commit()
|
|
db.refresh(media_file)
|
|
return media_file
|
|
|
|
|
|
# ============= AI ANALYSIS =============
|
|
@app.get("/api/ai/models", response_model=List[schemas.AIModelInfo])
|
|
def get_available_ai_models(current_user: models.User = Depends(get_current_user)):
|
|
"""Obtener lista de modelos de IA disponibles"""
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="Solo administradores pueden ver modelos de IA")
|
|
|
|
models_list = [
|
|
# OpenAI Models
|
|
{
|
|
"id": "gpt-4o",
|
|
"name": "GPT-4o (Recomendado)",
|
|
"provider": "openai",
|
|
"description": "Modelo multimodal más avanzado de OpenAI, rápido y preciso para análisis de imágenes"
|
|
},
|
|
{
|
|
"id": "gpt-4o-mini",
|
|
"name": "GPT-4o Mini",
|
|
"provider": "openai",
|
|
"description": "Versión compacta y económica de GPT-4o, ideal para análisis rápidos"
|
|
},
|
|
{
|
|
"id": "gpt-4-turbo",
|
|
"name": "GPT-4 Turbo",
|
|
"provider": "openai",
|
|
"description": "Modelo potente con capacidades de visión y contexto amplio"
|
|
},
|
|
{
|
|
"id": "gpt-4-vision-preview",
|
|
"name": "GPT-4 Vision (Preview)",
|
|
"provider": "openai",
|
|
"description": "Modelo específico para análisis de imágenes (versión previa)"
|
|
},
|
|
# Gemini Models - Actualizados a versiones 2.0, 2.5 y 3.0
|
|
{
|
|
"id": "gemini-3-pro-preview",
|
|
"name": "Gemini 3 Pro Preview (Último)",
|
|
"provider": "gemini",
|
|
"description": "Modelo de próxima generación en preview, máxima capacidad de análisis"
|
|
},
|
|
{
|
|
"id": "gemini-2.5-pro",
|
|
"name": "Gemini 2.5 Pro (Recomendado)",
|
|
"provider": "gemini",
|
|
"description": "Último modelo estable con excelente análisis visual y razonamiento avanzado"
|
|
},
|
|
{
|
|
"id": "gemini-2.5-flash",
|
|
"name": "Gemini 2.5 Flash",
|
|
"provider": "gemini",
|
|
"description": "Versión rápida del 2.5, ideal para inspecciones en tiempo real"
|
|
},
|
|
{
|
|
"id": "gemini-2.0-flash",
|
|
"name": "Gemini 2.0 Flash",
|
|
"provider": "gemini",
|
|
"description": "Modelo rápido y eficiente de la generación 2.0"
|
|
},
|
|
{
|
|
"id": "gemini-1.5-pro-latest",
|
|
"name": "Gemini 1.5 Pro Latest",
|
|
"provider": "gemini",
|
|
"description": "Versión estable 1.5 con contexto de 2M tokens"
|
|
},
|
|
{
|
|
"id": "gemini-1.5-flash-latest",
|
|
"name": "Gemini 1.5 Flash Latest",
|
|
"provider": "gemini",
|
|
"description": "Modelo 1.5 rápido para análisis básicos"
|
|
}
|
|
]
|
|
|
|
return models_list
|
|
|
|
|
|
@app.get("/api/ai/configuration", response_model=schemas.AIConfiguration)
|
|
def get_ai_configuration(
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
"""Obtener configuración de IA actual"""
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="Solo administradores pueden ver configuración de IA")
|
|
|
|
config = db.query(models.AIConfiguration).filter(
|
|
models.AIConfiguration.is_active == True
|
|
).first()
|
|
|
|
if not config:
|
|
raise HTTPException(status_code=404, detail="No hay configuración de IA activa")
|
|
|
|
return config
|
|
|
|
|
|
@app.post("/api/ai/configuration", response_model=schemas.AIConfiguration)
|
|
def create_ai_configuration(
|
|
config: schemas.AIConfigurationCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
"""Crear o actualizar configuración de IA"""
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="Solo administradores pueden configurar IA")
|
|
|
|
# Desactivar configuraciones anteriores
|
|
db.query(models.AIConfiguration).update({"is_active": False})
|
|
|
|
# Crear nueva configuración
|
|
new_config = models.AIConfiguration(
|
|
provider=config.provider,
|
|
api_key=config.api_key,
|
|
model_name=config.model_name,
|
|
is_active=True
|
|
)
|
|
|
|
db.add(new_config)
|
|
db.commit()
|
|
db.refresh(new_config)
|
|
|
|
return new_config
|
|
|
|
|
|
@app.put("/api/ai/configuration/{config_id}", response_model=schemas.AIConfiguration)
|
|
def update_ai_configuration(
|
|
config_id: int,
|
|
config_update: schemas.AIConfigurationUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
"""Actualizar configuración de IA existente"""
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="Solo administradores pueden actualizar configuración de IA")
|
|
|
|
config = db.query(models.AIConfiguration).filter(
|
|
models.AIConfiguration.id == config_id
|
|
).first()
|
|
|
|
if not config:
|
|
raise HTTPException(status_code=404, detail="Configuración no encontrada")
|
|
|
|
# Actualizar campos
|
|
for key, value in config_update.dict(exclude_unset=True).items():
|
|
setattr(config, key, value)
|
|
|
|
db.commit()
|
|
db.refresh(config)
|
|
|
|
return config
|
|
|
|
|
|
@app.delete("/api/ai/configuration/{config_id}")
|
|
def delete_ai_configuration(
|
|
config_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
"""Eliminar configuración de IA"""
|
|
if current_user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="Solo administradores pueden eliminar configuración de IA")
|
|
|
|
config = db.query(models.AIConfiguration).filter(
|
|
models.AIConfiguration.id == config_id
|
|
).first()
|
|
|
|
if not config:
|
|
raise HTTPException(status_code=404, detail="Configuración no encontrada")
|
|
|
|
db.delete(config)
|
|
db.commit()
|
|
|
|
return {"message": "Configuración eliminada correctamente"}
|
|
|
|
|
|
@app.post("/api/analyze-image")
|
|
async def analyze_image(
|
|
file: UploadFile = File(...),
|
|
question_id: int = None,
|
|
custom_prompt: str = None,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)
|
|
):
|
|
"""
|
|
Analiza una imagen usando IA para sugerir respuestas
|
|
Usa la configuración de IA activa (OpenAI o Gemini)
|
|
"""
|
|
# Obtener configuración de IA activa
|
|
ai_config = db.query(models.AIConfiguration).filter(
|
|
models.AIConfiguration.is_active == True
|
|
).first()
|
|
|
|
if not ai_config:
|
|
return {
|
|
"status": "disabled",
|
|
"message": "No hay configuración de IA activa. Configure en Settings."
|
|
}
|
|
|
|
# Guardar imagen temporalmente
|
|
import base64
|
|
|
|
contents = await file.read()
|
|
image_b64 = base64.b64encode(contents).decode('utf-8')
|
|
|
|
# Obtener contexto de la pregunta si se proporciona
|
|
question_obj = None
|
|
if question_id:
|
|
question_obj = db.query(models.Question).filter(models.Question.id == question_id).first()
|
|
|
|
try:
|
|
# Construir prompt dinámico basado en la pregunta específica
|
|
if question_obj:
|
|
# Usar prompt personalizado si está disponible
|
|
if custom_prompt:
|
|
# Prompt 100% personalizado por el administrador
|
|
system_prompt = f"""Eres un mecánico experto realizando una inspección vehicular.
|
|
|
|
INSTRUCCIONES ESPECÍFICAS PARA ESTA PREGUNTA:
|
|
{custom_prompt}
|
|
|
|
PREGUNTA A RESPONDER: "{question_obj.text}"
|
|
Sección: {question_obj.section}
|
|
|
|
Analiza la imagen siguiendo EXACTAMENTE las instrucciones proporcionadas arriba.
|
|
|
|
VALIDACIÓN DE IMAGEN:
|
|
- Si la imagen NO corresponde al contexto de la pregunta (por ejemplo, si piden luces pero muestran motor), indica en "recommendation" que deben cambiar la foto
|
|
- Si la imagen es borrosa, oscura o no permite análisis, indica en "recommendation" que tomen otra foto más clara
|
|
|
|
Responde en formato JSON:
|
|
{{
|
|
"status": "ok|minor|critical",
|
|
"observations": "Análisis específico según el prompt personalizado",
|
|
"recommendation": "Si la imagen no es apropiada, indica 'Por favor tome una foto de [componente correcto]'. Si es apropiada, da recomendación técnica.",
|
|
"confidence": 0.0-1.0
|
|
}}"""
|
|
else:
|
|
# Prompt altamente específico para la pregunta
|
|
question_text = question_obj.text
|
|
question_type = question_obj.type
|
|
section = question_obj.section
|
|
|
|
system_prompt = f"""Eres un mecánico experto realizando una inspección vehicular.
|
|
|
|
PREGUNTA ESPECÍFICA A RESPONDER: "{question_text}"
|
|
Sección: {section}
|
|
|
|
Analiza la imagen ÚNICAMENTE para responder esta pregunta específica.
|
|
Sé directo y enfócate solo en lo que la pregunta solicita.
|
|
|
|
VALIDACIÓN DE IMAGEN:
|
|
- Si la imagen NO corresponde al contexto de la pregunta, indica en "recommendation" que deben cambiar la foto
|
|
- Si la imagen es borrosa o no permite análisis, indica en "recommendation" que tomen otra foto más clara
|
|
|
|
Responde en formato JSON:
|
|
{{
|
|
"status": "ok|minor|critical",
|
|
"observations": "Respuesta específica a: {question_text}",
|
|
"recommendation": "Si la imagen no es apropiada, indica 'Por favor tome una foto de [componente correcto]'. Si es apropiada, da acción técnica si aplica.",
|
|
"confidence": 0.0-1.0
|
|
}}
|
|
|
|
IMPORTANTE:
|
|
- Responde SOLO lo que la pregunta pide
|
|
- No des información genérica del vehículo
|
|
- Sé específico y técnico
|
|
- Si la pregunta es pass/fail, indica claramente si pasa o falla
|
|
- Si la pregunta es bueno/regular/malo, indica el estado específico del componente"""
|
|
|
|
user_message = f"Inspecciona la imagen y responde específicamente: {question_obj.text}"
|
|
else:
|
|
# Fallback para análisis general
|
|
system_prompt = """Eres un experto mecánico automotriz. Analiza la imagen y proporciona:
|
|
1. Estado del componente (bueno/regular/malo)
|
|
2. Nivel de criticidad (ok/minor/critical)
|
|
3. Observaciones técnicas breves
|
|
4. Recomendación de acción
|
|
|
|
Responde en formato JSON:
|
|
{
|
|
"status": "ok|minor|critical",
|
|
"observations": "descripción técnica",
|
|
"recommendation": "acción sugerida",
|
|
"confidence": 0.0-1.0
|
|
}"""
|
|
user_message = "Analiza este componente del vehículo para la inspección general."
|
|
|
|
if ai_config.provider == "openai":
|
|
import openai
|
|
openai.api_key = ai_config.api_key
|
|
|
|
response = openai.ChatCompletion.create(
|
|
model=ai_config.model_name,
|
|
messages=[
|
|
{"role": "system", "content": system_prompt},
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{
|
|
"type": "text",
|
|
"text": user_message
|
|
},
|
|
{
|
|
"type": "image_url",
|
|
"image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}
|
|
}
|
|
]
|
|
}
|
|
],
|
|
max_tokens=500
|
|
)
|
|
|
|
ai_response = response.choices[0].message.content
|
|
|
|
elif ai_config.provider == "gemini":
|
|
import google.generativeai as genai
|
|
from PIL import Image
|
|
from io import BytesIO
|
|
|
|
genai.configure(api_key=ai_config.api_key)
|
|
model = genai.GenerativeModel(ai_config.model_name)
|
|
|
|
# Convertir base64 a imagen PIL
|
|
image = Image.open(BytesIO(contents))
|
|
|
|
prompt = f"{system_prompt}\n\n{user_message}"
|
|
response = model.generate_content([prompt, image])
|
|
ai_response = response.text
|
|
|
|
else:
|
|
return {
|
|
"success": False,
|
|
"error": f"Provider {ai_config.provider} no soportado"
|
|
}
|
|
|
|
# Intentar parsear como JSON, si falla, usar texto plano
|
|
try:
|
|
import json
|
|
import re
|
|
|
|
# Limpiar markdown code blocks si existen
|
|
cleaned_response = ai_response.strip()
|
|
|
|
# Remover ```json ... ``` si existe
|
|
if cleaned_response.startswith('```'):
|
|
# Extraer contenido entre ``` markers
|
|
match = re.search(r'```(?:json)?\s*\n?(.*?)\n?```', cleaned_response, re.DOTALL)
|
|
if match:
|
|
cleaned_response = match.group(1).strip()
|
|
|
|
analysis = json.loads(cleaned_response)
|
|
except:
|
|
# Si no es JSON válido, crear estructura básica
|
|
analysis = {
|
|
"status": "ok",
|
|
"observations": ai_response,
|
|
"recommendation": "Revisar manualmente",
|
|
"confidence": 0.7
|
|
}
|
|
|
|
return {
|
|
"success": True,
|
|
"analysis": analysis,
|
|
"raw_response": ai_response,
|
|
"model": ai_config.model_name,
|
|
"provider": ai_config.provider
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Error en análisis AI: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return {
|
|
"success": False,
|
|
"error": str(e),
|
|
"message": "Error analyzing image with AI. Please check AI configuration in Settings."
|
|
}
|
|
|
|
try:
|
|
import openai
|
|
openai.api_key = settings.OPENAI_API_KEY
|
|
|
|
# Prompt especializado para inspección vehicular
|
|
system_prompt = """Eres un experto mecánico automotriz. Analiza la imagen y proporciona:
|
|
1. Estado del componente (bueno/regular/malo)
|
|
2. Nivel de criticidad (ok/minor/critical)
|
|
3. Observaciones técnicas breves
|
|
4. Recomendación de acción
|
|
|
|
Responde en formato JSON:
|
|
{
|
|
"status": "ok|minor|critical",
|
|
"observations": "descripción técnica",
|
|
"recommendation": "acción sugerida",
|
|
"confidence": 0.0-1.0
|
|
}"""
|
|
|
|
response = openai.ChatCompletion.create(
|
|
model="gpt-4-vision-preview" if "gpt-4" in str(settings.OPENAI_API_KEY) else "gpt-4o",
|
|
messages=[
|
|
{
|
|
"role": "system",
|
|
"content": system_prompt
|
|
},
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{
|
|
"type": "text",
|
|
"text": f"Analiza este componente del vehículo.\n{question_context}"
|
|
},
|
|
{
|
|
"type": "image_url",
|
|
"image_url": {
|
|
"url": f"data:image/jpeg;base64,{image_b64}"
|
|
}
|
|
}
|
|
]
|
|
}
|
|
],
|
|
max_tokens=500
|
|
)
|
|
|
|
ai_response = response.choices[0].message.content
|
|
|
|
# Intentar parsear como JSON, si falla, usar texto plano
|
|
try:
|
|
import json
|
|
analysis = json.loads(ai_response)
|
|
except:
|
|
# Si no es JSON válido, crear estructura básica
|
|
analysis = {
|
|
"status": "ok",
|
|
"observations": ai_response,
|
|
"recommendation": "Revisar manualmente",
|
|
"confidence": 0.7
|
|
}
|
|
|
|
return {
|
|
"success": True,
|
|
"analysis": analysis,
|
|
"raw_response": ai_response,
|
|
"model": "gpt-4-vision"
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Error en análisis AI: {e}")
|
|
return {
|
|
"success": False,
|
|
"error": str(e),
|
|
"message": "Error analyzing image with AI"
|
|
}
|
|
|
|
|
|
# ============= REPORTS =============
|
|
@app.get("/api/reports/dashboard", response_model=schemas.DashboardData)
|
|
def get_dashboard_data(
|
|
start_date: Optional[str] = None,
|
|
end_date: Optional[str] = None,
|
|
mechanic_id: Optional[int] = None,
|
|
current_user: models.User = Depends(get_current_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Obtener datos del dashboard de informes"""
|
|
if current_user.role not in ["admin", "asesor"]:
|
|
raise HTTPException(status_code=403, detail="No tienes permisos para acceder a reportes")
|
|
|
|
# Construir query base
|
|
query = db.query(models.Inspection)
|
|
|
|
# Aplicar filtros de fecha
|
|
if start_date:
|
|
# Parsear fecha y establecer al inicio del día en UTC-3
|
|
from datetime import timezone
|
|
local_tz = timezone(timedelta(hours=-3))
|
|
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
|
|
if start.tzinfo is None:
|
|
start = start.replace(tzinfo=local_tz)
|
|
query = query.filter(models.Inspection.started_at >= start)
|
|
if end_date:
|
|
# Parsear fecha y establecer al final del día en UTC-3
|
|
from datetime import timezone
|
|
local_tz = timezone(timedelta(hours=-3))
|
|
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
|
|
if end.tzinfo is None:
|
|
end = end.replace(tzinfo=local_tz)
|
|
query = query.filter(models.Inspection.started_at <= end)
|
|
|
|
# Filtro por mecánico
|
|
if mechanic_id:
|
|
query = query.filter(models.Inspection.mechanic_id == mechanic_id)
|
|
|
|
# Solo inspecciones activas
|
|
query = query.filter(models.Inspection.is_active == True)
|
|
|
|
# ESTADÍSTICAS GENERALES
|
|
total = query.count()
|
|
completed = query.filter(models.Inspection.status == "completed").count()
|
|
pending = total - completed
|
|
|
|
# Score promedio
|
|
avg_score_result = query.filter(
|
|
models.Inspection.score.isnot(None),
|
|
models.Inspection.max_score.isnot(None),
|
|
models.Inspection.max_score > 0
|
|
).with_entities(
|
|
func.avg(models.Inspection.score * 100.0 / models.Inspection.max_score)
|
|
).scalar()
|
|
avg_score = round(avg_score_result, 2) if avg_score_result else 0.0
|
|
|
|
# Items señalados
|
|
flagged_items = db.query(func.count(models.Answer.id))\
|
|
.filter(models.Answer.is_flagged == True)\
|
|
.join(models.Inspection)\
|
|
.filter(models.Inspection.is_active == True)
|
|
|
|
if start_date:
|
|
from datetime import timezone
|
|
local_tz = timezone(timedelta(hours=-3))
|
|
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
|
|
if start.tzinfo is None:
|
|
start = start.replace(tzinfo=local_tz)
|
|
flagged_items = flagged_items.filter(models.Inspection.started_at >= start)
|
|
if end_date:
|
|
from datetime import timezone
|
|
local_tz = timezone(timedelta(hours=-3))
|
|
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
|
|
if end.tzinfo is None:
|
|
end = end.replace(tzinfo=local_tz)
|
|
flagged_items = flagged_items.filter(models.Inspection.started_at <= end)
|
|
if mechanic_id:
|
|
flagged_items = flagged_items.filter(models.Inspection.mechanic_id == mechanic_id)
|
|
|
|
total_flagged = flagged_items.scalar() or 0
|
|
|
|
stats = schemas.InspectionStats(
|
|
total_inspections=total,
|
|
completed_inspections=completed,
|
|
pending_inspections=pending,
|
|
completion_rate=round((completed / total * 100) if total > 0 else 0, 2),
|
|
avg_score=avg_score,
|
|
total_flagged_items=total_flagged
|
|
)
|
|
|
|
# RANKING DE MECÁNICOS
|
|
mechanic_stats = db.query(
|
|
models.User.id,
|
|
models.User.full_name,
|
|
func.count(models.Inspection.id).label('total'),
|
|
func.avg(
|
|
case(
|
|
(models.Inspection.max_score > 0, models.Inspection.score * 100.0 / models.Inspection.max_score),
|
|
else_=None
|
|
)
|
|
).label('avg_score'),
|
|
func.count(case((models.Inspection.status == 'completed', 1))).label('completed')
|
|
).join(models.Inspection, models.Inspection.mechanic_id == models.User.id)\
|
|
.filter(models.User.role.in_(['mechanic', 'mecanico']))\
|
|
.filter(models.User.is_active == True)\
|
|
.filter(models.Inspection.is_active == True)
|
|
|
|
if start_date:
|
|
from datetime import timezone
|
|
local_tz = timezone(timedelta(hours=-3))
|
|
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
|
|
if start.tzinfo is None:
|
|
start = start.replace(tzinfo=local_tz)
|
|
mechanic_stats = mechanic_stats.filter(models.Inspection.started_at >= start)
|
|
if end_date:
|
|
from datetime import timezone
|
|
local_tz = timezone(timedelta(hours=-3))
|
|
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
|
|
if end.tzinfo is None:
|
|
end = end.replace(tzinfo=local_tz)
|
|
mechanic_stats = mechanic_stats.filter(models.Inspection.started_at <= end)
|
|
|
|
mechanic_stats = mechanic_stats.group_by(models.User.id, models.User.full_name)\
|
|
.order_by(func.count(models.Inspection.id).desc())\
|
|
.all()
|
|
|
|
mechanic_ranking = [
|
|
schemas.MechanicRanking(
|
|
mechanic_id=m.id,
|
|
mechanic_name=m.full_name,
|
|
total_inspections=m.total,
|
|
avg_score=round(m.avg_score, 2) if m.avg_score else 0.0,
|
|
completion_rate=round((m.completed / m.total * 100) if m.total > 0 else 0, 2)
|
|
)
|
|
for m in mechanic_stats
|
|
]
|
|
|
|
# ESTADÍSTICAS POR CHECKLIST
|
|
checklist_stats_query = db.query(
|
|
models.Checklist.id,
|
|
models.Checklist.name,
|
|
func.count(models.Inspection.id).label('total'),
|
|
func.avg(
|
|
case(
|
|
(models.Inspection.max_score > 0, models.Inspection.score * 100.0 / models.Inspection.max_score),
|
|
else_=None
|
|
)
|
|
).label('avg_score')
|
|
).join(models.Inspection)\
|
|
.filter(models.Inspection.is_active == True)\
|
|
.filter(models.Checklist.is_active == True)
|
|
|
|
if start_date:
|
|
from datetime import timezone
|
|
local_tz = timezone(timedelta(hours=-3))
|
|
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
|
|
if start.tzinfo is None:
|
|
start = start.replace(tzinfo=local_tz)
|
|
checklist_stats_query = checklist_stats_query.filter(models.Inspection.started_at >= start)
|
|
if end_date:
|
|
from datetime import timezone
|
|
local_tz = timezone(timedelta(hours=-3))
|
|
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
|
|
if end.tzinfo is None:
|
|
end = end.replace(tzinfo=local_tz)
|
|
checklist_stats_query = checklist_stats_query.filter(models.Inspection.started_at <= end)
|
|
if mechanic_id:
|
|
checklist_stats_query = checklist_stats_query.filter(models.Inspection.mechanic_id == mechanic_id)
|
|
|
|
checklist_stats_query = checklist_stats_query.group_by(models.Checklist.id, models.Checklist.name)
|
|
checklist_stats_data = checklist_stats_query.all()
|
|
|
|
checklist_stats = [
|
|
schemas.ChecklistStats(
|
|
checklist_id=c.id,
|
|
checklist_name=c.name,
|
|
total_inspections=c.total,
|
|
avg_score=round(c.avg_score, 2) if c.avg_score else 0.0
|
|
)
|
|
for c in checklist_stats_data
|
|
]
|
|
|
|
# INSPECCIONES POR FECHA (últimos 30 días)
|
|
end_date_obj = datetime.fromisoformat(end_date) if end_date else datetime.now()
|
|
start_date_obj = datetime.fromisoformat(start_date) if start_date else end_date_obj - timedelta(days=30)
|
|
|
|
inspections_by_date_query = db.query(
|
|
func.date(models.Inspection.started_at).label('date'),
|
|
func.count(models.Inspection.id).label('count')
|
|
).filter(
|
|
models.Inspection.started_at.between(start_date_obj, end_date_obj),
|
|
models.Inspection.is_active == True
|
|
)
|
|
|
|
if mechanic_id:
|
|
inspections_by_date_query = inspections_by_date_query.filter(
|
|
models.Inspection.mechanic_id == mechanic_id
|
|
)
|
|
|
|
inspections_by_date_data = inspections_by_date_query.group_by(
|
|
func.date(models.Inspection.started_at)
|
|
).all()
|
|
|
|
inspections_by_date = {
|
|
str(d.date): d.count for d in inspections_by_date_data
|
|
}
|
|
|
|
# RATIO PASS/FAIL
|
|
pass_fail_data = db.query(
|
|
models.Answer.answer_value,
|
|
func.count(models.Answer.id).label('count')
|
|
).join(models.Inspection)\
|
|
.filter(models.Inspection.is_active == True)\
|
|
.filter(models.Answer.answer_value.in_(['pass', 'fail', 'good', 'bad', 'regular']))
|
|
|
|
if start_date:
|
|
from datetime import timezone
|
|
local_tz = timezone(timedelta(hours=-3))
|
|
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
|
|
if start.tzinfo is None:
|
|
start = start.replace(tzinfo=local_tz)
|
|
pass_fail_data = pass_fail_data.filter(models.Inspection.started_at >= start)
|
|
if end_date:
|
|
from datetime import timezone
|
|
local_tz = timezone(timedelta(hours=-3))
|
|
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
|
|
if end.tzinfo is None:
|
|
end = end.replace(tzinfo=local_tz)
|
|
pass_fail_data = pass_fail_data.filter(models.Inspection.started_at <= end)
|
|
if mechanic_id:
|
|
pass_fail_data = pass_fail_data.filter(models.Inspection.mechanic_id == mechanic_id)
|
|
|
|
pass_fail_data = pass_fail_data.group_by(models.Answer.answer_value).all()
|
|
|
|
pass_fail_ratio = {d.answer_value: d.count for d in pass_fail_data}
|
|
|
|
return schemas.DashboardData(
|
|
stats=stats,
|
|
mechanic_ranking=mechanic_ranking,
|
|
checklist_stats=checklist_stats,
|
|
inspections_by_date=inspections_by_date,
|
|
pass_fail_ratio=pass_fail_ratio
|
|
)
|
|
|
|
|
|
@app.get("/api/reports/inspections")
|
|
def get_inspections_report(
|
|
start_date: Optional[str] = None,
|
|
end_date: Optional[str] = None,
|
|
mechanic_id: Optional[int] = None,
|
|
checklist_id: Optional[int] = None,
|
|
status: Optional[str] = None,
|
|
limit: int = 100,
|
|
current_user: models.User = Depends(get_current_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Obtener lista de inspecciones con filtros"""
|
|
if current_user.role not in ["admin", "asesor"]:
|
|
raise HTTPException(status_code=403, detail="No tienes permisos para acceder a reportes")
|
|
|
|
# Query base con select_from explícito
|
|
query = db.query(
|
|
models.Inspection.id,
|
|
models.Inspection.vehicle_plate,
|
|
models.Inspection.checklist_id,
|
|
models.Checklist.name.label('checklist_name'),
|
|
models.User.full_name.label('mechanic_name'),
|
|
models.Inspection.status,
|
|
models.Inspection.score,
|
|
models.Inspection.max_score,
|
|
models.Inspection.started_at,
|
|
models.Inspection.completed_at,
|
|
func.coalesce(
|
|
func.count(case((models.Answer.is_flagged == True, 1))),
|
|
0
|
|
).label('flagged_items')
|
|
).select_from(models.Inspection)\
|
|
.join(models.Checklist, models.Inspection.checklist_id == models.Checklist.id)\
|
|
.join(models.User, models.Inspection.mechanic_id == models.User.id)\
|
|
.outerjoin(models.Answer, models.Answer.inspection_id == models.Inspection.id)\
|
|
.filter(models.Inspection.is_active == True)
|
|
|
|
# Aplicar filtros
|
|
if start_date:
|
|
from datetime import timezone
|
|
local_tz = timezone(timedelta(hours=-3))
|
|
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
|
|
if start.tzinfo is None:
|
|
start = start.replace(tzinfo=local_tz)
|
|
query = query.filter(models.Inspection.started_at >= start)
|
|
if end_date:
|
|
from datetime import timezone
|
|
local_tz = timezone(timedelta(hours=-3))
|
|
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
|
|
if end.tzinfo is None:
|
|
end = end.replace(tzinfo=local_tz)
|
|
query = query.filter(models.Inspection.started_at <= end)
|
|
if mechanic_id:
|
|
query = query.filter(models.Inspection.mechanic_id == mechanic_id)
|
|
if checklist_id:
|
|
query = query.filter(models.Inspection.checklist_id == checklist_id)
|
|
if status:
|
|
query = query.filter(models.Inspection.status == status)
|
|
|
|
# Group by y order
|
|
query = query.group_by(
|
|
models.Inspection.id,
|
|
models.Checklist.name,
|
|
models.User.full_name
|
|
).order_by(models.Inspection.started_at.desc())\
|
|
.limit(limit)
|
|
|
|
results = query.all()
|
|
|
|
return [
|
|
{
|
|
"id": r.id,
|
|
"vehicle_plate": r.vehicle_plate,
|
|
"checklist_id": r.checklist_id,
|
|
"checklist_name": r.checklist_name,
|
|
"mechanic_name": r.mechanic_name,
|
|
"status": r.status,
|
|
"score": r.score,
|
|
"max_score": r.max_score,
|
|
"flagged_items": r.flagged_items,
|
|
"started_at": r.started_at.isoformat() if r.started_at else None,
|
|
"completed_at": r.completed_at.isoformat() if r.completed_at else None
|
|
}
|
|
for r in results
|
|
]
|
|
|
|
|
|
@app.get("/api/inspections/{inspection_id}/pdf")
|
|
def export_inspection_to_pdf(
|
|
inspection_id: int,
|
|
current_user: models.User = Depends(get_current_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Exportar inspección a PDF con imágenes"""
|
|
from fastapi.responses import StreamingResponse
|
|
from reportlab.lib.pagesizes import letter, A4
|
|
from reportlab.lib import colors
|
|
from reportlab.lib.units import inch
|
|
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, Image as RLImage, PageBreak
|
|
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
|
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_RIGHT
|
|
from io import BytesIO
|
|
import base64
|
|
|
|
# Obtener inspección
|
|
inspection = db.query(models.Inspection).filter(
|
|
models.Inspection.id == inspection_id
|
|
).first()
|
|
|
|
if not inspection:
|
|
raise HTTPException(status_code=404, detail="Inspección no encontrada")
|
|
|
|
# Verificar permisos (admin, asesor o mecánico dueño)
|
|
if current_user.role not in ["admin", "asesor"] and inspection.mechanic_id != current_user.id:
|
|
raise HTTPException(status_code=403, detail="No tienes permisos para ver esta inspección")
|
|
|
|
# Obtener datos relacionados
|
|
checklist = db.query(models.Checklist).filter(models.Checklist.id == inspection.checklist_id).first()
|
|
mechanic = db.query(models.User).filter(models.User.id == inspection.mechanic_id).first()
|
|
answers = db.query(models.Answer).options(
|
|
joinedload(models.Answer.media_files)
|
|
).join(models.Question).filter(
|
|
models.Answer.inspection_id == inspection_id
|
|
).order_by(models.Question.section, models.Question.order).all()
|
|
|
|
# Crear PDF en memoria
|
|
buffer = BytesIO()
|
|
doc = SimpleDocTemplate(buffer, pagesize=A4, rightMargin=30, leftMargin=30, topMargin=30, bottomMargin=30)
|
|
elements = []
|
|
styles = getSampleStyleSheet()
|
|
# ...existing code for PDF generation...
|
|
doc.build(elements)
|
|
buffer.seek(0)
|
|
now = datetime.now()
|
|
folder = f"{now.year}/{now.month:02d}"
|
|
filename = f"inspeccion_{inspection_id}_{inspection.vehicle_plate or 'sin-patente'}.pdf"
|
|
s3_key = f"{folder}/{filename}"
|
|
# Subir PDF a S3/MinIO
|
|
s3_client.upload_fileobj(buffer, S3_PDF_BUCKET, s3_key, ExtraArgs={"ContentType": "application/pdf"})
|
|
pdf_url = f"{S3_ENDPOINT}/{S3_PDF_BUCKET}/{s3_key}"
|
|
# Guardar pdf_url en la inspección
|
|
inspection.pdf_url = pdf_url
|
|
db.commit()
|
|
return {"pdf_url": pdf_url}
|
|
|
|
|
|
# ============= HEALTH CHECK =============
|
|
@app.get("/")
|
|
def root():
|
|
return {"message": "Checklist Inteligente API", "version": "1.0.0", "status": "running"}
|
|
|
|
|
|
@app.get("/health")
|
|
def health_check():
|
|
return {"status": "healthy"}
|