Files
checklist/backend/app/main.py
2025-11-25 08:43:54 -03:00

1938 lines
69 KiB
Python

# ============= LOGO CONFIGURABLE =============
from fastapi import FastAPI, File, UploadFile, Form, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import func, case
from typing import List, Optional
import os
import boto3
from botocore.client import Config
import uuid
from app.core import config as app_config
from app.core.database import engine, get_db, Base
from app.core.security import verify_password, get_password_hash, create_access_token, decode_access_token
from app import models, schemas
import shutil
from datetime import datetime, timedelta
import sys
import requests
# Función para enviar notificaciones al webhook
def send_answer_notification(answer, question, mechanic, db):
"""Envía notificación al webhook cuando se responde una pregunta marcada"""
try:
if not app_config.settings.NOTIFICACION_ENDPOINT:
print("No hay endpoint de notificación configurado")
return
# Obtener datos de la inspección
inspection = db.query(models.Inspection).filter(
models.Inspection.id == answer.inspection_id
).first()
if not inspection:
return
# Preparar datos para enviar
notification_data = {
"tipo": "respuesta_pregunta",
"pregunta": {
"id": question.id,
"texto": question.text,
"seccion": question.section
},
"respuesta": {
"id": answer.id,
"valor": answer.answer_value,
"estado": answer.status,
"comentario": answer.comment,
"puntos": answer.points_earned
},
"inspeccion": {
"id": inspection.id,
"vehiculo_placa": inspection.vehicle_plate,
"vehiculo_marca": inspection.vehicle_brand,
"vehiculo_modelo": inspection.vehicle_model,
"cliente": inspection.client_name,
"or_number": inspection.or_number
},
"mecanico": {
"id": mechanic.id,
"nombre": mechanic.full_name,
"email": mechanic.email
},
"timestamp": datetime.utcnow().isoformat()
}
# Enviar al webhook
response = requests.post(
app_config.settings.NOTIFICACION_ENDPOINT,
json=notification_data,
timeout=5
)
if response.status_code == 200:
print(f"✅ Notificación enviada para pregunta {question.id}")
else:
print(f"⚠️ Error al enviar notificación: {response.status_code}")
except Exception as e:
print(f"❌ Error enviando notificación: {e}")
# No lanzamos excepción para no interrumpir el flujo normal
BACKEND_VERSION = "1.0.25"
app = FastAPI(title="Checklist Inteligente API", version=BACKEND_VERSION)
# S3/MinIO configuration
S3_ENDPOINT = app_config.MINIO_ENDPOINT
S3_ACCESS_KEY = app_config.MINIO_ACCESS_KEY
S3_SECRET_KEY = app_config.MINIO_SECRET_KEY
S3_IMAGE_BUCKET = app_config.MINIO_IMAGE_BUCKET
S3_PDF_BUCKET = app_config.MINIO_PDF_BUCKET
s3_client = boto3.client(
's3',
endpoint_url=S3_ENDPOINT,
aws_access_key_id=S3_ACCESS_KEY,
aws_secret_access_key=S3_SECRET_KEY,
config=Config(signature_version='s3v4'),
region_name='us-east-1'
)
# Crear tablas
Base.metadata.create_all(bind=engine)
# Información visual al iniciar el backend
print("\n================ BACKEND STARTUP INFO ================")
print(f"Backend version: {BACKEND_VERSION}")
print(f"Database URL: {app_config.settings.DATABASE_URL}")
print(f"Environment: {app_config.settings.ENVIRONMENT}")
print(f"MinIO endpoint: {app_config.MINIO_ENDPOINT}")
print("====================================================\n", flush=True)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173", "http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
security = HTTPBearer()
# Dependency para obtener usuario actual
def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
token = credentials.credentials
# Verificar si es un API token (comienza con "syntria_")
if token.startswith("syntria_"):
api_token = db.query(models.APIToken).filter(
models.APIToken.token == token,
models.APIToken.is_active == True
).first()
if not api_token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="API Token inválido o inactivo"
)
# Actualizar último uso
api_token.last_used_at = datetime.utcnow()
db.commit()
# Obtener usuario
user = db.query(models.User).filter(models.User.id == api_token.user_id).first()
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Usuario inválido o inactivo"
)
return user
# Si no es API token, es JWT token
payload = decode_access_token(token)
print(f"Token payload: {payload}") # Debug
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido o expirado"
)
user_id = int(payload.get("sub"))
print(f"Looking for user ID: {user_id}") # Debug
user = db.query(models.User).filter(models.User.id == user_id).first()
if user is None:
print(f"User not found with ID: {user_id}") # Debug
raise HTTPException(status_code=404, detail="Usuario no encontrado")
return user
@app.post("/api/config/logo", response_model=dict)
async def upload_logo(
file: UploadFile = File(...),
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Sube un logo y lo guarda en MinIO, actualiza la configuración."""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="Solo administradores pueden cambiar el logo")
# Subir imagen a MinIO
file_extension = file.filename.split(".")[-1]
now = datetime.now()
folder = f"logo"
file_name = f"logo_{now.strftime('%Y%m%d_%H%M%S')}.{file_extension}"
s3_key = f"{folder}/{file_name}"
s3_client.upload_fileobj(file.file, S3_IMAGE_BUCKET, s3_key, ExtraArgs={"ContentType": file.content_type})
logo_url = f"{S3_ENDPOINT}/{S3_IMAGE_BUCKET}/{s3_key}"
# Guardar en configuración (puedes tener una tabla Config o usar AIConfiguration)
config = db.query(models.AIConfiguration).filter(models.AIConfiguration.is_active == True).first()
if config:
config.logo_url = logo_url
db.commit()
db.refresh(config)
# Si no hay config, solo retorna la url
return {"logo_url": logo_url}
@app.get("/api/config/logo", response_model=dict)
def get_logo_url(
db: Session = Depends(get_db)
):
config = db.query(models.AIConfiguration).filter(models.AIConfiguration.is_active == True).first()
if config and getattr(config, "logo_url", None):
return {"logo_url": config.logo_url}
# Default logo (puedes poner una url por defecto)
return {"logo_url": f"{S3_ENDPOINT}/{S3_IMAGE_BUCKET}/logo/default_logo.png"}
# ============= AUTH ENDPOINTS =============
@app.post("/api/auth/register", response_model=schemas.User)
def register(user: schemas.UserCreate, db: Session = Depends(get_db)):
# Verificar si usuario existe
db_user = db.query(models.User).filter(models.User.username == user.username).first()
if db_user:
raise HTTPException(status_code=400, detail="Usuario ya existe")
# Crear usuario
hashed_password = get_password_hash(user.password)
db_user = models.User(
username=user.username,
email=user.email,
full_name=user.full_name,
role=user.role,
password_hash=hashed_password
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.post("/api/auth/login", response_model=schemas.Token)
def login(user_login: schemas.UserLogin, db: Session = Depends(get_db)):
user = db.query(models.User).filter(models.User.username == user_login.username).first()
if not user or not verify_password(user_login.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Usuario o contraseña incorrectos"
)
access_token = create_access_token(data={"sub": str(user.id), "role": user.role})
return {
"access_token": access_token,
"token_type": "bearer",
"user": user
}
@app.get("/api/auth/me", response_model=schemas.User)
def get_me(current_user: models.User = Depends(get_current_user)):
return current_user
# ============= USER ENDPOINTS =============
@app.get("/api/users", response_model=List[schemas.User])
def get_users(
skip: int = 0,
limit: int = 100,
active_only: bool = False,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede ver todos los usuarios
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos para ver usuarios")
query = db.query(models.User)
if active_only:
query = query.filter(models.User.is_active == True)
return query.offset(skip).limit(limit).all()
@app.get("/api/users/{user_id}", response_model=schemas.User)
def get_user(
user_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede ver otros usuarios
if current_user.role != "admin" and current_user.id != user_id:
raise HTTPException(status_code=403, detail="No tienes permisos para ver este usuario")
user = db.query(models.User).filter(models.User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Usuario no encontrado")
return user
@app.post("/api/users", response_model=schemas.User)
def create_user(
user: schemas.UserCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede crear usuarios
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos para crear usuarios")
# Verificar si usuario existe
db_user = db.query(models.User).filter(models.User.username == user.username).first()
if db_user:
raise HTTPException(status_code=400, detail="Usuario ya existe")
# Verificar si email existe
if user.email:
db_email = db.query(models.User).filter(models.User.email == user.email).first()
if db_email:
raise HTTPException(status_code=400, detail="Email ya está en uso")
# Crear usuario
hashed_password = get_password_hash(user.password)
db_user = models.User(
username=user.username,
email=user.email,
full_name=user.full_name,
role=user.role,
password_hash=hashed_password,
is_active=True
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.put("/api/users/{user_id}", response_model=schemas.User)
def update_user(
user_id: int,
user_update: schemas.UserUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede actualizar otros usuarios
if current_user.role != "admin" and current_user.id != user_id:
raise HTTPException(status_code=403, detail="No tienes permisos para actualizar este usuario")
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="Usuario no encontrado")
# Actualizar campos
if user_update.username is not None:
# Verificar si username está en uso
existing = db.query(models.User).filter(
models.User.username == user_update.username,
models.User.id != user_id
).first()
if existing:
raise HTTPException(status_code=400, detail="Nombre de usuario ya está en uso")
db_user.username = user_update.username
if user_update.email is not None:
# Verificar si email está en uso
existing = db.query(models.User).filter(
models.User.email == user_update.email,
models.User.id != user_id
).first()
if existing:
raise HTTPException(status_code=400, detail="Email ya está en uso")
db_user.email = user_update.email
if user_update.full_name is not None:
db_user.full_name = user_update.full_name
# Solo admin puede cambiar roles
if user_update.role is not None:
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos para cambiar roles")
db_user.role = user_update.role
db.commit()
db.refresh(db_user)
return db_user
@app.patch("/api/users/{user_id}/deactivate")
def deactivate_user(
user_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede inactivar usuarios
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos para inactivar usuarios")
# No permitir auto-inactivación
if current_user.id == user_id:
raise HTTPException(status_code=400, detail="No puedes inactivar tu propio usuario")
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="Usuario no encontrado")
db_user.is_active = False
db.commit()
return {"message": "Usuario inactivado correctamente", "user_id": user_id}
@app.patch("/api/users/{user_id}/activate")
def activate_user(
user_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede activar usuarios
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos para activar usuarios")
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="Usuario no encontrado")
db_user.is_active = True
db.commit()
return {"message": "Usuario activado correctamente", "user_id": user_id}
@app.patch("/api/users/{user_id}/password")
def change_user_password(
user_id: int,
password_update: schemas.AdminPasswordUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede cambiar contraseñas de otros usuarios
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos para cambiar contraseñas")
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="Usuario no encontrado")
# Cambiar contraseña
db_user.password_hash = get_password_hash(password_update.new_password)
db.commit()
return {"message": "Contraseña actualizada correctamente"}
@app.patch("/api/users/me/password")
def change_my_password(
password_update: schemas.UserPasswordUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Verificar contraseña actual
if not verify_password(password_update.current_password, current_user.password_hash):
raise HTTPException(status_code=400, detail="Contraseña actual incorrecta")
# Cambiar contraseña
current_user.password_hash = get_password_hash(password_update.new_password)
db.commit()
return {"message": "Contraseña actualizada correctamente"}
@app.put("/api/users/me", response_model=schemas.User)
def update_my_profile(
user_update: schemas.UserUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Actualizar email
if user_update.email is not None:
# Verificar si email está en uso
existing = db.query(models.User).filter(
models.User.email == user_update.email,
models.User.id != current_user.id
).first()
if existing:
raise HTTPException(status_code=400, detail="Email ya está en uso")
current_user.email = user_update.email
# Actualizar nombre
if user_update.full_name is not None:
current_user.full_name = user_update.full_name
# No permitir cambio de rol desde perfil
db.commit()
db.refresh(current_user)
return current_user
# ============= API TOKENS ENDPOINTS =============
@app.get("/api/users/me/tokens", response_model=List[schemas.APIToken])
def get_my_tokens(
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Obtener todos mis API tokens"""
tokens = db.query(models.APIToken).filter(
models.APIToken.user_id == current_user.id
).all()
return tokens
@app.post("/api/users/me/tokens", response_model=schemas.APITokenWithValue)
def create_my_token(
token_create: schemas.APITokenCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Generar un nuevo API token"""
from app.core.security import generate_api_token
# Generar token único
token_value = generate_api_token()
# Crear registro
api_token = models.APIToken(
user_id=current_user.id,
token=token_value,
description=token_create.description,
is_active=True
)
db.add(api_token)
db.commit()
db.refresh(api_token)
# Retornar con el token completo (solo esta vez)
return schemas.APITokenWithValue(
id=api_token.id,
token=api_token.token,
description=api_token.description,
is_active=api_token.is_active,
last_used_at=api_token.last_used_at,
created_at=api_token.created_at
)
@app.delete("/api/users/me/tokens/{token_id}")
def delete_my_token(
token_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Revocar uno de mis API tokens"""
api_token = db.query(models.APIToken).filter(
models.APIToken.id == token_id,
models.APIToken.user_id == current_user.id
).first()
if not api_token:
raise HTTPException(status_code=404, detail="Token no encontrado")
api_token.is_active = False
db.commit()
return {"message": "Token revocado correctamente", "token_id": token_id}
@app.get("/api/users/{user_id}/tokens", response_model=List[schemas.APIToken])
def get_user_tokens(
user_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Obtener tokens de un usuario (solo admin)"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos")
tokens = db.query(models.APIToken).filter(
models.APIToken.user_id == user_id
).all()
return tokens
@app.delete("/api/users/{user_id}/tokens/{token_id}")
def delete_user_token(
user_id: int,
token_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Revocar token de un usuario (solo admin)"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos")
api_token = db.query(models.APIToken).filter(
models.APIToken.id == token_id,
models.APIToken.user_id == user_id
).first()
if not api_token:
raise HTTPException(status_code=404, detail="Token no encontrado")
api_token.is_active = False
db.commit()
return {"message": "Token revocado correctamente", "token_id": token_id}
# ============= CHECKLIST ENDPOINTS =============
@app.get("/api/checklists", response_model=List[schemas.Checklist])
def get_checklists(
skip: int = 0,
limit: int = 100,
active_only: bool = False,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
query = db.query(models.Checklist)
if active_only:
query = query.filter(models.Checklist.is_active == True)
return query.offset(skip).limit(limit).all()
@app.get("/api/checklists/{checklist_id}", response_model=schemas.ChecklistWithQuestions)
def get_checklist(checklist_id: int, db: Session = Depends(get_db)):
checklist = db.query(models.Checklist).options(
joinedload(models.Checklist.questions)
).filter(models.Checklist.id == checklist_id).first()
if not checklist:
raise HTTPException(status_code=404, detail="Checklist no encontrado")
return checklist
@app.post("/api/checklists", response_model=schemas.Checklist)
def create_checklist(
checklist: schemas.ChecklistCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No autorizado")
db_checklist = models.Checklist(**checklist.dict(), created_by=current_user.id)
db.add(db_checklist)
db.commit()
db.refresh(db_checklist)
return db_checklist
@app.put("/api/checklists/{checklist_id}", response_model=schemas.Checklist)
def update_checklist(
checklist_id: int,
checklist: schemas.ChecklistUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No autorizado")
db_checklist = db.query(models.Checklist).filter(models.Checklist.id == checklist_id).first()
if not db_checklist:
raise HTTPException(status_code=404, detail="Checklist no encontrado")
for key, value in checklist.dict(exclude_unset=True).items():
setattr(db_checklist, key, value)
db.commit()
db.refresh(db_checklist)
return db_checklist
# ============= QUESTION ENDPOINTS =============
@app.post("/api/questions", response_model=schemas.Question)
def create_question(
question: schemas.QuestionCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No autorizado")
db_question = models.Question(**question.dict())
db.add(db_question)
# Actualizar max_score del checklist
checklist = db.query(models.Checklist).filter(
models.Checklist.id == question.checklist_id
).first()
if checklist:
checklist.max_score += question.points
db.commit()
db.refresh(db_question)
return db_question
@app.put("/api/questions/{question_id}", response_model=schemas.Question)
def update_question(
question_id: int,
question: schemas.QuestionUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No autorizado")
db_question = db.query(models.Question).filter(models.Question.id == question_id).first()
if not db_question:
raise HTTPException(status_code=404, detail="Pregunta no encontrada")
for key, value in question.dict(exclude_unset=True).items():
setattr(db_question, key, value)
db.commit()
db.refresh(db_question)
return db_question
@app.delete("/api/questions/{question_id}")
def delete_question(
question_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No autorizado")
db_question = db.query(models.Question).filter(models.Question.id == question_id).first()
if not db_question:
raise HTTPException(status_code=404, detail="Pregunta no encontrada")
db.delete(db_question)
db.commit()
return {"message": "Pregunta eliminada"}
# ============= INSPECTION ENDPOINTS =============
@app.get("/api/inspections", response_model=List[schemas.Inspection])
def get_inspections(
skip: int = 0,
limit: int = 100,
vehicle_plate: str = None,
status: str = None,
show_inactive: bool = False,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
query = db.query(models.Inspection)
# Por defecto, solo mostrar inspecciones activas
if not show_inactive:
query = query.filter(models.Inspection.is_active == True)
# Mecánicos solo ven sus inspecciones
if current_user.role == "mechanic":
query = query.filter(models.Inspection.mechanic_id == current_user.id)
if vehicle_plate:
query = query.filter(models.Inspection.vehicle_plate.contains(vehicle_plate))
if status:
query = query.filter(models.Inspection.status == status)
return query.order_by(models.Inspection.created_at.desc()).offset(skip).limit(limit).all()
@app.get("/api/inspections/{inspection_id}", response_model=schemas.InspectionDetail)
def get_inspection(
inspection_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
inspection = db.query(models.Inspection).options(
joinedload(models.Inspection.checklist).joinedload(models.Checklist.questions),
joinedload(models.Inspection.mechanic),
joinedload(models.Inspection.answers).joinedload(models.Answer.question),
joinedload(models.Inspection.answers).joinedload(models.Answer.media_files)
).filter(models.Inspection.id == inspection_id).first()
if not inspection:
raise HTTPException(status_code=404, detail="Inspección no encontrada")
return inspection
@app.post("/api/inspections", response_model=schemas.Inspection)
def create_inspection(
inspection: schemas.InspectionCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Obtener max_score del checklist
checklist = db.query(models.Checklist).filter(
models.Checklist.id == inspection.checklist_id
).first()
if not checklist:
raise HTTPException(status_code=404, detail="Checklist no encontrado")
db_inspection = models.Inspection(
**inspection.dict(),
mechanic_id=current_user.id,
max_score=checklist.max_score
)
db.add(db_inspection)
db.commit()
db.refresh(db_inspection)
return db_inspection
@app.put("/api/inspections/{inspection_id}", response_model=schemas.Inspection)
def update_inspection(
inspection_id: int,
inspection: schemas.InspectionUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
db_inspection = db.query(models.Inspection).filter(
models.Inspection.id == inspection_id
).first()
if not db_inspection:
raise HTTPException(status_code=404, detail="Inspección no encontrada")
for key, value in inspection.dict(exclude_unset=True).items():
setattr(db_inspection, key, value)
db.commit()
db.refresh(db_inspection)
return db_inspection
@app.post("/api/inspections/{inspection_id}/complete", response_model=schemas.Inspection)
def complete_inspection(
inspection_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
inspection = db.query(models.Inspection).filter(
models.Inspection.id == inspection_id
).first()
if not inspection:
raise HTTPException(status_code=404, detail="Inspección no encontrada")
# Calcular score
answers = db.query(models.Answer).filter(models.Answer.inspection_id == inspection_id).all()
total_score = sum(a.points_earned for a in answers)
flagged_count = sum(1 for a in answers if a.is_flagged)
inspection.score = total_score
inspection.percentage = (total_score / inspection.max_score * 100) if inspection.max_score > 0 else 0
inspection.flagged_items_count = flagged_count
inspection.status = "completed"
inspection.completed_at = datetime.utcnow()
# Generar PDF con miniaturas de imágenes y subir a MinIO
from reportlab.lib.pagesizes import A4
from reportlab.lib import colors
from reportlab.lib.units import inch
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, Image as RLImage
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_CENTER
from io import BytesIO
import requests
buffer = BytesIO()
doc = SimpleDocTemplate(buffer, pagesize=A4, rightMargin=30, leftMargin=30, topMargin=30, bottomMargin=30)
elements = []
styles = getSampleStyleSheet()
title_style = styles['Title']
normal_style = styles['Normal']
header_style = ParagraphStyle('Header', parent=styles['Heading2'], alignment=TA_CENTER, spaceAfter=12)
# Portada
elements.append(Paragraph(f"Informe de Inspección #{inspection.id}", title_style))
elements.append(Spacer(1, 12))
elements.append(Paragraph(f"Vehículo: {inspection.vehicle_brand or ''} {inspection.vehicle_model or ''} - Placa: {inspection.vehicle_plate}", normal_style))
elements.append(Paragraph(f"Cliente: {inspection.client_name or ''}", normal_style))
mechanic = db.query(models.User).filter(models.User.id == inspection.mechanic_id).first()
checklist = db.query(models.Checklist).filter(models.Checklist.id == inspection.checklist_id).first()
elements.append(Paragraph(f"Mecánico: {mechanic.full_name if mechanic else ''}", normal_style))
elements.append(Paragraph(f"Checklist: {checklist.name if checklist else ''}", normal_style))
elements.append(Paragraph(f"Fecha: {inspection.started_at.strftime('%d/%m/%Y %H:%M') if inspection.started_at else ''}", normal_style))
elements.append(Spacer(1, 18))
# Tabla de respuestas con miniaturas
answers = db.query(models.Answer).options(joinedload(models.Answer.media_files)).join(models.Question).filter(models.Answer.inspection_id == inspection_id).order_by(models.Question.section, models.Question.order).all()
table_data = [["Sección", "Pregunta", "Respuesta", "Estado", "Comentario", "Miniaturas"]]
for ans in answers:
question = ans.question
media_imgs = []
for media in ans.media_files:
if media.file_type == "image":
try:
img_resp = requests.get(media.file_path)
if img_resp.status_code == 200:
img_bytes = BytesIO(img_resp.content)
rl_img = RLImage(img_bytes, width=0.7*inch, height=0.7*inch)
media_imgs.append(rl_img)
except Exception as e:
print(f"Error cargando imagen {media.file_path}: {e}")
row = [
question.section or "",
question.text,
ans.answer_value,
ans.status,
ans.comment or "",
media_imgs if media_imgs else ""
]
table_data.append(row)
table = Table(table_data, colWidths=[1.2*inch, 2.5*inch, 1*inch, 0.8*inch, 2*inch, 1.5*inch])
table.setStyle(TableStyle([
('BACKGROUND', (0,0), (-1,0), colors.lightgrey),
('TEXTCOLOR', (0,0), (-1,0), colors.black),
('ALIGN', (0,0), (-1,-1), 'LEFT'),
('VALIGN', (0,0), (-1,-1), 'TOP'),
('FONTNAME', (0,0), (-1,0), 'Helvetica-Bold'),
('FONTSIZE', (0,0), (-1,0), 10),
('BOTTOMPADDING', (0,0), (-1,0), 8),
('GRID', (0,0), (-1,-1), 0.5, colors.grey),
]))
elements.append(table)
elements.append(Spacer(1, 18))
elements.append(Paragraph(f"Generado por Checklist Inteligente - {datetime.now().strftime('%d/%m/%Y %H:%M')}", header_style))
try:
doc.build(elements)
except Exception as e:
print(f"Error al generar PDF: {e}")
buffer.seek(0)
now = datetime.now()
folder = f"{now.year}/{now.month:02d}"
filename = f"inspeccion_{inspection_id}_{inspection.vehicle_plate or 'sin-patente'}.pdf"
s3_key = f"{folder}/{filename}"
buffer.seek(0)
s3_client.upload_fileobj(buffer, S3_PDF_BUCKET, s3_key, ExtraArgs={"ContentType": "application/pdf"})
pdf_url = f"{S3_ENDPOINT}/{S3_PDF_BUCKET}/{s3_key}"
inspection.pdf_url = pdf_url
db.commit()
db.refresh(inspection)
return inspection
@app.patch("/api/inspections/{inspection_id}/deactivate")
def deactivate_inspection(
inspection_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede inactivar
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos para inactivar inspecciones")
inspection = db.query(models.Inspection).filter(
models.Inspection.id == inspection_id
).first()
if not inspection:
raise HTTPException(status_code=404, detail="Inspección no encontrada")
inspection.is_active = False
inspection.status = "inactive"
db.commit()
db.refresh(inspection)
return {"message": "Inspección inactivada correctamente", "inspection_id": inspection_id}
# ============= ANSWER ENDPOINTS =============
@app.post("/api/answers", response_model=schemas.Answer)
def create_answer(
answer: schemas.AnswerCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Obtener la pregunta para saber los puntos
question = db.query(models.Question).filter(models.Question.id == answer.question_id).first()
if not question:
raise HTTPException(status_code=404, detail="Pregunta no encontrada")
# Calcular puntos según status
points_earned = 0
if answer.status == "ok":
points_earned = question.points
elif answer.status == "warning":
points_earned = int(question.points * 0.5)
# Buscar si ya existe una respuesta para esta inspección y pregunta
existing_answer = db.query(models.Answer).filter(
models.Answer.inspection_id == answer.inspection_id,
models.Answer.question_id == answer.question_id
).first()
if existing_answer:
# Actualizar la respuesta existente
# Si status es pass/fail, no poner valor por defecto en answer_value
if answer.status in ["pass", "fail"] and not answer.answer_value:
existing_answer.answer_value = None
else:
existing_answer.answer_value = answer.answer_value
existing_answer.status = answer.status
existing_answer.comment = getattr(answer, "comment", None)
existing_answer.ai_analysis = getattr(answer, "ai_analysis", None)
existing_answer.is_flagged = getattr(answer, "is_flagged", False)
existing_answer.points_earned = points_earned
existing_answer.updated_at = datetime.utcnow()
db.commit()
db.refresh(existing_answer)
# Enviar notificación si la pregunta lo requiere
if question.send_notification:
send_answer_notification(existing_answer, question, current_user, db)
return existing_answer
else:
# Si status es pass/fail y no hay valor, no poner valor por defecto en answer_value
answer_data = answer.dict()
if answer.status in ["pass", "fail"] and not answer.answer_value:
answer_data["answer_value"] = None
db_answer = models.Answer(
**answer_data,
points_earned=points_earned
)
db.add(db_answer)
db.commit()
db.refresh(db_answer)
# Enviar notificación si la pregunta lo requiere
if question.send_notification:
send_answer_notification(db_answer, question, current_user, db)
return db_answer
@app.put("/api/answers/{answer_id}", response_model=schemas.Answer)
def update_answer(
answer_id: int,
answer: schemas.AnswerUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
db_answer = db.query(models.Answer).filter(models.Answer.id == answer_id).first()
if not db_answer:
raise HTTPException(status_code=404, detail="Respuesta no encontrada")
# Recalcular puntos si cambió el status
if answer.status and answer.status != db_answer.status:
question = db.query(models.Question).filter(
models.Question.id == db_answer.question_id
).first()
if answer.status == "ok":
db_answer.points_earned = question.points
elif answer.status == "warning":
db_answer.points_earned = int(question.points * 0.5)
else:
db_answer.points_earned = 0
for key, value in answer.dict(exclude_unset=True).items():
setattr(db_answer, key, value)
db.commit()
db.refresh(db_answer)
return db_answer
# ============= MEDIA FILE ENDPOINTS =============
@app.post("/api/answers/{answer_id}/upload", response_model=schemas.MediaFile)
async def upload_photo(
answer_id: int,
file: UploadFile = File(...),
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Verificar que la respuesta existe
answer = db.query(models.Answer).filter(models.Answer.id == answer_id).first()
if not answer:
raise HTTPException(status_code=404, detail="Respuesta no encontrada")
# Subir imagen a S3/MinIO
file_extension = file.filename.split(".")[-1]
now = datetime.now()
folder = f"{now.year}/{now.month:02d}"
file_name = f"answer_{answer_id}_{uuid.uuid4().hex}.{file_extension}"
s3_key = f"{folder}/{file_name}"
s3_client.upload_fileobj(file.file, S3_IMAGE_BUCKET, s3_key, ExtraArgs={"ContentType": file.content_type})
# Generar URL pública (ajusta si usas presigned)
image_url = f"{S3_ENDPOINT}/{S3_IMAGE_BUCKET}/{s3_key}"
# Crear registro en BD
media_file = models.MediaFile(
answer_id=answer_id,
file_path=image_url,
file_type="image"
)
db.add(media_file)
db.commit()
db.refresh(media_file)
return media_file
# ============= AI ANALYSIS =============
@app.get("/api/ai/models", response_model=List[schemas.AIModelInfo])
def get_available_ai_models(current_user: models.User = Depends(get_current_user)):
"""Obtener lista de modelos de IA disponibles"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="Solo administradores pueden ver modelos de IA")
models_list = [
# OpenAI Models
{
"id": "gpt-4o",
"name": "GPT-4o (Recomendado)",
"provider": "openai",
"description": "Modelo multimodal más avanzado de OpenAI, rápido y preciso para análisis de imágenes"
},
{
"id": "gpt-4o-mini",
"name": "GPT-4o Mini",
"provider": "openai",
"description": "Versión compacta y económica de GPT-4o, ideal para análisis rápidos"
},
{
"id": "gpt-4-turbo",
"name": "GPT-4 Turbo",
"provider": "openai",
"description": "Modelo potente con capacidades de visión y contexto amplio"
},
{
"id": "gpt-4-vision-preview",
"name": "GPT-4 Vision (Preview)",
"provider": "openai",
"description": "Modelo específico para análisis de imágenes (versión previa)"
},
# Gemini Models - Actualizados a versiones 2.0, 2.5 y 3.0
{
"id": "gemini-3-pro-preview",
"name": "Gemini 3 Pro Preview (Último)",
"provider": "gemini",
"description": "Modelo de próxima generación en preview, máxima capacidad de análisis"
},
{
"id": "gemini-2.5-pro",
"name": "Gemini 2.5 Pro (Recomendado)",
"provider": "gemini",
"description": "Último modelo estable con excelente análisis visual y razonamiento avanzado"
},
{
"id": "gemini-2.5-flash",
"name": "Gemini 2.5 Flash",
"provider": "gemini",
"description": "Versión rápida del 2.5, ideal para inspecciones en tiempo real"
},
{
"id": "gemini-2.0-flash",
"name": "Gemini 2.0 Flash",
"provider": "gemini",
"description": "Modelo rápido y eficiente de la generación 2.0"
},
{
"id": "gemini-1.5-pro-latest",
"name": "Gemini 1.5 Pro Latest",
"provider": "gemini",
"description": "Versión estable 1.5 con contexto de 2M tokens"
},
{
"id": "gemini-1.5-flash-latest",
"name": "Gemini 1.5 Flash Latest",
"provider": "gemini",
"description": "Modelo 1.5 rápido para análisis básicos"
}
]
return models_list
@app.get("/api/ai/configuration", response_model=schemas.AIConfiguration)
def get_ai_configuration(
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Obtener configuración de IA actual"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="Solo administradores pueden ver configuración de IA")
config = db.query(models.AIConfiguration).filter(
models.AIConfiguration.is_active == True
).first()
if not config:
raise HTTPException(status_code=404, detail="No hay configuración de IA activa")
return config
@app.post("/api/ai/configuration", response_model=schemas.AIConfiguration)
def create_ai_configuration(
config: schemas.AIConfigurationCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Crear o actualizar configuración de IA"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="Solo administradores pueden configurar IA")
# Desactivar configuraciones anteriores
db.query(models.AIConfiguration).update({"is_active": False})
# Determinar modelo por defecto según el proveedor si no se especifica
model_name = config.model_name
if not model_name:
if config.provider == "openai":
model_name = "gpt-4o"
elif config.provider == "gemini":
model_name = "gemini-2.5-pro"
else:
model_name = "default"
# Crear nueva configuración
new_config = models.AIConfiguration(
provider=config.provider,
api_key=config.api_key,
model_name=model_name,
is_active=True
)
db.add(new_config)
db.commit()
db.refresh(new_config)
return new_config
@app.put("/api/ai/configuration/{config_id}", response_model=schemas.AIConfiguration)
def update_ai_configuration(
config_id: int,
config_update: schemas.AIConfigurationUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Actualizar configuración de IA existente"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="Solo administradores pueden actualizar configuración de IA")
config = db.query(models.AIConfiguration).filter(
models.AIConfiguration.id == config_id
).first()
if not config:
raise HTTPException(status_code=404, detail="Configuración no encontrada")
# Actualizar campos
for key, value in config_update.dict(exclude_unset=True).items():
setattr(config, key, value)
db.commit()
db.refresh(config)
return config
@app.delete("/api/ai/configuration/{config_id}")
def delete_ai_configuration(
config_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Eliminar configuración de IA"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="Solo administradores pueden eliminar configuración de IA")
config = db.query(models.AIConfiguration).filter(
models.AIConfiguration.id == config_id
).first()
if not config:
raise HTTPException(status_code=404, detail="Configuración no encontrada")
db.delete(config)
db.commit()
return {"message": "Configuración eliminada correctamente"}
@app.post("/api/analyze-image")
async def analyze_image(
file: UploadFile = File(...),
question_id: int = None,
custom_prompt: str = None,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""
Analiza una imagen usando IA para sugerir respuestas
Usa la configuración de IA activa (OpenAI o Gemini)
"""
# Obtener configuración de IA activa
ai_config = db.query(models.AIConfiguration).filter(
models.AIConfiguration.is_active == True
).first()
if not ai_config:
return {
"status": "disabled",
"message": "No hay configuración de IA activa. Configure en Settings."
}
# Guardar imagen temporalmente
import base64
contents = await file.read()
image_b64 = base64.b64encode(contents).decode('utf-8')
# Obtener contexto de la pregunta si se proporciona
question_obj = None
if question_id:
question_obj = db.query(models.Question).filter(models.Question.id == question_id).first()
try:
# Construir prompt dinámico basado en la pregunta específica
if question_obj:
# Usar prompt personalizado si está disponible
if custom_prompt:
# Prompt 100% personalizado por el administrador
system_prompt = f"""Eres un mecánico experto realizando una inspección vehicular.
INSTRUCCIONES ESPECÍFICAS PARA ESTA PREGUNTA:
{custom_prompt}
PREGUNTA A RESPONDER: "{question_obj.text}"
Sección: {question_obj.section}
Analiza la imagen siguiendo EXACTAMENTE las instrucciones proporcionadas arriba.
VALIDACIÓN DE IMAGEN:
- Si la imagen NO corresponde al contexto de la pregunta (por ejemplo, si piden luces pero muestran motor), indica en "recommendation" que deben cambiar la foto
- Si la imagen es borrosa, oscura o no permite análisis, indica en "recommendation" que tomen otra foto más clara
Responde en formato JSON:
{{
"status": "ok|minor|critical",
"observations": "Análisis específico según el prompt personalizado",
"recommendation": "Si la imagen no es apropiada, indica 'Por favor tome una foto de [componente correcto]'. Si es apropiada, da recomendación técnica.",
"confidence": 0.0-1.0
}}"""
else:
# Prompt altamente específico para la pregunta
question_text = question_obj.text
question_type = question_obj.type
section = question_obj.section
system_prompt = f"""Eres un mecánico experto realizando una inspección vehicular.
PREGUNTA ESPECÍFICA A RESPONDER: "{question_text}"
Sección: {section}
Analiza la imagen ÚNICAMENTE para responder esta pregunta específica.
Sé directo y enfócate solo en lo que la pregunta solicita.
VALIDACIÓN DE IMAGEN:
- Si la imagen NO corresponde al contexto de la pregunta, indica en "recommendation" que deben cambiar la foto
- Si la imagen es borrosa o no permite análisis, indica en "recommendation" que tomen otra foto más clara
Responde en formato JSON:
{{
"status": "ok|minor|critical",
"observations": "Respuesta específica a: {question_text}",
"recommendation": "Si la imagen no es apropiada, indica 'Por favor tome una foto de [componente correcto]'. Si es apropiada, da acción técnica si aplica.",
"confidence": 0.0-1.0
}}
IMPORTANTE:
- Responde SOLO lo que la pregunta pide
- No des información genérica del vehículo
- Sé específico y técnico
- Si la pregunta es pass/fail, indica claramente si pasa o falla
- Si la pregunta es bueno/regular/malo, indica el estado específico del componente"""
user_message = f"Inspecciona la imagen y responde específicamente: {question_obj.text}"
else:
# Fallback para análisis general
system_prompt = """Eres un experto mecánico automotriz. Analiza la imagen y proporciona:
1. Estado del componente (bueno/regular/malo)
2. Nivel de criticidad (ok/minor/critical)
3. Observaciones técnicas breves
4. Recomendación de acción
Responde en formato JSON:
{
"status": "ok|minor|critical",
"observations": "descripción técnica",
"recommendation": "acción sugerida",
"confidence": 0.0-1.0
}"""
user_message = "Analiza este componente del vehículo para la inspección general."
if ai_config.provider == "openai":
import openai
openai.api_key = ai_config.api_key
response = openai.ChatCompletion.create(
model=ai_config.model_name,
messages=[
{"role": "system", "content": system_prompt},
{
"role": "user",
"content": [
{
"type": "text",
"text": user_message
},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}
}
]
}
],
max_tokens=500
)
ai_response = response.choices[0].message.content
elif ai_config.provider == "gemini":
import google.generativeai as genai
from PIL import Image
from io import BytesIO
genai.configure(api_key=ai_config.api_key)
model = genai.GenerativeModel(ai_config.model_name)
# Convertir base64 a imagen PIL
image = Image.open(BytesIO(contents))
prompt = f"{system_prompt}\n\n{user_message}"
response = model.generate_content([prompt, image])
ai_response = response.text
else:
return {
"success": False,
"error": f"Provider {ai_config.provider} no soportado"
}
# Intentar parsear como JSON, si falla, usar texto plano
try:
import json
import re
# Limpiar markdown code blocks si existen
cleaned_response = ai_response.strip()
# Remover ```json ... ``` si existe
if cleaned_response.startswith('```'):
# Extraer contenido entre ``` markers
match = re.search(r'```(?:json)?\s*\n?(.*?)\n?```', cleaned_response, re.DOTALL)
if match:
cleaned_response = match.group(1).strip()
analysis = json.loads(cleaned_response)
except:
# Si no es JSON válido, crear estructura básica
analysis = {
"status": "ok",
"observations": ai_response,
"recommendation": "Revisar manualmente",
"confidence": 0.7
}
return {
"success": True,
"analysis": analysis,
"raw_response": ai_response,
"model": ai_config.model_name,
"provider": ai_config.provider
}
except Exception as e:
print(f"Error en análisis AI: {e}")
import traceback
traceback.print_exc()
return {
"success": False,
"error": str(e),
"message": "Error analyzing image with AI. Please check AI configuration in Settings."
}
try:
import openai
openai.api_key = settings.OPENAI_API_KEY
# Prompt especializado para inspección vehicular
system_prompt = """Eres un experto mecánico automotriz. Analiza la imagen y proporciona:
1. Estado del componente (bueno/regular/malo)
2. Nivel de criticidad (ok/minor/critical)
3. Observaciones técnicas breves
4. Recomendación de acción
Responde en formato JSON:
{
"status": "ok|minor|critical",
"observations": "descripción técnica",
"recommendation": "acción sugerida",
"confidence": 0.0-1.0
}"""
response = openai.ChatCompletion.create(
model="gpt-4-vision-preview" if "gpt-4" in str(settings.OPENAI_API_KEY) else "gpt-4o",
messages=[
{
"role": "system",
"content": system_prompt
},
{
"role": "user",
"content": [
{
"type": "text",
"text": f"Analiza este componente del vehículo.\n{question_context}"
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_b64}"
}
}
]
}
],
max_tokens=500
)
ai_response = response.choices[0].message.content
# Intentar parsear como JSON, si falla, usar texto plano
try:
import json
analysis = json.loads(ai_response)
except:
# Si no es JSON válido, crear estructura básica
analysis = {
"status": "ok",
"observations": ai_response,
"recommendation": "Revisar manualmente",
"confidence": 0.7
}
return {
"success": True,
"analysis": analysis,
"raw_response": ai_response,
"model": "gpt-4-vision"
}
except Exception as e:
print(f"Error en análisis AI: {e}")
return {
"success": False,
"error": str(e),
"message": "Error analyzing image with AI"
}
# ============= REPORTS =============
@app.get("/api/reports/dashboard", response_model=schemas.DashboardData)
def get_dashboard_data(
start_date: Optional[str] = None,
end_date: Optional[str] = None,
mechanic_id: Optional[int] = None,
current_user: models.User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Obtener datos del dashboard de informes"""
if current_user.role not in ["admin", "asesor"]:
raise HTTPException(status_code=403, detail="No tienes permisos para acceder a reportes")
# Construir query base
query = db.query(models.Inspection)
# Aplicar filtros de fecha
if start_date:
# Parsear fecha y establecer al inicio del día en UTC-3
from datetime import timezone
local_tz = timezone(timedelta(hours=-3))
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
if start.tzinfo is None:
start = start.replace(tzinfo=local_tz)
query = query.filter(models.Inspection.started_at >= start)
if end_date:
# Parsear fecha y establecer al final del día en UTC-3
from datetime import timezone
local_tz = timezone(timedelta(hours=-3))
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
if end.tzinfo is None:
end = end.replace(tzinfo=local_tz)
query = query.filter(models.Inspection.started_at <= end)
# Filtro por mecánico
if mechanic_id:
query = query.filter(models.Inspection.mechanic_id == mechanic_id)
# Solo inspecciones activas
query = query.filter(models.Inspection.is_active == True)
# ESTADÍSTICAS GENERALES
total = query.count()
completed = query.filter(models.Inspection.status == "completed").count()
pending = total - completed
# Score promedio
avg_score_result = query.filter(
models.Inspection.score.isnot(None),
models.Inspection.max_score.isnot(None),
models.Inspection.max_score > 0
).with_entities(
func.avg(models.Inspection.score * 100.0 / models.Inspection.max_score)
).scalar()
avg_score = round(avg_score_result, 2) if avg_score_result else 0.0
# Items señalados
flagged_items = db.query(func.count(models.Answer.id))\
.filter(models.Answer.is_flagged == True)\
.join(models.Inspection)\
.filter(models.Inspection.is_active == True)
if start_date:
from datetime import timezone
local_tz = timezone(timedelta(hours=-3))
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
if start.tzinfo is None:
start = start.replace(tzinfo=local_tz)
flagged_items = flagged_items.filter(models.Inspection.started_at >= start)
if end_date:
from datetime import timezone
local_tz = timezone(timedelta(hours=-3))
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
if end.tzinfo is None:
end = end.replace(tzinfo=local_tz)
flagged_items = flagged_items.filter(models.Inspection.started_at <= end)
if mechanic_id:
flagged_items = flagged_items.filter(models.Inspection.mechanic_id == mechanic_id)
total_flagged = flagged_items.scalar() or 0
stats = schemas.InspectionStats(
total_inspections=total,
completed_inspections=completed,
pending_inspections=pending,
completion_rate=round((completed / total * 100) if total > 0 else 0, 2),
avg_score=avg_score,
total_flagged_items=total_flagged
)
# RANKING DE MECÁNICOS
mechanic_stats = db.query(
models.User.id,
models.User.full_name,
func.count(models.Inspection.id).label('total'),
func.avg(
case(
(models.Inspection.max_score > 0, models.Inspection.score * 100.0 / models.Inspection.max_score),
else_=None
)
).label('avg_score'),
func.count(case((models.Inspection.status == 'completed', 1))).label('completed')
).join(models.Inspection, models.Inspection.mechanic_id == models.User.id)\
.filter(models.User.role.in_(['mechanic', 'mecanico']))\
.filter(models.User.is_active == True)\
.filter(models.Inspection.is_active == True)
if start_date:
from datetime import timezone
local_tz = timezone(timedelta(hours=-3))
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
if start.tzinfo is None:
start = start.replace(tzinfo=local_tz)
mechanic_stats = mechanic_stats.filter(models.Inspection.started_at >= start)
if end_date:
from datetime import timezone
local_tz = timezone(timedelta(hours=-3))
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
if end.tzinfo is None:
end = end.replace(tzinfo=local_tz)
mechanic_stats = mechanic_stats.filter(models.Inspection.started_at <= end)
mechanic_stats = mechanic_stats.group_by(models.User.id, models.User.full_name)\
.order_by(func.count(models.Inspection.id).desc())\
.all()
mechanic_ranking = [
schemas.MechanicRanking(
mechanic_id=m.id,
mechanic_name=m.full_name,
total_inspections=m.total,
avg_score=round(m.avg_score, 2) if m.avg_score else 0.0,
completion_rate=round((m.completed / m.total * 100) if m.total > 0 else 0, 2)
)
for m in mechanic_stats
]
# ESTADÍSTICAS POR CHECKLIST
checklist_stats_query = db.query(
models.Checklist.id,
models.Checklist.name,
func.count(models.Inspection.id).label('total'),
func.avg(
case(
(models.Inspection.max_score > 0, models.Inspection.score * 100.0 / models.Inspection.max_score),
else_=None
)
).label('avg_score')
).join(models.Inspection)\
.filter(models.Inspection.is_active == True)\
.filter(models.Checklist.is_active == True)
if start_date:
from datetime import timezone
local_tz = timezone(timedelta(hours=-3))
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
if start.tzinfo is None:
start = start.replace(tzinfo=local_tz)
checklist_stats_query = checklist_stats_query.filter(models.Inspection.started_at >= start)
if end_date:
from datetime import timezone
local_tz = timezone(timedelta(hours=-3))
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
if end.tzinfo is None:
end = end.replace(tzinfo=local_tz)
checklist_stats_query = checklist_stats_query.filter(models.Inspection.started_at <= end)
if mechanic_id:
checklist_stats_query = checklist_stats_query.filter(models.Inspection.mechanic_id == mechanic_id)
checklist_stats_query = checklist_stats_query.group_by(models.Checklist.id, models.Checklist.name)
checklist_stats_data = checklist_stats_query.all()
checklist_stats = [
schemas.ChecklistStats(
checklist_id=c.id,
checklist_name=c.name,
total_inspections=c.total,
avg_score=round(c.avg_score, 2) if c.avg_score else 0.0
)
for c in checklist_stats_data
]
# INSPECCIONES POR FECHA (últimos 30 días)
end_date_obj = datetime.fromisoformat(end_date) if end_date else datetime.now()
start_date_obj = datetime.fromisoformat(start_date) if start_date else end_date_obj - timedelta(days=30)
inspections_by_date_query = db.query(
func.date(models.Inspection.started_at).label('date'),
func.count(models.Inspection.id).label('count')
).filter(
models.Inspection.started_at.between(start_date_obj, end_date_obj),
models.Inspection.is_active == True
)
if mechanic_id:
inspections_by_date_query = inspections_by_date_query.filter(
models.Inspection.mechanic_id == mechanic_id
)
inspections_by_date_data = inspections_by_date_query.group_by(
func.date(models.Inspection.started_at)
).all()
inspections_by_date = {
str(d.date): d.count for d in inspections_by_date_data
}
# RATIO PASS/FAIL
pass_fail_data = db.query(
models.Answer.answer_value,
func.count(models.Answer.id).label('count')
).join(models.Inspection)\
.filter(models.Inspection.is_active == True)\
.filter(models.Answer.answer_value.in_(['pass', 'fail', 'good', 'bad', 'regular']))
if start_date:
from datetime import timezone
local_tz = timezone(timedelta(hours=-3))
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
if start.tzinfo is None:
start = start.replace(tzinfo=local_tz)
pass_fail_data = pass_fail_data.filter(models.Inspection.started_at >= start)
if end_date:
from datetime import timezone
local_tz = timezone(timedelta(hours=-3))
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
if end.tzinfo is None:
end = end.replace(tzinfo=local_tz)
pass_fail_data = pass_fail_data.filter(models.Inspection.started_at <= end)
if mechanic_id:
pass_fail_data = pass_fail_data.filter(models.Inspection.mechanic_id == mechanic_id)
pass_fail_data = pass_fail_data.group_by(models.Answer.answer_value).all()
pass_fail_ratio = {d.answer_value: d.count for d in pass_fail_data}
return schemas.DashboardData(
stats=stats,
mechanic_ranking=mechanic_ranking,
checklist_stats=checklist_stats,
inspections_by_date=inspections_by_date,
pass_fail_ratio=pass_fail_ratio
)
@app.get("/api/reports/inspections")
def get_inspections_report(
start_date: Optional[str] = None,
end_date: Optional[str] = None,
mechanic_id: Optional[int] = None,
checklist_id: Optional[int] = None,
status: Optional[str] = None,
limit: int = 100,
current_user: models.User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Obtener lista de inspecciones con filtros"""
if current_user.role not in ["admin", "asesor"]:
raise HTTPException(status_code=403, detail="No tienes permisos para acceder a reportes")
# Query base con select_from explícito
query = db.query(
models.Inspection.id,
models.Inspection.vehicle_plate,
models.Inspection.checklist_id,
models.Checklist.name.label('checklist_name'),
models.User.full_name.label('mechanic_name'),
models.Inspection.status,
models.Inspection.score,
models.Inspection.max_score,
models.Inspection.started_at,
models.Inspection.completed_at,
func.coalesce(
func.count(case((models.Answer.is_flagged == True, 1))),
0
).label('flagged_items')
).select_from(models.Inspection)\
.join(models.Checklist, models.Inspection.checklist_id == models.Checklist.id)\
.join(models.User, models.Inspection.mechanic_id == models.User.id)\
.outerjoin(models.Answer, models.Answer.inspection_id == models.Inspection.id)\
.filter(models.Inspection.is_active == True)
# Aplicar filtros
if start_date:
from datetime import timezone
local_tz = timezone(timedelta(hours=-3))
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
if start.tzinfo is None:
start = start.replace(tzinfo=local_tz)
query = query.filter(models.Inspection.started_at >= start)
if end_date:
from datetime import timezone
local_tz = timezone(timedelta(hours=-3))
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
if end.tzinfo is None:
end = end.replace(tzinfo=local_tz)
query = query.filter(models.Inspection.started_at <= end)
if mechanic_id:
query = query.filter(models.Inspection.mechanic_id == mechanic_id)
if checklist_id:
query = query.filter(models.Inspection.checklist_id == checklist_id)
if status:
query = query.filter(models.Inspection.status == status)
# Group by y order
query = query.group_by(
models.Inspection.id,
models.Checklist.name,
models.User.full_name
).order_by(models.Inspection.started_at.desc())\
.limit(limit)
results = query.all()
return [
{
"id": r.id,
"vehicle_plate": r.vehicle_plate,
"checklist_id": r.checklist_id,
"checklist_name": r.checklist_name,
"mechanic_name": r.mechanic_name,
"status": r.status,
"score": r.score,
"max_score": r.max_score,
"flagged_items": r.flagged_items,
"started_at": r.started_at.isoformat() if r.started_at else None,
"completed_at": r.completed_at.isoformat() if r.completed_at else None
}
for r in results
]
@app.get("/api/inspections/{inspection_id}/pdf")
def export_inspection_to_pdf(
inspection_id: int,
current_user: models.User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Descargar el PDF guardado en MinIO para la inspección"""
from fastapi.responses import StreamingResponse
import requests
# Obtener inspección
inspection = db.query(models.Inspection).filter(
models.Inspection.id == inspection_id
).first()
if not inspection:
raise HTTPException(status_code=404, detail="Inspección no encontrada")
if current_user.role not in ["admin", "asesor"] and inspection.mechanic_id != current_user.id:
raise HTTPException(status_code=403, detail="No tienes permisos para ver esta inspección")
# Si existe pdf_url, descargar desde MinIO y devolverlo
if inspection.pdf_url:
try:
pdf_resp = requests.get(inspection.pdf_url, stream=True)
if pdf_resp.status_code == 200:
filename = inspection.pdf_url.split("/")[-1]
return StreamingResponse(pdf_resp.raw, media_type="application/pdf", headers={
"Content-Disposition": f"attachment; filename={filename}"
})
else:
raise HTTPException(status_code=404, detail="No se pudo descargar el PDF desde MinIO")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error al descargar PDF: {e}")
else:
raise HTTPException(status_code=404, detail="La inspección no tiene PDF generado")
# ============= HEALTH CHECK =============
@app.get("/")
def root():
return {"message": "Checklist Inteligente API", "version": "1.0.0", "status": "running"}
@app.get("/health")
def health_check():
return {"status": "healthy"}