🔧 Patrones que ahora elimina: ✅ Análisis Automático (90% confianza): ✅ Análisis IA (95% confianza): ✅ 🤖 Análisis Automático (98% confianza): ✅ 🤖 Análisis IA (100% confianza): Backend actualizado a v1.0.81
3143 lines
116 KiB
Python
3143 lines
116 KiB
Python
|
||
# ============= LOGO CONFIGURABLE =============
|
||
|
||
from fastapi import FastAPI, File, UploadFile, Form, Depends, HTTPException, status
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||
from sqlalchemy.orm import Session, joinedload
|
||
from sqlalchemy import func, case
|
||
from typing import List, Optional
|
||
from io import BytesIO
|
||
import os
|
||
import boto3
|
||
from botocore.client import Config
|
||
import uuid
|
||
from app.core import config as app_config
|
||
from app.core.database import engine, get_db, Base
|
||
from app.core.security import verify_password, get_password_hash, create_access_token, decode_access_token
|
||
from app import models, schemas
|
||
import shutil
|
||
from datetime import datetime, timedelta
|
||
import sys
|
||
import requests
|
||
|
||
# Función para enviar notificaciones al webhook
|
||
def send_answer_notification(answer, question, mechanic, db):
|
||
"""Envía notificación al webhook cuando se responde una pregunta marcada"""
|
||
try:
|
||
if not app_config.settings.NOTIFICACION_ENDPOINT:
|
||
print("No hay endpoint de notificación configurado")
|
||
return
|
||
|
||
# Obtener datos de la inspección
|
||
inspection = db.query(models.Inspection).filter(
|
||
models.Inspection.id == answer.inspection_id
|
||
).first()
|
||
|
||
if not inspection:
|
||
return
|
||
|
||
# Preparar datos para enviar
|
||
notification_data = {
|
||
"tipo": "respuesta_pregunta",
|
||
"pregunta": {
|
||
"id": question.id,
|
||
"texto": question.text,
|
||
"seccion": question.section
|
||
},
|
||
"respuesta": {
|
||
"id": answer.id,
|
||
"valor": answer.answer_value,
|
||
"estado": answer.status,
|
||
"comentario": answer.comment,
|
||
"puntos": answer.points_earned
|
||
},
|
||
"inspeccion": {
|
||
"id": inspection.id,
|
||
"vehiculo_placa": inspection.vehicle_plate,
|
||
"vehiculo_marca": inspection.vehicle_brand,
|
||
"vehiculo_modelo": inspection.vehicle_model,
|
||
"pedido": inspection.order_number,
|
||
"or_number": inspection.or_number
|
||
},
|
||
"mecanico": {
|
||
"id": mechanic.id,
|
||
"nombre": mechanic.full_name,
|
||
"email": mechanic.email
|
||
},
|
||
"timestamp": datetime.utcnow().isoformat()
|
||
}
|
||
|
||
# Enviar al webhook
|
||
response = requests.post(
|
||
app_config.settings.NOTIFICACION_ENDPOINT,
|
||
json=notification_data,
|
||
timeout=5
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
print(f"✅ Notificación enviada para pregunta {question.id}")
|
||
else:
|
||
print(f"⚠️ Error al enviar notificación: {response.status_code}")
|
||
|
||
except Exception as e:
|
||
print(f"❌ Error enviando notificación: {e}")
|
||
# No lanzamos excepción para no interrumpir el flujo normal
|
||
|
||
|
||
def send_completed_inspection_to_n8n(inspection, db):
|
||
"""Envía la inspección completa con todas las respuestas e imágenes a n8n"""
|
||
try:
|
||
if not app_config.settings.NOTIFICACION_ENDPOINT:
|
||
print("No hay endpoint de notificación configurado")
|
||
return
|
||
|
||
print(f"\n🚀 Enviando inspección #{inspection.id} a n8n...")
|
||
|
||
# Obtener datos del mecánico
|
||
mechanic = db.query(models.User).filter(models.User.id == inspection.mechanic_id).first()
|
||
|
||
# Obtener checklist
|
||
checklist = db.query(models.Checklist).filter(models.Checklist.id == inspection.checklist_id).first()
|
||
|
||
# Obtener todas las respuestas con sus imágenes
|
||
answers = db.query(models.Answer).options(
|
||
joinedload(models.Answer.media_files),
|
||
joinedload(models.Answer.question)
|
||
).filter(models.Answer.inspection_id == inspection.id).all()
|
||
|
||
# Preparar respuestas con imágenes
|
||
respuestas_data = []
|
||
for answer in answers:
|
||
# Obtener URLs de imágenes
|
||
imagenes = []
|
||
for media in answer.media_files:
|
||
if media.file_type == "image":
|
||
# Extraer filename del file_path (última parte de la URL)
|
||
filename = media.file_path.split('/')[-1] if media.file_path else "imagen.jpg"
|
||
|
||
imagenes.append({
|
||
"id": media.id,
|
||
"url": media.file_path,
|
||
"filename": filename
|
||
})
|
||
|
||
respuestas_data.append({
|
||
"id": answer.id,
|
||
"pregunta": {
|
||
"id": answer.question.id,
|
||
"texto": answer.question.text,
|
||
"seccion": answer.question.section,
|
||
"orden": answer.question.order
|
||
},
|
||
"respuesta": answer.answer_value,
|
||
"estado": answer.status,
|
||
"comentario": answer.comment,
|
||
"puntos_obtenidos": answer.points_earned,
|
||
"es_critico": answer.is_flagged,
|
||
"imagenes": imagenes,
|
||
"ai_analysis": answer.ai_analysis
|
||
})
|
||
|
||
# Preparar datos completos de la inspección
|
||
inspeccion_data = {
|
||
"tipo": "inspeccion_completada",
|
||
"inspeccion": {
|
||
"id": inspection.id,
|
||
"estado": inspection.status,
|
||
"or_number": inspection.or_number,
|
||
"work_order_number": inspection.work_order_number,
|
||
"vehiculo": {
|
||
"placa": inspection.vehicle_plate,
|
||
"marca": inspection.vehicle_brand,
|
||
"modelo": inspection.vehicle_model,
|
||
"kilometraje": inspection.vehicle_km
|
||
},
|
||
"pedido": inspection.order_number,
|
||
"mecanico": {
|
||
"id": mechanic.id if mechanic else None,
|
||
"nombre": mechanic.full_name if mechanic else None,
|
||
"email": mechanic.email if mechanic else None,
|
||
"codigo_operario": inspection.mechanic_employee_code
|
||
},
|
||
"checklist": {
|
||
"id": checklist.id if checklist else None,
|
||
"nombre": checklist.name if checklist else None
|
||
},
|
||
"puntuacion": {
|
||
"obtenida": inspection.score,
|
||
"maxima": inspection.max_score,
|
||
"porcentaje": round(inspection.percentage, 2),
|
||
"items_criticos": inspection.flagged_items_count
|
||
},
|
||
"fechas": {
|
||
"inicio": inspection.started_at.isoformat() if inspection.started_at else None,
|
||
"completado": inspection.completed_at.isoformat() if inspection.completed_at else None
|
||
},
|
||
"pdf_url": inspection.pdf_url,
|
||
"firma": inspection.signature_data
|
||
},
|
||
"respuestas": respuestas_data,
|
||
"timestamp": datetime.utcnow().isoformat()
|
||
}
|
||
|
||
# Enviar al webhook de n8n
|
||
print(f"📤 Enviando {len(respuestas_data)} respuestas con imágenes a n8n...")
|
||
response = requests.post(
|
||
app_config.settings.NOTIFICACION_ENDPOINT,
|
||
json=inspeccion_data,
|
||
timeout=30 # Timeout más largo para inspecciones completas
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
print(f"✅ Inspección #{inspection.id} enviada exitosamente a n8n")
|
||
print(f" - {len(respuestas_data)} respuestas")
|
||
print(f" - {sum(len(r['imagenes']) for r in respuestas_data)} imágenes")
|
||
else:
|
||
print(f"⚠️ Error al enviar inspección a n8n: {response.status_code}")
|
||
print(f" Response: {response.text[:200]}")
|
||
|
||
except Exception as e:
|
||
print(f"❌ Error enviando inspección a n8n: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
# No lanzamos excepción para no interrumpir el flujo normal
|
||
|
||
|
||
BACKEND_VERSION = "1.0.81"
|
||
app = FastAPI(title="Checklist Inteligente API", version=BACKEND_VERSION)
|
||
|
||
# S3/MinIO configuration
|
||
S3_ENDPOINT = app_config.MINIO_ENDPOINT
|
||
S3_ACCESS_KEY = app_config.MINIO_ACCESS_KEY
|
||
S3_SECRET_KEY = app_config.MINIO_SECRET_KEY
|
||
S3_IMAGE_BUCKET = app_config.MINIO_IMAGE_BUCKET
|
||
S3_PDF_BUCKET = app_config.MINIO_PDF_BUCKET
|
||
|
||
s3_client = boto3.client(
|
||
's3',
|
||
endpoint_url=S3_ENDPOINT,
|
||
aws_access_key_id=S3_ACCESS_KEY,
|
||
aws_secret_access_key=S3_SECRET_KEY,
|
||
config=Config(signature_version='s3v4'),
|
||
region_name='us-east-1'
|
||
)
|
||
|
||
# Crear tablas
|
||
Base.metadata.create_all(bind=engine)
|
||
|
||
# Información visual al iniciar el backend
|
||
print("\n================ BACKEND STARTUP INFO ================")
|
||
print(f"Backend version: {BACKEND_VERSION}")
|
||
print(f"Database URL: {app_config.settings.DATABASE_URL}")
|
||
print(f"Environment: {app_config.settings.ENVIRONMENT}")
|
||
print(f"MinIO endpoint: {app_config.MINIO_ENDPOINT}")
|
||
print("====================================================\n", flush=True)
|
||
|
||
# CORS
|
||
app.add_middleware(
|
||
CORSMiddleware,
|
||
allow_origins=["http://localhost:5173", "http://localhost:3000"],
|
||
allow_credentials=True,
|
||
allow_methods=["*"],
|
||
allow_headers=["*"],
|
||
)
|
||
|
||
security = HTTPBearer()
|
||
|
||
# Dependency para obtener usuario actual
|
||
def get_current_user(
|
||
credentials: HTTPAuthorizationCredentials = Depends(security),
|
||
db: Session = Depends(get_db)
|
||
):
|
||
token = credentials.credentials
|
||
|
||
# Verificar si es un API token (comienza con "syntria_")
|
||
if token.startswith("syntria_"):
|
||
api_token = db.query(models.APIToken).filter(
|
||
models.APIToken.token == token,
|
||
models.APIToken.is_active == True
|
||
).first()
|
||
|
||
if not api_token:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="API Token inválido o inactivo"
|
||
)
|
||
|
||
# Actualizar último uso
|
||
api_token.last_used_at = datetime.utcnow()
|
||
db.commit()
|
||
|
||
# Obtener usuario
|
||
user = db.query(models.User).filter(models.User.id == api_token.user_id).first()
|
||
if not user or not user.is_active:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="Usuario inválido o inactivo"
|
||
)
|
||
|
||
return user
|
||
|
||
# Si no es API token, es JWT token
|
||
payload = decode_access_token(token)
|
||
print(f"Token payload: {payload}") # Debug
|
||
if payload is None:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="Token inválido o expirado"
|
||
)
|
||
|
||
user_id = int(payload.get("sub"))
|
||
print(f"Looking for user ID: {user_id}") # Debug
|
||
user = db.query(models.User).filter(models.User.id == user_id).first()
|
||
if user is None:
|
||
print(f"User not found with ID: {user_id}") # Debug
|
||
raise HTTPException(status_code=404, detail="Usuario no encontrado")
|
||
|
||
return user
|
||
|
||
|
||
@app.post("/api/config/logo", response_model=dict)
|
||
async def upload_logo(
|
||
file: UploadFile = File(...),
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Sube un logo y lo guarda en MinIO, actualiza la configuración."""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="Solo administradores pueden cambiar el logo")
|
||
|
||
# Subir imagen a MinIO
|
||
file_extension = file.filename.split(".")[-1]
|
||
now = datetime.now()
|
||
folder = f"logo"
|
||
file_name = f"logo_{now.strftime('%Y%m%d_%H%M%S')}.{file_extension}"
|
||
s3_key = f"{folder}/{file_name}"
|
||
s3_client.upload_fileobj(file.file, S3_IMAGE_BUCKET, s3_key, ExtraArgs={"ContentType": file.content_type})
|
||
logo_url = f"{S3_ENDPOINT}/{S3_IMAGE_BUCKET}/{s3_key}"
|
||
|
||
# Guardar en configuración (puedes tener una tabla Config o usar AIConfiguration)
|
||
config = db.query(models.AIConfiguration).filter(models.AIConfiguration.is_active == True).first()
|
||
if config:
|
||
config.logo_url = logo_url
|
||
db.commit()
|
||
db.refresh(config)
|
||
# Si no hay config, solo retorna la url
|
||
return {"logo_url": logo_url}
|
||
|
||
@app.get("/api/config/logo", response_model=dict)
|
||
def get_logo_url(
|
||
db: Session = Depends(get_db)
|
||
):
|
||
config = db.query(models.AIConfiguration).filter(models.AIConfiguration.is_active == True).first()
|
||
if config and getattr(config, "logo_url", None):
|
||
return {"logo_url": config.logo_url}
|
||
# Default logo (puedes poner una url por defecto)
|
||
return {"logo_url": f"{S3_ENDPOINT}/{S3_IMAGE_BUCKET}/logo/default_logo.png"}
|
||
|
||
|
||
# ============= AUTH ENDPOINTS =============
|
||
@app.post("/api/auth/register", response_model=schemas.User)
|
||
def register(user: schemas.UserCreate, db: Session = Depends(get_db)):
|
||
# Verificar si usuario existe
|
||
db_user = db.query(models.User).filter(models.User.username == user.username).first()
|
||
if db_user:
|
||
raise HTTPException(status_code=400, detail="Usuario ya existe")
|
||
|
||
# Crear usuario
|
||
hashed_password = get_password_hash(user.password)
|
||
db_user = models.User(
|
||
username=user.username,
|
||
email=user.email,
|
||
full_name=user.full_name,
|
||
role=user.role,
|
||
password_hash=hashed_password
|
||
)
|
||
db.add(db_user)
|
||
db.commit()
|
||
db.refresh(db_user)
|
||
return db_user
|
||
|
||
|
||
@app.post("/api/auth/login", response_model=schemas.Token)
|
||
def login(user_login: schemas.UserLogin, db: Session = Depends(get_db)):
|
||
user = db.query(models.User).filter(models.User.username == user_login.username).first()
|
||
if not user or not verify_password(user_login.password, user.password_hash):
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="Usuario o contraseña incorrectos"
|
||
)
|
||
|
||
access_token = create_access_token(data={"sub": str(user.id), "role": user.role})
|
||
return {
|
||
"access_token": access_token,
|
||
"token_type": "bearer",
|
||
"user": user
|
||
}
|
||
|
||
|
||
@app.get("/api/auth/me", response_model=schemas.User)
|
||
def get_me(current_user: models.User = Depends(get_current_user)):
|
||
return current_user
|
||
|
||
|
||
# ============= USER ENDPOINTS =============
|
||
@app.get("/api/users", response_model=List[schemas.User])
|
||
def get_users(
|
||
skip: int = 0,
|
||
limit: int = 100,
|
||
active_only: bool = False,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
# Solo admin puede ver todos los usuarios
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No tienes permisos para ver usuarios")
|
||
|
||
query = db.query(models.User)
|
||
if active_only:
|
||
query = query.filter(models.User.is_active == True)
|
||
|
||
return query.offset(skip).limit(limit).all()
|
||
|
||
|
||
@app.get("/api/users/{user_id}", response_model=schemas.User)
|
||
def get_user(
|
||
user_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
# Solo admin puede ver otros usuarios
|
||
if current_user.role != "admin" and current_user.id != user_id:
|
||
raise HTTPException(status_code=403, detail="No tienes permisos para ver este usuario")
|
||
|
||
user = db.query(models.User).filter(models.User.id == user_id).first()
|
||
if not user:
|
||
raise HTTPException(status_code=404, detail="Usuario no encontrado")
|
||
|
||
return user
|
||
|
||
|
||
@app.post("/api/users", response_model=schemas.User)
|
||
def create_user(
|
||
user: schemas.UserCreate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
# Solo admin puede crear usuarios
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No tienes permisos para crear usuarios")
|
||
|
||
# Verificar si usuario existe
|
||
db_user = db.query(models.User).filter(models.User.username == user.username).first()
|
||
if db_user:
|
||
raise HTTPException(status_code=400, detail="Usuario ya existe")
|
||
|
||
# Verificar si email existe
|
||
if user.email:
|
||
db_email = db.query(models.User).filter(models.User.email == user.email).first()
|
||
if db_email:
|
||
raise HTTPException(status_code=400, detail="Email ya está en uso")
|
||
|
||
# Crear usuario
|
||
hashed_password = get_password_hash(user.password)
|
||
db_user = models.User(
|
||
username=user.username,
|
||
email=user.email,
|
||
full_name=user.full_name,
|
||
employee_code=user.employee_code,
|
||
role=user.role,
|
||
password_hash=hashed_password,
|
||
is_active=True
|
||
)
|
||
db.add(db_user)
|
||
db.commit()
|
||
db.refresh(db_user)
|
||
return db_user
|
||
|
||
|
||
@app.put("/api/users/{user_id}", response_model=schemas.User)
|
||
def update_user(
|
||
user_id: int,
|
||
user_update: schemas.UserUpdate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
# Solo admin puede actualizar otros usuarios
|
||
if current_user.role != "admin" and current_user.id != user_id:
|
||
raise HTTPException(status_code=403, detail="No tienes permisos para actualizar este usuario")
|
||
|
||
db_user = db.query(models.User).filter(models.User.id == user_id).first()
|
||
if not db_user:
|
||
raise HTTPException(status_code=404, detail="Usuario no encontrado")
|
||
|
||
# Actualizar campos
|
||
if user_update.username is not None:
|
||
# Verificar si username está en uso
|
||
existing = db.query(models.User).filter(
|
||
models.User.username == user_update.username,
|
||
models.User.id != user_id
|
||
).first()
|
||
if existing:
|
||
raise HTTPException(status_code=400, detail="Nombre de usuario ya está en uso")
|
||
db_user.username = user_update.username
|
||
|
||
if user_update.email is not None:
|
||
# Verificar si email está en uso
|
||
existing = db.query(models.User).filter(
|
||
models.User.email == user_update.email,
|
||
models.User.id != user_id
|
||
).first()
|
||
if existing:
|
||
raise HTTPException(status_code=400, detail="Email ya está en uso")
|
||
db_user.email = user_update.email
|
||
|
||
if user_update.full_name is not None:
|
||
db_user.full_name = user_update.full_name
|
||
|
||
if user_update.employee_code is not None:
|
||
db_user.employee_code = user_update.employee_code
|
||
|
||
# Solo admin puede cambiar roles
|
||
if user_update.role is not None:
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No tienes permisos para cambiar roles")
|
||
db_user.role = user_update.role
|
||
|
||
db.commit()
|
||
db.refresh(db_user)
|
||
return db_user
|
||
|
||
|
||
@app.patch("/api/users/{user_id}/deactivate")
|
||
def deactivate_user(
|
||
user_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
# Solo admin puede inactivar usuarios
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No tienes permisos para inactivar usuarios")
|
||
|
||
# No permitir auto-inactivación
|
||
if current_user.id == user_id:
|
||
raise HTTPException(status_code=400, detail="No puedes inactivar tu propio usuario")
|
||
|
||
db_user = db.query(models.User).filter(models.User.id == user_id).first()
|
||
if not db_user:
|
||
raise HTTPException(status_code=404, detail="Usuario no encontrado")
|
||
|
||
db_user.is_active = False
|
||
db.commit()
|
||
|
||
return {"message": "Usuario inactivado correctamente", "user_id": user_id}
|
||
|
||
|
||
@app.patch("/api/users/{user_id}/activate")
|
||
def activate_user(
|
||
user_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
# Solo admin puede activar usuarios
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No tienes permisos para activar usuarios")
|
||
|
||
db_user = db.query(models.User).filter(models.User.id == user_id).first()
|
||
if not db_user:
|
||
raise HTTPException(status_code=404, detail="Usuario no encontrado")
|
||
|
||
db_user.is_active = True
|
||
db.commit()
|
||
|
||
return {"message": "Usuario activado correctamente", "user_id": user_id}
|
||
|
||
|
||
@app.patch("/api/users/{user_id}/password")
|
||
def change_user_password(
|
||
user_id: int,
|
||
password_update: schemas.AdminPasswordUpdate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
# Solo admin puede cambiar contraseñas de otros usuarios
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No tienes permisos para cambiar contraseñas")
|
||
|
||
db_user = db.query(models.User).filter(models.User.id == user_id).first()
|
||
if not db_user:
|
||
raise HTTPException(status_code=404, detail="Usuario no encontrado")
|
||
|
||
# Cambiar contraseña
|
||
db_user.password_hash = get_password_hash(password_update.new_password)
|
||
db.commit()
|
||
|
||
return {"message": "Contraseña actualizada correctamente"}
|
||
|
||
|
||
@app.patch("/api/users/me/password")
|
||
def change_my_password(
|
||
password_update: schemas.UserPasswordUpdate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
# Verificar contraseña actual
|
||
if not verify_password(password_update.current_password, current_user.password_hash):
|
||
raise HTTPException(status_code=400, detail="Contraseña actual incorrecta")
|
||
|
||
# Cambiar contraseña
|
||
current_user.password_hash = get_password_hash(password_update.new_password)
|
||
db.commit()
|
||
|
||
return {"message": "Contraseña actualizada correctamente"}
|
||
|
||
|
||
@app.put("/api/users/me", response_model=schemas.User)
|
||
def update_my_profile(
|
||
user_update: schemas.UserUpdate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
# Actualizar email
|
||
if user_update.email is not None:
|
||
# Verificar si email está en uso
|
||
existing = db.query(models.User).filter(
|
||
models.User.email == user_update.email,
|
||
models.User.id != current_user.id
|
||
).first()
|
||
if existing:
|
||
raise HTTPException(status_code=400, detail="Email ya está en uso")
|
||
current_user.email = user_update.email
|
||
|
||
# Actualizar nombre
|
||
if user_update.full_name is not None:
|
||
current_user.full_name = user_update.full_name
|
||
|
||
# No permitir cambio de rol desde perfil
|
||
db.commit()
|
||
db.refresh(current_user)
|
||
return current_user
|
||
|
||
|
||
# ============= API TOKENS ENDPOINTS =============
|
||
@app.get("/api/users/me/tokens", response_model=List[schemas.APIToken])
|
||
def get_my_tokens(
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Obtener todos mis API tokens"""
|
||
tokens = db.query(models.APIToken).filter(
|
||
models.APIToken.user_id == current_user.id
|
||
).all()
|
||
return tokens
|
||
|
||
|
||
@app.post("/api/users/me/tokens", response_model=schemas.APITokenWithValue)
|
||
def create_my_token(
|
||
token_create: schemas.APITokenCreate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Generar un nuevo API token"""
|
||
from app.core.security import generate_api_token
|
||
|
||
# Generar token único
|
||
token_value = generate_api_token()
|
||
|
||
# Crear registro
|
||
api_token = models.APIToken(
|
||
user_id=current_user.id,
|
||
token=token_value,
|
||
description=token_create.description,
|
||
is_active=True
|
||
)
|
||
|
||
db.add(api_token)
|
||
db.commit()
|
||
db.refresh(api_token)
|
||
|
||
# Retornar con el token completo (solo esta vez)
|
||
return schemas.APITokenWithValue(
|
||
id=api_token.id,
|
||
token=api_token.token,
|
||
description=api_token.description,
|
||
is_active=api_token.is_active,
|
||
last_used_at=api_token.last_used_at,
|
||
created_at=api_token.created_at
|
||
)
|
||
|
||
|
||
@app.delete("/api/users/me/tokens/{token_id}")
|
||
def delete_my_token(
|
||
token_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Revocar uno de mis API tokens"""
|
||
api_token = db.query(models.APIToken).filter(
|
||
models.APIToken.id == token_id,
|
||
models.APIToken.user_id == current_user.id
|
||
).first()
|
||
|
||
if not api_token:
|
||
raise HTTPException(status_code=404, detail="Token no encontrado")
|
||
|
||
api_token.is_active = False
|
||
db.commit()
|
||
|
||
return {"message": "Token revocado correctamente", "token_id": token_id}
|
||
|
||
|
||
@app.get("/api/users/{user_id}/tokens", response_model=List[schemas.APIToken])
|
||
def get_user_tokens(
|
||
user_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Obtener tokens de un usuario (solo admin)"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No tienes permisos")
|
||
|
||
tokens = db.query(models.APIToken).filter(
|
||
models.APIToken.user_id == user_id
|
||
).all()
|
||
return tokens
|
||
|
||
|
||
@app.delete("/api/users/{user_id}/tokens/{token_id}")
|
||
def delete_user_token(
|
||
user_id: int,
|
||
token_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Revocar token de un usuario (solo admin)"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No tienes permisos")
|
||
|
||
api_token = db.query(models.APIToken).filter(
|
||
models.APIToken.id == token_id,
|
||
models.APIToken.user_id == user_id
|
||
).first()
|
||
|
||
if not api_token:
|
||
raise HTTPException(status_code=404, detail="Token no encontrado")
|
||
|
||
api_token.is_active = False
|
||
db.commit()
|
||
|
||
return {"message": "Token revocado correctamente", "token_id": token_id}
|
||
|
||
|
||
# ============= CHECKLIST ENDPOINTS =============
|
||
@app.get("/api/checklists", response_model=List[schemas.Checklist])
|
||
def get_checklists(
|
||
skip: int = 0,
|
||
limit: int = 100,
|
||
active_only: bool = False,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
query = db.query(models.Checklist)
|
||
if active_only:
|
||
query = query.filter(models.Checklist.is_active == True)
|
||
|
||
# Si es mecánico, solo ver checklists con permiso
|
||
if current_user.role == "mechanic":
|
||
# Obtener IDs de checklists con permiso o sin permisos (acceso global)
|
||
permitted_checklist_ids = db.query(models.ChecklistPermission.checklist_id).filter(
|
||
models.ChecklistPermission.mechanic_id == current_user.id
|
||
).distinct().all()
|
||
permitted_ids = [id[0] for id in permitted_checklist_ids]
|
||
|
||
# Checklists sin permisos = acceso global
|
||
checklists_without_permissions = db.query(models.Checklist.id).outerjoin(
|
||
models.ChecklistPermission
|
||
).group_by(models.Checklist.id).having(
|
||
func.count(models.ChecklistPermission.id) == 0
|
||
).all()
|
||
global_ids = [id[0] for id in checklists_without_permissions]
|
||
|
||
all_allowed_ids = list(set(permitted_ids + global_ids))
|
||
if all_allowed_ids:
|
||
query = query.filter(models.Checklist.id.in_(all_allowed_ids))
|
||
else:
|
||
# Si no hay permisos, devolver lista vacía
|
||
return []
|
||
|
||
checklists = query.offset(skip).limit(limit).all()
|
||
|
||
# Agregar allowed_mechanics a cada checklist
|
||
for checklist in checklists:
|
||
permissions = db.query(models.ChecklistPermission.mechanic_id).filter(
|
||
models.ChecklistPermission.checklist_id == checklist.id
|
||
).all()
|
||
checklist.allowed_mechanics = [p[0] for p in permissions]
|
||
|
||
return checklists
|
||
|
||
|
||
@app.get("/api/checklists/{checklist_id}", response_model=schemas.ChecklistWithQuestions)
|
||
def get_checklist(checklist_id: int, db: Session = Depends(get_db)):
|
||
checklist = db.query(models.Checklist).filter(models.Checklist.id == checklist_id).first()
|
||
|
||
if not checklist:
|
||
raise HTTPException(status_code=404, detail="Checklist no encontrado")
|
||
|
||
# Cargar solo preguntas NO eliminadas
|
||
checklist.questions = db.query(models.Question).filter(
|
||
models.Question.checklist_id == checklist_id,
|
||
models.Question.is_deleted == False
|
||
).order_by(models.Question.order).all()
|
||
|
||
# Agregar allowed_mechanics
|
||
permissions = db.query(models.ChecklistPermission.mechanic_id).filter(
|
||
models.ChecklistPermission.checklist_id == checklist.id
|
||
).all()
|
||
checklist.allowed_mechanics = [p[0] for p in permissions]
|
||
|
||
return checklist
|
||
|
||
|
||
@app.post("/api/checklists", response_model=schemas.Checklist)
|
||
def create_checklist(
|
||
checklist: schemas.ChecklistCreate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No autorizado")
|
||
|
||
# Extraer mechanic_ids antes de crear el checklist
|
||
checklist_data = checklist.dict(exclude={'mechanic_ids'})
|
||
mechanic_ids = checklist.mechanic_ids or []
|
||
|
||
db_checklist = models.Checklist(**checklist_data, created_by=current_user.id)
|
||
db.add(db_checklist)
|
||
db.flush() # Para obtener el ID
|
||
|
||
# Crear permisos para mecánicos seleccionados
|
||
for mechanic_id in mechanic_ids:
|
||
permission = models.ChecklistPermission(
|
||
checklist_id=db_checklist.id,
|
||
mechanic_id=mechanic_id
|
||
)
|
||
db.add(permission)
|
||
|
||
db.commit()
|
||
db.refresh(db_checklist)
|
||
|
||
# Agregar allowed_mechanics a la respuesta
|
||
db_checklist.allowed_mechanics = mechanic_ids
|
||
|
||
return db_checklist
|
||
|
||
|
||
@app.put("/api/checklists/{checklist_id}", response_model=schemas.Checklist)
|
||
def update_checklist(
|
||
checklist_id: int,
|
||
checklist: schemas.ChecklistUpdate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No autorizado")
|
||
|
||
db_checklist = db.query(models.Checklist).filter(models.Checklist.id == checklist_id).first()
|
||
if not db_checklist:
|
||
raise HTTPException(status_code=404, detail="Checklist no encontrado")
|
||
|
||
# Extraer mechanic_ids si se envía
|
||
update_data = checklist.dict(exclude_unset=True, exclude={'mechanic_ids'})
|
||
mechanic_ids = checklist.mechanic_ids
|
||
|
||
# Actualizar campos del checklist
|
||
for key, value in update_data.items():
|
||
setattr(db_checklist, key, value)
|
||
|
||
# Si se proporcionan mechanic_ids, actualizar permisos
|
||
if mechanic_ids is not None:
|
||
# Eliminar permisos existentes
|
||
db.query(models.ChecklistPermission).filter(
|
||
models.ChecklistPermission.checklist_id == checklist_id
|
||
).delete()
|
||
|
||
# Crear nuevos permisos
|
||
for mechanic_id in mechanic_ids:
|
||
permission = models.ChecklistPermission(
|
||
checklist_id=checklist_id,
|
||
mechanic_id=mechanic_id
|
||
)
|
||
db.add(permission)
|
||
|
||
db.commit()
|
||
db.refresh(db_checklist)
|
||
|
||
# Agregar allowed_mechanics a la respuesta
|
||
permissions = db.query(models.ChecklistPermission.mechanic_id).filter(
|
||
models.ChecklistPermission.checklist_id == checklist_id
|
||
).all()
|
||
db_checklist.allowed_mechanics = [p[0] for p in permissions]
|
||
|
||
return db_checklist
|
||
|
||
|
||
@app.post("/api/checklists/{checklist_id}/upload-logo")
|
||
async def upload_checklist_logo(
|
||
checklist_id: int,
|
||
file: UploadFile = File(...),
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Subir logo para un checklist (solo admin)"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="Solo administradores pueden subir logos")
|
||
|
||
# Verificar que el checklist existe
|
||
checklist = db.query(models.Checklist).filter(models.Checklist.id == checklist_id).first()
|
||
if not checklist:
|
||
raise HTTPException(status_code=404, detail="Checklist no encontrado")
|
||
|
||
# Validar que es una imagen
|
||
if not file.content_type.startswith('image/'):
|
||
raise HTTPException(status_code=400, detail="El archivo debe ser una imagen")
|
||
|
||
# Subir a S3/MinIO
|
||
file_extension = file.filename.split(".")[-1]
|
||
now = datetime.now()
|
||
folder = f"checklist-logos/{now.year}/{now.month:02d}"
|
||
file_name = f"checklist_{checklist_id}_{uuid.uuid4().hex}.{file_extension}"
|
||
s3_key = f"{folder}/{file_name}"
|
||
|
||
file_content = await file.read()
|
||
s3_client.upload_fileobj(
|
||
BytesIO(file_content),
|
||
S3_IMAGE_BUCKET,
|
||
s3_key,
|
||
ExtraArgs={"ContentType": file.content_type}
|
||
)
|
||
|
||
logo_url = f"{S3_ENDPOINT}/{S3_IMAGE_BUCKET}/{s3_key}"
|
||
|
||
# Actualizar checklist
|
||
checklist.logo_url = logo_url
|
||
db.commit()
|
||
db.refresh(checklist)
|
||
|
||
return {"logo_url": logo_url, "message": "Logo subido exitosamente"}
|
||
|
||
|
||
@app.delete("/api/checklists/{checklist_id}/logo")
|
||
def delete_checklist_logo(
|
||
checklist_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Eliminar logo de un checklist (solo admin)"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="Solo administradores pueden eliminar logos")
|
||
|
||
checklist = db.query(models.Checklist).filter(models.Checklist.id == checklist_id).first()
|
||
if not checklist:
|
||
raise HTTPException(status_code=404, detail="Checklist no encontrado")
|
||
|
||
checklist.logo_url = None
|
||
db.commit()
|
||
|
||
return {"message": "Logo eliminado exitosamente"}
|
||
|
||
|
||
# ============= QUESTION ENDPOINTS =============
|
||
@app.post("/api/questions", response_model=schemas.Question)
|
||
def create_question(
|
||
question: schemas.QuestionCreate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No autorizado")
|
||
|
||
db_question = models.Question(**question.dict())
|
||
db.add(db_question)
|
||
|
||
# Actualizar max_score del checklist
|
||
checklist = db.query(models.Checklist).filter(
|
||
models.Checklist.id == question.checklist_id
|
||
).first()
|
||
if checklist:
|
||
checklist.max_score += question.points
|
||
|
||
db.commit()
|
||
db.refresh(db_question)
|
||
|
||
# Registrar auditoría
|
||
audit_log = models.QuestionAuditLog(
|
||
question_id=db_question.id,
|
||
checklist_id=question.checklist_id,
|
||
user_id=current_user.id,
|
||
action="created",
|
||
new_value=f"Pregunta creada: {question.text}",
|
||
comment=f"Sección: {question.section}, Tipo: {question.type}, Puntos: {question.points}"
|
||
)
|
||
db.add(audit_log)
|
||
db.commit()
|
||
|
||
return db_question
|
||
|
||
|
||
@app.put("/api/questions/{question_id}", response_model=schemas.Question)
|
||
def update_question(
|
||
question_id: int,
|
||
question: schemas.QuestionUpdate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No autorizado")
|
||
|
||
db_question = db.query(models.Question).filter(models.Question.id == question_id).first()
|
||
if not db_question:
|
||
raise HTTPException(status_code=404, detail="Pregunta no encontrada")
|
||
|
||
# Guardar valores anteriores para auditoría
|
||
import json
|
||
changes = []
|
||
|
||
for key, value in question.dict(exclude_unset=True).items():
|
||
old_value = getattr(db_question, key)
|
||
if old_value != value:
|
||
# Convertir a string para comparación y almacenamiento
|
||
old_str = json.dumps(old_value, ensure_ascii=False) if isinstance(old_value, (dict, list)) else str(old_value)
|
||
new_str = json.dumps(value, ensure_ascii=False) if isinstance(value, (dict, list)) else str(value)
|
||
|
||
changes.append({
|
||
'field': key,
|
||
'old': old_str,
|
||
'new': new_str
|
||
})
|
||
setattr(db_question, key, value)
|
||
|
||
db.commit()
|
||
db.refresh(db_question)
|
||
|
||
# Registrar auditoría para cada campo cambiado
|
||
for change in changes:
|
||
audit_log = models.QuestionAuditLog(
|
||
question_id=question_id,
|
||
checklist_id=db_question.checklist_id,
|
||
user_id=current_user.id,
|
||
action="updated",
|
||
field_name=change['field'],
|
||
old_value=change['old'],
|
||
new_value=change['new'],
|
||
comment=f"Campo '{change['field']}' modificado"
|
||
)
|
||
db.add(audit_log)
|
||
|
||
if changes:
|
||
db.commit()
|
||
|
||
return db_question
|
||
|
||
|
||
@app.patch("/api/checklists/{checklist_id}/questions/reorder")
|
||
def reorder_questions(
|
||
checklist_id: int,
|
||
reorder_data: List[schemas.QuestionReorder],
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Reordenar preguntas de un checklist"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No autorizado")
|
||
|
||
# Verificar que el checklist existe
|
||
checklist = db.query(models.Checklist).filter(models.Checklist.id == checklist_id).first()
|
||
if not checklist:
|
||
raise HTTPException(status_code=404, detail="Checklist no encontrado")
|
||
|
||
# Actualizar el orden de cada pregunta
|
||
for item in reorder_data:
|
||
question = db.query(models.Question).filter(
|
||
models.Question.id == item.question_id,
|
||
models.Question.checklist_id == checklist_id
|
||
).first()
|
||
|
||
if question:
|
||
old_order = question.order
|
||
question.order = item.new_order
|
||
question.updated_at = datetime.utcnow()
|
||
|
||
# Registrar auditoría
|
||
audit_log = models.QuestionAuditLog(
|
||
question_id=question.id,
|
||
checklist_id=checklist_id,
|
||
user_id=current_user.id,
|
||
action="updated",
|
||
field_name="order",
|
||
old_value=str(old_order),
|
||
new_value=str(item.new_order),
|
||
comment="Orden de pregunta actualizado"
|
||
)
|
||
db.add(audit_log)
|
||
|
||
db.commit()
|
||
|
||
return {"message": "Orden de preguntas actualizado exitosamente", "updated_count": len(reorder_data)}
|
||
|
||
|
||
@app.delete("/api/questions/{question_id}")
|
||
def delete_question(
|
||
question_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No autorizado")
|
||
|
||
db_question = db.query(models.Question).filter(models.Question.id == question_id).first()
|
||
if not db_question:
|
||
raise HTTPException(status_code=404, detail="Pregunta no encontrada")
|
||
|
||
if db_question.is_deleted:
|
||
raise HTTPException(status_code=400, detail="La pregunta ya está eliminada")
|
||
|
||
# Registrar auditoría antes de eliminar
|
||
audit_log = models.QuestionAuditLog(
|
||
question_id=question_id,
|
||
checklist_id=db_question.checklist_id,
|
||
user_id=current_user.id,
|
||
action="deleted",
|
||
old_value=f"Pregunta eliminada: {db_question.text}",
|
||
comment=f"Sección: {db_question.section}, Tipo: {db_question.type}, Puntos: {db_question.points}"
|
||
)
|
||
db.add(audit_log)
|
||
|
||
# SOFT DELETE: marcar como eliminada
|
||
db_question.is_deleted = True
|
||
db_question.updated_at = datetime.utcnow()
|
||
|
||
# También marcar como eliminadas todas las subpreguntas (en cascada)
|
||
subquestions = db.query(models.Question).filter(
|
||
models.Question.parent_question_id == question_id,
|
||
models.Question.is_deleted == False
|
||
).all()
|
||
|
||
subquestion_count = 0
|
||
for subq in subquestions:
|
||
subq.is_deleted = True
|
||
subq.updated_at = datetime.utcnow()
|
||
subquestion_count += 1
|
||
|
||
# Registrar auditoría de subpregunta
|
||
sub_audit_log = models.QuestionAuditLog(
|
||
question_id=subq.id,
|
||
checklist_id=subq.checklist_id,
|
||
user_id=current_user.id,
|
||
action="deleted",
|
||
old_value=f"Subpregunta eliminada en cascada: {subq.text}",
|
||
comment=f"Eliminada junto con pregunta padre #{question_id}"
|
||
)
|
||
db.add(sub_audit_log)
|
||
|
||
db.commit()
|
||
|
||
message = "Pregunta eliminada exitosamente"
|
||
if subquestion_count > 0:
|
||
message += f" junto con {subquestion_count} subpregunta(s)"
|
||
|
||
return {
|
||
"message": message,
|
||
"id": question_id,
|
||
"subquestions_deleted": subquestion_count,
|
||
"note": "Las respuestas históricas se mantienen intactas. Las preguntas no aparecerán en nuevas inspecciones."
|
||
}
|
||
|
||
|
||
@app.get("/api/questions/{question_id}/audit", response_model=List[schemas.QuestionAuditLog])
|
||
def get_question_audit_history(
|
||
question_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Obtener historial de cambios de una pregunta"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="Solo administradores pueden ver el historial")
|
||
|
||
audit_logs = db.query(models.QuestionAuditLog).filter(
|
||
models.QuestionAuditLog.question_id == question_id
|
||
).order_by(models.QuestionAuditLog.created_at.desc()).all()
|
||
|
||
return audit_logs
|
||
|
||
|
||
@app.get("/api/checklists/{checklist_id}/questions/audit", response_model=List[schemas.QuestionAuditLog])
|
||
def get_checklist_questions_audit_history(
|
||
checklist_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Obtener historial de cambios de todas las preguntas de un checklist"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="Solo administradores pueden ver el historial")
|
||
|
||
audit_logs = db.query(models.QuestionAuditLog).filter(
|
||
models.QuestionAuditLog.checklist_id == checklist_id
|
||
).order_by(models.QuestionAuditLog.created_at.desc()).all()
|
||
|
||
return audit_logs
|
||
|
||
|
||
# ============= INSPECTION ENDPOINTS =============
|
||
@app.get("/api/inspections", response_model=List[schemas.Inspection])
|
||
def get_inspections(
|
||
skip: int = 0,
|
||
limit: int = 100,
|
||
vehicle_plate: str = None,
|
||
status: str = None,
|
||
show_inactive: bool = False,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
query = db.query(models.Inspection)
|
||
|
||
# Por defecto, solo mostrar inspecciones activas
|
||
if not show_inactive:
|
||
query = query.filter(models.Inspection.is_active == True)
|
||
|
||
# Mecánicos solo ven sus inspecciones
|
||
if current_user.role == "mechanic":
|
||
query = query.filter(models.Inspection.mechanic_id == current_user.id)
|
||
|
||
if vehicle_plate:
|
||
query = query.filter(models.Inspection.vehicle_plate.contains(vehicle_plate))
|
||
|
||
if status:
|
||
query = query.filter(models.Inspection.status == status)
|
||
|
||
return query.order_by(models.Inspection.created_at.desc()).offset(skip).limit(limit).all()
|
||
|
||
|
||
@app.get("/api/inspections/{inspection_id}", response_model=schemas.InspectionDetail)
|
||
def get_inspection(
|
||
inspection_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
inspection = db.query(models.Inspection).options(
|
||
joinedload(models.Inspection.checklist).joinedload(models.Checklist.questions),
|
||
joinedload(models.Inspection.mechanic),
|
||
joinedload(models.Inspection.answers).joinedload(models.Answer.question),
|
||
joinedload(models.Inspection.answers).joinedload(models.Answer.media_files)
|
||
).filter(models.Inspection.id == inspection_id).first()
|
||
|
||
if not inspection:
|
||
raise HTTPException(status_code=404, detail="Inspección no encontrada")
|
||
|
||
return inspection
|
||
|
||
|
||
@app.post("/api/inspections", response_model=schemas.Inspection)
|
||
def create_inspection(
|
||
inspection: schemas.InspectionCreate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
# Obtener max_score del checklist
|
||
checklist = db.query(models.Checklist).filter(
|
||
models.Checklist.id == inspection.checklist_id
|
||
).first()
|
||
|
||
if not checklist:
|
||
raise HTTPException(status_code=404, detail="Checklist no encontrado")
|
||
|
||
# Crear inspección con el employee_code del mecánico actual
|
||
inspection_data = inspection.dict()
|
||
inspection_data['mechanic_employee_code'] = current_user.employee_code # Agregar código de operario automáticamente
|
||
|
||
db_inspection = models.Inspection(
|
||
**inspection_data,
|
||
mechanic_id=current_user.id,
|
||
max_score=checklist.max_score
|
||
)
|
||
db.add(db_inspection)
|
||
db.commit()
|
||
db.refresh(db_inspection)
|
||
return db_inspection
|
||
|
||
|
||
@app.put("/api/inspections/{inspection_id}", response_model=schemas.Inspection)
|
||
def update_inspection(
|
||
inspection_id: int,
|
||
inspection: schemas.InspectionUpdate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
db_inspection = db.query(models.Inspection).filter(
|
||
models.Inspection.id == inspection_id
|
||
).first()
|
||
|
||
if not db_inspection:
|
||
raise HTTPException(status_code=404, detail="Inspección no encontrada")
|
||
|
||
for key, value in inspection.dict(exclude_unset=True).items():
|
||
setattr(db_inspection, key, value)
|
||
|
||
db.commit()
|
||
db.refresh(db_inspection)
|
||
return db_inspection
|
||
|
||
|
||
def generate_inspection_pdf(inspection_id: int, db: Session) -> str:
|
||
"""
|
||
Genera el PDF de una inspección y lo sube a S3.
|
||
Retorna la URL del PDF generado.
|
||
"""
|
||
from reportlab.lib.pagesizes import A4
|
||
from reportlab.lib import colors
|
||
from reportlab.lib.units import inch, mm
|
||
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, Image as RLImage, PageBreak, KeepTogether
|
||
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
||
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_RIGHT, TA_JUSTIFY
|
||
from io import BytesIO
|
||
import requests
|
||
|
||
inspection = db.query(models.Inspection).filter(models.Inspection.id == inspection_id).first()
|
||
if not inspection:
|
||
raise HTTPException(status_code=404, detail="Inspección no encontrada")
|
||
|
||
buffer = BytesIO()
|
||
doc = SimpleDocTemplate(
|
||
buffer,
|
||
pagesize=A4,
|
||
rightMargin=15*mm,
|
||
leftMargin=15*mm,
|
||
topMargin=20*mm,
|
||
bottomMargin=20*mm
|
||
)
|
||
|
||
elements = []
|
||
styles = getSampleStyleSheet()
|
||
|
||
# Estilos personalizados
|
||
title_style = ParagraphStyle(
|
||
'CustomTitle',
|
||
parent=styles['Heading1'],
|
||
fontSize=24,
|
||
textColor=colors.HexColor('#1e3a8a'),
|
||
spaceAfter=6,
|
||
alignment=TA_CENTER,
|
||
fontName='Helvetica-Bold'
|
||
)
|
||
|
||
subtitle_style = ParagraphStyle(
|
||
'CustomSubtitle',
|
||
parent=styles['Normal'],
|
||
fontSize=11,
|
||
textColor=colors.HexColor('#475569'),
|
||
spaceAfter=20,
|
||
alignment=TA_CENTER
|
||
)
|
||
|
||
section_header_style = ParagraphStyle(
|
||
'SectionHeader',
|
||
parent=styles['Heading2'],
|
||
fontSize=14,
|
||
textColor=colors.HexColor('#1e40af'),
|
||
spaceBefore=16,
|
||
spaceAfter=10,
|
||
fontName='Helvetica-Bold',
|
||
borderWidth=0,
|
||
borderColor=colors.HexColor('#3b82f6'),
|
||
borderPadding=6,
|
||
backColor=colors.HexColor('#eff6ff')
|
||
)
|
||
|
||
info_style = ParagraphStyle(
|
||
'InfoStyle',
|
||
parent=styles['Normal'],
|
||
fontSize=10,
|
||
textColor=colors.HexColor('#334155'),
|
||
spaceAfter=4
|
||
)
|
||
|
||
small_style = ParagraphStyle(
|
||
'SmallStyle',
|
||
parent=styles['Normal'],
|
||
fontSize=8,
|
||
textColor=colors.HexColor('#64748b')
|
||
)
|
||
|
||
# Estilos mejorados para preguntas y respuestas
|
||
question_style = ParagraphStyle(
|
||
'QuestionStyle',
|
||
parent=styles['Normal'],
|
||
fontSize=11,
|
||
textColor=colors.HexColor('#1f2937'),
|
||
spaceAfter=3,
|
||
fontName='Helvetica-Bold'
|
||
)
|
||
|
||
answer_style = ParagraphStyle(
|
||
'AnswerStyle',
|
||
parent=styles['Normal'],
|
||
fontSize=10,
|
||
textColor=colors.HexColor('#374151'),
|
||
spaceAfter=4
|
||
)
|
||
|
||
comment_style = ParagraphStyle(
|
||
'CommentStyle',
|
||
parent=styles['Normal'],
|
||
fontSize=9,
|
||
textColor=colors.HexColor('#6b7280'),
|
||
spaceAfter=6,
|
||
leftIndent=10,
|
||
rightIndent=10
|
||
)
|
||
|
||
# Obtener datos
|
||
mechanic = db.query(models.User).filter(models.User.id == inspection.mechanic_id).first()
|
||
checklist = db.query(models.Checklist).filter(models.Checklist.id == inspection.checklist_id).first()
|
||
|
||
# Obtener logo principal de configuración para el PDF
|
||
config = db.query(models.AIConfiguration).filter(models.AIConfiguration.is_active == True).first()
|
||
logo_url_to_use = None
|
||
if config and getattr(config, "logo_url", None):
|
||
logo_url_to_use = config.logo_url
|
||
print(f"📸 Usando logo principal de configuración: {logo_url_to_use}")
|
||
else:
|
||
print("ℹ️ No hay logo principal configurado")
|
||
|
||
# ===== PORTADA =====
|
||
elements.append(Spacer(1, 10*mm))
|
||
|
||
# Logo principal (si existe)
|
||
if logo_url_to_use:
|
||
try:
|
||
print(f"🔍 Intentando cargar logo desde: {logo_url_to_use}")
|
||
logo_resp = requests.get(logo_url_to_use, timeout=10)
|
||
print(f"📡 Respuesta del servidor: {logo_resp.status_code}")
|
||
|
||
if logo_resp.status_code == 200:
|
||
logo_bytes = BytesIO(logo_resp.content)
|
||
# Crear imagen con tamaño máximo, manteniendo proporciones
|
||
logo_img = RLImage(logo_bytes)
|
||
|
||
# Ajustar tamaño manteniendo aspect ratio (máximo 50mm de ancho)
|
||
aspect = logo_img.imageHeight / float(logo_img.imageWidth)
|
||
logo_width = 50*mm
|
||
logo_height = logo_width * aspect
|
||
|
||
# Si la altura es muy grande, ajustar por altura
|
||
if logo_height > 40*mm:
|
||
logo_height = 40*mm
|
||
logo_width = logo_height / aspect
|
||
|
||
logo_img.drawWidth = logo_width
|
||
logo_img.drawHeight = logo_height
|
||
|
||
logo_table = Table([[logo_img]], colWidths=[180*mm])
|
||
logo_table.setStyle(TableStyle([
|
||
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
|
||
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
|
||
]))
|
||
elements.append(logo_table)
|
||
elements.append(Spacer(1, 5*mm))
|
||
print(f"✅ Logo cargado correctamente ({logo_width/mm:.1f}mm x {logo_height/mm:.1f}mm)")
|
||
else:
|
||
print(f"❌ Error HTTP al cargar logo: {logo_resp.status_code}")
|
||
except Exception as e:
|
||
print(f"⚠️ Error cargando logo: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
# Título con diseño moderno
|
||
elements.append(Paragraph("📋 INFORME DE INSPECCIÓN VEHICULAR", title_style))
|
||
elements.append(Paragraph(f"N° {inspection.id}", subtitle_style))
|
||
elements.append(Spacer(1, 10*mm))
|
||
|
||
# Estilo para etiquetas de información
|
||
label_style = ParagraphStyle(
|
||
'LabelStyle',
|
||
parent=styles['Normal'],
|
||
fontSize=9,
|
||
textColor=colors.HexColor('#64748b'),
|
||
spaceAfter=2
|
||
)
|
||
|
||
value_style = ParagraphStyle(
|
||
'ValueStyle',
|
||
parent=styles['Normal'],
|
||
fontSize=11,
|
||
textColor=colors.HexColor('#1e293b'),
|
||
fontName='Helvetica-Bold'
|
||
)
|
||
|
||
# Cuadro de información del vehículo con diseño moderno
|
||
vehicle_header = Table(
|
||
[[Paragraph("<b>🚗 INFORMACIÓN DEL VEHÍCULO</b>", ParagraphStyle('veh_header', parent=info_style, fontSize=12, textColor=colors.white))]],
|
||
colWidths=[85*mm]
|
||
)
|
||
vehicle_header.setStyle(TableStyle([
|
||
('BACKGROUND', (0, 0), (-1, -1), colors.HexColor('#2563eb')),
|
||
('PADDING', (0, 0), (-1, -1), 10),
|
||
('ALIGN', (0, 0), (-1, -1), 'LEFT'),
|
||
('ROUNDEDCORNERS', [6, 6, 0, 0]),
|
||
]))
|
||
|
||
vehicle_content = Table([
|
||
[Paragraph("Placa", label_style), Paragraph(f"{inspection.vehicle_plate}", value_style)],
|
||
[Paragraph("Marca", label_style), Paragraph(f"{inspection.vehicle_brand or 'N/A'}", value_style)],
|
||
[Paragraph("Modelo", label_style), Paragraph(f"{inspection.vehicle_model or 'N/A'}", value_style)],
|
||
[Paragraph("Kilometraje", label_style), Paragraph(f"{inspection.vehicle_km or 'N/A'} km", value_style)]
|
||
], colWidths=[25*mm, 60*mm])
|
||
vehicle_content.setStyle(TableStyle([
|
||
('PADDING', (0, 0), (-1, -1), 10),
|
||
('BACKGROUND', (0, 0), (-1, -1), colors.white),
|
||
('VALIGN', (0, 0), (-1, -1), 'TOP'),
|
||
('LINEBELOW', (0, 0), (-1, -2), 0.5, colors.HexColor('#e2e8f0')),
|
||
]))
|
||
|
||
vehicle_table = Table(
|
||
[[vehicle_header], [vehicle_content]],
|
||
colWidths=[85*mm]
|
||
)
|
||
vehicle_table.setStyle(TableStyle([
|
||
('BOX', (0, 0), (-1, -1), 1.5, colors.HexColor('#2563eb')),
|
||
('VALIGN', (0, 0), (-1, -1), 'TOP'),
|
||
('ROUNDEDCORNERS', [6, 6, 6, 6]),
|
||
]))
|
||
|
||
# Cuadro de información del cliente e inspección (sin nombre de mecánico por privacidad)
|
||
client_header = Table(
|
||
[[Paragraph("<b>📄 INFORMACIÓN DE LA INSPECCIÓN</b>", ParagraphStyle('client_header', parent=info_style, fontSize=12, textColor=colors.white))]],
|
||
colWidths=[85*mm]
|
||
)
|
||
client_header.setStyle(TableStyle([
|
||
('BACKGROUND', (0, 0), (-1, -1), colors.HexColor('#16a34a')),
|
||
('PADDING', (0, 0), (-1, -1), 10),
|
||
('ALIGN', (0, 0), (-1, -1), 'LEFT'),
|
||
('ROUNDEDCORNERS', [6, 6, 0, 0]),
|
||
]))
|
||
|
||
client_content = Table([
|
||
[Paragraph("Nº Pedido", label_style), Paragraph(f"{inspection.order_number or 'N/A'}", value_style)],
|
||
[Paragraph("OR N°", label_style), Paragraph(f"{inspection.or_number or 'N/A'}", value_style)],
|
||
[Paragraph("Cód. Operario", label_style), Paragraph(f"{inspection.mechanic_employee_code or 'N/A'}", value_style)],
|
||
[Paragraph("Fecha", label_style), Paragraph(f"{inspection.started_at.strftime('%d/%m/%Y %H:%M') if inspection.started_at else 'N/A'}", value_style)]
|
||
], colWidths=[25*mm, 60*mm])
|
||
client_content.setStyle(TableStyle([
|
||
('PADDING', (0, 0), (-1, -1), 10),
|
||
('BACKGROUND', (0, 0), (-1, -1), colors.white),
|
||
('VALIGN', (0, 0), (-1, -1), 'TOP'),
|
||
('LINEBELOW', (0, 0), (-1, -2), 0.5, colors.HexColor('#e2e8f0')),
|
||
]))
|
||
|
||
inspection_info_table = Table(
|
||
[[client_header], [client_content]],
|
||
colWidths=[85*mm]
|
||
)
|
||
inspection_info_table.setStyle(TableStyle([
|
||
('BOX', (0, 0), (-1, -1), 1.5, colors.HexColor('#16a34a')),
|
||
('VALIGN', (0, 0), (-1, -1), 'TOP'),
|
||
('ROUNDEDCORNERS', [6, 6, 6, 6]),
|
||
]))
|
||
|
||
# Tabla con ambos cuadros lado a lado
|
||
info_table = Table([[vehicle_table, inspection_info_table]], colWidths=[90*mm, 90*mm])
|
||
info_table.setStyle(TableStyle([
|
||
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
|
||
('VALIGN', (0, 0), (-1, -1), 'TOP'),
|
||
]))
|
||
elements.append(info_table)
|
||
elements.append(Spacer(1, 8*mm))
|
||
|
||
# Resumen de puntuación con diseño mejorado
|
||
percentage = inspection.percentage
|
||
score_color = colors.HexColor('#22c55e') if percentage >= 80 else colors.HexColor('#eab308') if percentage >= 60 else colors.HexColor('#ef4444')
|
||
score_label = "EXCELENTE" if percentage >= 80 else "ACEPTABLE" if percentage >= 60 else "DEFICIENTE"
|
||
|
||
# Título de resumen
|
||
score_title = Table(
|
||
[[Paragraph("<b>📊 RESUMEN DE EVALUACIÓN</b>", ParagraphStyle('score_title', parent=info_style, fontSize=14, textColor=colors.HexColor('#1e293b'), alignment=TA_CENTER))]],
|
||
colWidths=[180*mm]
|
||
)
|
||
score_title.setStyle(TableStyle([
|
||
('PADDING', (0, 0), (-1, -1), 8),
|
||
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
|
||
]))
|
||
elements.append(score_title)
|
||
elements.append(Spacer(1, 3*mm))
|
||
|
||
# Cuadro de métricas con diseño moderno
|
||
metric_label = ParagraphStyle('metric_label', parent=small_style, fontSize=9, textColor=colors.HexColor('#64748b'), alignment=TA_CENTER)
|
||
metric_value = ParagraphStyle('metric_value', parent=info_style, fontSize=16, fontName='Helvetica-Bold', alignment=TA_CENTER)
|
||
|
||
metrics_data = [
|
||
[Paragraph("Puntuación", metric_label), Paragraph("Porcentaje", metric_label), Paragraph("Estado", metric_label), Paragraph("Ítems Críticos", metric_label)],
|
||
[
|
||
Paragraph(f"<b>{inspection.score}</b> / {inspection.max_score}", metric_value),
|
||
Paragraph(f"<b>{percentage:.1f}%</b>", metric_value),
|
||
Paragraph(f"<b>{score_label}</b>", ParagraphStyle('status_value', parent=metric_value, textColor=score_color)),
|
||
Paragraph(f"<b>{inspection.flagged_items_count}</b>", metric_value)
|
||
]
|
||
]
|
||
|
||
score_table = Table(metrics_data, colWidths=[45*mm, 45*mm, 45*mm, 45*mm])
|
||
score_table.setStyle(TableStyle([
|
||
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#f8fafc')),
|
||
('BACKGROUND', (0, 1), (-1, -1), colors.white),
|
||
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
|
||
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
|
||
('PADDING', (0, 0), (-1, -1), 12),
|
||
('BOX', (0, 0), (-1, -1), 2, score_color),
|
||
('LINEABOVE', (0, 1), (-1, 1), 1.5, score_color),
|
||
('GRID', (0, 0), (-1, -1), 0.5, colors.HexColor('#e2e8f0')),
|
||
('ROUNDEDCORNERS', [8, 8, 8, 8]),
|
||
]))
|
||
elements.append(score_table)
|
||
elements.append(PageBreak())
|
||
|
||
# ===== DETALLE DE RESPUESTAS =====
|
||
elements.append(Paragraph("📝 DETALLE DE LA INSPECCIÓN", section_header_style))
|
||
elements.append(Spacer(1, 5*mm))
|
||
|
||
# Obtener respuestas agrupadas por sección
|
||
answers = db.query(models.Answer).options(
|
||
joinedload(models.Answer.media_files),
|
||
joinedload(models.Answer.question)
|
||
).join(models.Question).filter(
|
||
models.Answer.inspection_id == inspection_id
|
||
).order_by(
|
||
models.Question.section,
|
||
models.Question.order
|
||
).all()
|
||
|
||
# Función helper para convertir valores técnicos a etiquetas legibles
|
||
def get_readable_answer(answer_value, question_options):
|
||
"""
|
||
Convierte el valor técnico de la respuesta a su etiqueta legible.
|
||
Ej: 'option1' -> 'Bueno', 'pass' -> 'Pasa'
|
||
"""
|
||
if not answer_value or not question_options:
|
||
return answer_value or 'Sin respuesta'
|
||
|
||
config = question_options
|
||
question_type = config.get('type', '')
|
||
|
||
# Para tipos con choices (boolean, single_choice, multiple_choice)
|
||
if question_type in ['boolean', 'single_choice', 'multiple_choice'] and config.get('choices'):
|
||
# Si es multiple_choice, puede tener varios valores separados por coma
|
||
if question_type == 'multiple_choice' and ',' in answer_value:
|
||
values = answer_value.split(',')
|
||
labels = []
|
||
for val in values:
|
||
val = val.strip()
|
||
choice = next((c for c in config['choices'] if c.get('value') == val), None)
|
||
if choice:
|
||
labels.append(choice.get('label', val))
|
||
else:
|
||
labels.append(val)
|
||
return ', '.join(labels)
|
||
else:
|
||
# Buscar la etiqueta correspondiente al valor
|
||
choice = next((c for c in config['choices'] if c.get('value') == answer_value), None)
|
||
if choice:
|
||
return choice.get('label', answer_value)
|
||
|
||
# Para tipos scale, text, number, date, time - devolver el valor tal cual
|
||
return answer_value
|
||
|
||
current_section = None
|
||
for ans in answers:
|
||
question = ans.question
|
||
|
||
# Nueva sección
|
||
if question.section != current_section:
|
||
if current_section is not None:
|
||
elements.append(Spacer(1, 5*mm))
|
||
current_section = question.section
|
||
elements.append(Paragraph(f"▶ {question.section or 'General'}", section_header_style))
|
||
elements.append(Spacer(1, 3*mm))
|
||
|
||
# Estado visual
|
||
status_colors = {
|
||
'ok': colors.HexColor('#22c55e'),
|
||
'warning': colors.HexColor('#eab308'),
|
||
'critical': colors.HexColor('#ef4444')
|
||
}
|
||
status_icons = {
|
||
'ok': '✓',
|
||
'warning': '⚠',
|
||
'critical': '✕'
|
||
}
|
||
status_color = status_colors.get(ans.status, colors.HexColor('#64748b'))
|
||
status_icon = status_icons.get(ans.status, '●')
|
||
|
||
# Tabla de pregunta/respuesta
|
||
question_data = []
|
||
|
||
# Fila 1: Pregunta con estilo mejorado
|
||
question_data.append([
|
||
Paragraph(f"<b>{status_icon} {question.text}</b>", question_style),
|
||
])
|
||
|
||
# Fila 2: Respuesta y estado - Convertir valor técnico a etiqueta legible
|
||
answer_text = get_readable_answer(ans.answer_value, question.options)
|
||
question_data.append([
|
||
Table([
|
||
[
|
||
Paragraph(f"<b>Respuesta:</b> {answer_text}", answer_style),
|
||
Paragraph(f"<b>Estado:</b> {ans.status.upper()}", ParagraphStyle('status', parent=answer_style, textColor=status_color, fontName='Helvetica-Bold'))
|
||
]
|
||
], colWidths=[120*mm, 50*mm])
|
||
])
|
||
|
||
# Fila 3: Comentario mejorado (si existe)
|
||
if ans.comment:
|
||
comment_text = ans.comment
|
||
|
||
# Limpiar prefijo de análisis automático/IA si existe (con cualquier porcentaje)
|
||
import re
|
||
# Patrón para detectar "Análisis Automático (XX% confianza): " o "Análisis IA (XX% confianza): "
|
||
comment_text = re.sub(r'^(Análisis Automático|Análisis IA)\s*\(\d+%\s*confianza\):\s*', '', comment_text)
|
||
# También remover variantes sin emoji
|
||
comment_text = re.sub(r'^🤖\s*(Análisis Automático|Análisis IA)\s*\(\d+%\s*confianza\):\s*', '', comment_text)
|
||
|
||
# Separar análisis y recomendaciones con salto de línea
|
||
if "Recomendaciones:" in comment_text or "Recomendación:" in comment_text:
|
||
comment_text = comment_text.replace("Recomendaciones:", "<br/><br/><b>Recomendaciones:</b>")
|
||
comment_text = comment_text.replace("Recomendación:", "<br/><br/><b>Recomendación:</b>")
|
||
|
||
question_data.append([
|
||
Paragraph(f"<b>Comentario:</b> {comment_text}", comment_style)
|
||
])
|
||
|
||
# Fila 4: Imágenes (si existen)
|
||
if ans.media_files:
|
||
media_imgs = []
|
||
for media in ans.media_files:
|
||
if media.file_type == "image":
|
||
try:
|
||
img_resp = requests.get(media.file_path, timeout=10)
|
||
if img_resp.status_code == 200:
|
||
img_bytes = BytesIO(img_resp.content)
|
||
rl_img = RLImage(img_bytes, width=25*mm, height=25*mm)
|
||
media_imgs.append(rl_img)
|
||
except Exception as e:
|
||
print(f"Error cargando imagen {media.file_path}: {e}")
|
||
|
||
if media_imgs:
|
||
# Crear tabla de miniaturas (máximo 6 por fila)
|
||
img_rows = []
|
||
for i in range(0, len(media_imgs), 6):
|
||
img_rows.append(media_imgs[i:i+6])
|
||
|
||
img_table = Table(img_rows)
|
||
img_table.setStyle(TableStyle([
|
||
('ALIGN', (0, 0), (-1, -1), 'LEFT'),
|
||
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
|
||
('PADDING', (0, 0), (-1, -1), 2),
|
||
]))
|
||
question_data.append([img_table])
|
||
|
||
# Tabla de la pregunta completa
|
||
q_table = Table(question_data, colWidths=[180*mm])
|
||
q_table.setStyle(TableStyle([
|
||
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#f1f5f9')),
|
||
('PADDING', (0, 0), (-1, -1), 6),
|
||
('BOX', (0, 0), (-1, -1), 0.5, colors.HexColor('#cbd5e1')),
|
||
('LEFTPADDING', (0, 0), (-1, -1), 8),
|
||
('VALIGN', (0, 0), (-1, -1), 'TOP'),
|
||
]))
|
||
|
||
elements.append(KeepTogether(q_table))
|
||
elements.append(Spacer(1, 3*mm))
|
||
|
||
# ===== FIRMA =====
|
||
if inspection.signature_data:
|
||
elements.append(PageBreak())
|
||
elements.append(Spacer(1, 10*mm))
|
||
elements.append(Paragraph("✍️ FIRMA DEL OPERARIO", section_header_style))
|
||
elements.append(Spacer(1, 5*mm))
|
||
|
||
try:
|
||
# Decodificar firma base64
|
||
import base64
|
||
signature_bytes = base64.b64decode(inspection.signature_data.split(',')[1] if ',' in inspection.signature_data else inspection.signature_data)
|
||
signature_img_buffer = BytesIO(signature_bytes)
|
||
signature_img = RLImage(signature_img_buffer, width=80*mm, height=40*mm)
|
||
|
||
# Tabla con la firma y datos
|
||
signature_data = [
|
||
[signature_img],
|
||
[Paragraph(f"<b>Operario:</b> {inspection.mechanic_employee_code or 'N/A'}", info_style)],
|
||
[Paragraph(f"<b>Fecha de finalización:</b> {inspection.completed_at.strftime('%d/%m/%Y %H:%M') if inspection.completed_at else 'N/A'}", info_style)]
|
||
]
|
||
|
||
signature_table = Table(signature_data, colWidths=[180*mm])
|
||
signature_table.setStyle(TableStyle([
|
||
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
|
||
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
|
||
('LINEABOVE', (0, 0), (0, 0), 1, colors.HexColor('#cbd5e1')),
|
||
('PADDING', (0, 0), (-1, -1), 8),
|
||
]))
|
||
|
||
elements.append(signature_table)
|
||
print(f"✅ Firma agregada al PDF")
|
||
except Exception as e:
|
||
print(f"⚠️ Error agregando firma al PDF: {e}")
|
||
elements.append(Paragraph(
|
||
f"<i>Error al cargar la firma</i>",
|
||
ParagraphStyle('error', parent=small_style, alignment=TA_CENTER, textColor=colors.HexColor('#ef4444'))
|
||
))
|
||
|
||
# ===== FOOTER =====
|
||
elements.append(Spacer(1, 10*mm))
|
||
elements.append(Paragraph(
|
||
f"Documento generado automáticamente por Checklist Inteligente el {datetime.now().strftime('%d/%m/%Y a las %H:%M')}",
|
||
ParagraphStyle('footer', parent=small_style, alignment=TA_CENTER, textColor=colors.HexColor('#94a3b8'))
|
||
))
|
||
|
||
# Generar PDF
|
||
try:
|
||
doc.build(elements)
|
||
except Exception as e:
|
||
print(f"❌ Error al generar PDF: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
raise HTTPException(status_code=500, detail=f"Error al generar PDF: {str(e)}")
|
||
|
||
# Subir a S3
|
||
buffer.seek(0)
|
||
now = datetime.now()
|
||
folder = f"{now.year}/{now.month:02d}"
|
||
filename = f"inspeccion_{inspection_id}_{inspection.vehicle_plate or 'sin-patente'}.pdf"
|
||
s3_key = f"{folder}/{filename}"
|
||
buffer.seek(0)
|
||
s3_client.upload_fileobj(buffer, S3_PDF_BUCKET, s3_key, ExtraArgs={"ContentType": "application/pdf"})
|
||
pdf_url = f"{S3_ENDPOINT}/{S3_PDF_BUCKET}/{s3_key}"
|
||
|
||
print(f"✅ PDF generado y subido a S3: {pdf_url}")
|
||
return pdf_url
|
||
|
||
|
||
@app.post("/api/inspections/{inspection_id}/complete", response_model=schemas.Inspection)
|
||
def complete_inspection(
|
||
inspection_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
inspection = db.query(models.Inspection).filter(
|
||
models.Inspection.id == inspection_id
|
||
).first()
|
||
|
||
if not inspection:
|
||
raise HTTPException(status_code=404, detail="Inspección no encontrada")
|
||
|
||
# Calcular score
|
||
answers = db.query(models.Answer).filter(models.Answer.inspection_id == inspection_id).all()
|
||
total_score = sum(a.points_earned for a in answers)
|
||
flagged_count = sum(1 for a in answers if a.is_flagged)
|
||
|
||
inspection.score = total_score
|
||
inspection.percentage = (total_score / inspection.max_score * 100) if inspection.max_score > 0 else 0
|
||
inspection.flagged_items_count = flagged_count
|
||
inspection.status = "completed"
|
||
inspection.completed_at = datetime.utcnow()
|
||
|
||
# Generar PDF usando función reutilizable
|
||
pdf_url = generate_inspection_pdf(inspection_id, db)
|
||
inspection.pdf_url = pdf_url
|
||
db.commit()
|
||
db.refresh(inspection)
|
||
|
||
# Enviar inspección completa a n8n con todas las respuestas e imágenes
|
||
send_completed_inspection_to_n8n(inspection, db)
|
||
|
||
return inspection
|
||
|
||
|
||
@app.patch("/api/inspections/{inspection_id}/deactivate")
|
||
def deactivate_inspection(
|
||
inspection_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
# Solo admin puede inactivar
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="No tienes permisos para inactivar inspecciones")
|
||
|
||
inspection = db.query(models.Inspection).filter(
|
||
models.Inspection.id == inspection_id
|
||
).first()
|
||
|
||
if not inspection:
|
||
raise HTTPException(status_code=404, detail="Inspección no encontrada")
|
||
|
||
inspection.is_active = False
|
||
inspection.status = "inactive"
|
||
|
||
db.commit()
|
||
db.refresh(inspection)
|
||
|
||
return {"message": "Inspección inactivada correctamente", "inspection_id": inspection_id}
|
||
|
||
|
||
# ============= ANSWER ENDPOINTS =============
|
||
@app.post("/api/answers", response_model=schemas.Answer)
|
||
def create_answer(
|
||
answer: schemas.AnswerCreate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
# Obtener la pregunta para saber los puntos
|
||
question = db.query(models.Question).filter(models.Question.id == answer.question_id).first()
|
||
|
||
if not question:
|
||
raise HTTPException(status_code=404, detail="Pregunta no encontrada")
|
||
|
||
# Sistema simplificado: 1 punto por pregunta correcta
|
||
points_earned = 0
|
||
if answer.status == "ok":
|
||
points_earned = 1
|
||
elif answer.status == "warning":
|
||
points_earned = 0.5
|
||
|
||
# Buscar si ya existe una respuesta para esta inspección y pregunta
|
||
existing_answer = db.query(models.Answer).filter(
|
||
models.Answer.inspection_id == answer.inspection_id,
|
||
models.Answer.question_id == answer.question_id
|
||
).first()
|
||
if existing_answer:
|
||
# Actualizar la respuesta existente
|
||
# Si status es pass/fail, no poner valor por defecto en answer_value
|
||
if answer.status in ["pass", "fail"] and not answer.answer_value:
|
||
existing_answer.answer_value = None
|
||
else:
|
||
existing_answer.answer_value = answer.answer_value
|
||
existing_answer.status = answer.status
|
||
existing_answer.comment = getattr(answer, "comment", None)
|
||
existing_answer.ai_analysis = getattr(answer, "ai_analysis", None)
|
||
existing_answer.is_flagged = getattr(answer, "is_flagged", False)
|
||
existing_answer.points_earned = points_earned
|
||
existing_answer.updated_at = datetime.utcnow()
|
||
db.commit()
|
||
db.refresh(existing_answer)
|
||
|
||
|
||
# Solo enviar si tiene valor real (no vacío ni None)
|
||
if question.send_notification and answer.answer_value:
|
||
print(f"✅ Enviando notificación para pregunta #{question.id}")
|
||
send_answer_notification(existing_answer, question, current_user, db)
|
||
else:
|
||
if not question.send_notification:
|
||
print(f"❌ NO se envía notificación (send_notification=False) para pregunta #{question.id}")
|
||
else:
|
||
print(f"⏭️ NO se envía notificación (respuesta vacía) para pregunta #{question.id}")
|
||
|
||
return existing_answer
|
||
else:
|
||
# Si status es pass/fail y no hay valor, no poner valor por defecto en answer_value
|
||
answer_data = answer.dict()
|
||
if answer.status in ["pass", "fail"] and not answer.answer_value:
|
||
answer_data["answer_value"] = None
|
||
db_answer = models.Answer(
|
||
**answer_data,
|
||
points_earned=points_earned
|
||
)
|
||
db.add(db_answer)
|
||
db.commit()
|
||
db.refresh(db_answer)
|
||
|
||
|
||
|
||
# Solo enviar si tiene valor real (no vacío ni None)
|
||
if question.send_notification and answer.answer_value:
|
||
print(f"✅ Enviando notificación para pregunta #{question.id}")
|
||
send_answer_notification(db_answer, question, current_user, db)
|
||
else:
|
||
if not question.send_notification:
|
||
print(f"❌ NO se envía notificación (send_notification=False) para pregunta #{question.id}")
|
||
else:
|
||
print(f"⏭️ NO se envía notificación (respuesta vacía) para pregunta #{question.id}")
|
||
|
||
return db_answer
|
||
|
||
|
||
@app.put("/api/answers/{answer_id}", response_model=schemas.Answer)
|
||
def update_answer(
|
||
answer_id: int,
|
||
answer: schemas.AnswerUpdate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
db_answer = db.query(models.Answer).filter(models.Answer.id == answer_id).first()
|
||
|
||
if not db_answer:
|
||
raise HTTPException(status_code=404, detail="Respuesta no encontrada")
|
||
|
||
# Obtener la inspección para verificar si está completada
|
||
inspection = db.query(models.Inspection).filter(
|
||
models.Inspection.id == db_answer.inspection_id
|
||
).first()
|
||
|
||
if not inspection:
|
||
raise HTTPException(status_code=404, detail="Inspección no encontrada")
|
||
|
||
# Recalcular puntos si cambió el status
|
||
if answer.status and answer.status != db_answer.status:
|
||
question = db.query(models.Question).filter(
|
||
models.Question.id == db_answer.question_id
|
||
).first()
|
||
|
||
if answer.status == "ok":
|
||
db_answer.points_earned = question.points
|
||
elif answer.status == "warning":
|
||
db_answer.points_earned = int(question.points * 0.5)
|
||
else:
|
||
db_answer.points_earned = 0
|
||
|
||
for key, value in answer.dict(exclude_unset=True).items():
|
||
setattr(db_answer, key, value)
|
||
|
||
db.commit()
|
||
db.refresh(db_answer)
|
||
|
||
# Si la inspección está completada, regenerar PDF con los cambios
|
||
if inspection.status == "completed":
|
||
print(f"🔄 Regenerando PDF para inspección completada #{inspection.id}")
|
||
|
||
# Recalcular score de la inspección
|
||
answers = db.query(models.Answer).filter(
|
||
models.Answer.inspection_id == inspection.id
|
||
).all()
|
||
|
||
inspection.score = sum(a.points_earned for a in answers)
|
||
inspection.percentage = (inspection.score / inspection.max_score * 100) if inspection.max_score > 0 else 0
|
||
inspection.flagged_items_count = sum(1 for a in answers if a.is_flagged)
|
||
|
||
# Regenerar PDF
|
||
try:
|
||
pdf_url = generate_inspection_pdf(inspection.id, db)
|
||
inspection.pdf_url = pdf_url
|
||
db.commit()
|
||
print(f"✅ PDF regenerado exitosamente: {pdf_url}")
|
||
except Exception as e:
|
||
print(f"❌ Error regenerando PDF: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
# No lanzamos excepción para no interrumpir la actualización de la respuesta
|
||
|
||
return db_answer
|
||
|
||
|
||
# ============= AUDIT LOG ENDPOINTS =============
|
||
@app.get("/api/inspections/{inspection_id}/audit-log", response_model=List[schemas.AuditLog])
|
||
def get_inspection_audit_log(
|
||
inspection_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Obtener el historial de cambios de una inspección"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="Solo administradores pueden ver el historial")
|
||
|
||
logs = db.query(models.InspectionAuditLog).filter(
|
||
models.InspectionAuditLog.inspection_id == inspection_id
|
||
).order_by(models.InspectionAuditLog.created_at.desc()).all()
|
||
|
||
# Agregar nombre de usuario a cada log
|
||
result = []
|
||
for log in logs:
|
||
log_dict = {
|
||
"id": log.id,
|
||
"inspection_id": log.inspection_id,
|
||
"answer_id": log.answer_id,
|
||
"user_id": log.user_id,
|
||
"action": log.action,
|
||
"entity_type": log.entity_type,
|
||
"field_name": log.field_name,
|
||
"old_value": log.old_value,
|
||
"new_value": log.new_value,
|
||
"comment": log.comment,
|
||
"created_at": log.created_at,
|
||
"user_name": None
|
||
}
|
||
|
||
user = db.query(models.User).filter(models.User.id == log.user_id).first()
|
||
if user:
|
||
log_dict["user_name"] = user.full_name or user.username
|
||
|
||
result.append(schemas.AuditLog(**log_dict))
|
||
|
||
return result
|
||
|
||
|
||
@app.put("/api/answers/{answer_id}/admin-edit", response_model=schemas.Answer)
|
||
def admin_edit_answer(
|
||
answer_id: int,
|
||
answer_edit: schemas.AnswerEdit,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Editar una respuesta (solo admin) con registro de auditoría"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="Solo administradores pueden editar respuestas")
|
||
|
||
db_answer = db.query(models.Answer).filter(models.Answer.id == answer_id).first()
|
||
|
||
if not db_answer:
|
||
raise HTTPException(status_code=404, detail="Respuesta no encontrada")
|
||
|
||
# Registrar cambios en el log de auditoría
|
||
changes = []
|
||
|
||
if answer_edit.answer_value is not None and answer_edit.answer_value != db_answer.answer_value:
|
||
changes.append({
|
||
"field_name": "answer_value",
|
||
"old_value": db_answer.answer_value,
|
||
"new_value": answer_edit.answer_value
|
||
})
|
||
db_answer.answer_value = answer_edit.answer_value
|
||
|
||
if answer_edit.status is not None and answer_edit.status != db_answer.status:
|
||
changes.append({
|
||
"field_name": "status",
|
||
"old_value": db_answer.status,
|
||
"new_value": answer_edit.status
|
||
})
|
||
|
||
# Recalcular puntos
|
||
question = db.query(models.Question).filter(
|
||
models.Question.id == db_answer.question_id
|
||
).first()
|
||
|
||
old_points = db_answer.points_earned
|
||
if answer_edit.status == "ok":
|
||
db_answer.points_earned = question.points
|
||
elif answer_edit.status == "warning":
|
||
db_answer.points_earned = int(question.points * 0.5)
|
||
else:
|
||
db_answer.points_earned = 0
|
||
|
||
if old_points != db_answer.points_earned:
|
||
changes.append({
|
||
"field_name": "points_earned",
|
||
"old_value": str(old_points),
|
||
"new_value": str(db_answer.points_earned)
|
||
})
|
||
|
||
db_answer.status = answer_edit.status
|
||
|
||
if answer_edit.comment is not None and answer_edit.comment != db_answer.comment:
|
||
changes.append({
|
||
"field_name": "comment",
|
||
"old_value": db_answer.comment or "",
|
||
"new_value": answer_edit.comment
|
||
})
|
||
db_answer.comment = answer_edit.comment
|
||
|
||
if answer_edit.is_flagged is not None and answer_edit.is_flagged != db_answer.is_flagged:
|
||
changes.append({
|
||
"field_name": "is_flagged",
|
||
"old_value": str(db_answer.is_flagged),
|
||
"new_value": str(answer_edit.is_flagged)
|
||
})
|
||
db_answer.is_flagged = answer_edit.is_flagged
|
||
|
||
# Crear registros de auditoría para cada cambio
|
||
for change in changes:
|
||
audit_log = models.InspectionAuditLog(
|
||
inspection_id=db_answer.inspection_id,
|
||
answer_id=answer_id,
|
||
user_id=current_user.id,
|
||
action="updated",
|
||
entity_type="answer",
|
||
field_name=change["field_name"],
|
||
old_value=change["old_value"],
|
||
new_value=change["new_value"],
|
||
comment=answer_edit.edit_comment or "Editado por administrador"
|
||
)
|
||
db.add(audit_log)
|
||
|
||
db_answer.updated_at = datetime.utcnow()
|
||
db.commit()
|
||
db.refresh(db_answer)
|
||
|
||
# Si la inspección está completada, regenerar PDF con los cambios
|
||
inspection = db.query(models.Inspection).filter(
|
||
models.Inspection.id == db_answer.inspection_id
|
||
).first()
|
||
|
||
if inspection and inspection.status == "completed":
|
||
print(f"🔄 Regenerando PDF para inspección completada #{inspection.id} (admin-edit)")
|
||
|
||
# Recalcular score de la inspección
|
||
answers = db.query(models.Answer).filter(
|
||
models.Answer.inspection_id == inspection.id
|
||
).all()
|
||
|
||
inspection.score = sum(a.points_earned for a in answers)
|
||
inspection.percentage = (inspection.score / inspection.max_score * 100) if inspection.max_score > 0 else 0
|
||
inspection.flagged_items_count = sum(1 for a in answers if a.is_flagged)
|
||
|
||
# Regenerar PDF
|
||
try:
|
||
pdf_url = generate_inspection_pdf(inspection.id, db)
|
||
inspection.pdf_url = pdf_url
|
||
db.commit()
|
||
print(f"✅ PDF regenerado exitosamente: {pdf_url}")
|
||
except Exception as e:
|
||
print(f"❌ Error regenerando PDF: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
# No lanzamos excepción para no interrumpir la actualización de la respuesta
|
||
|
||
return db_answer
|
||
|
||
|
||
# ============= MEDIA FILE ENDPOINTS =============
|
||
@app.post("/api/answers/{answer_id}/upload", response_model=schemas.MediaFile)
|
||
async def upload_photo(
|
||
answer_id: int,
|
||
file: UploadFile = File(...),
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
# Verificar que la respuesta existe
|
||
answer = db.query(models.Answer).filter(models.Answer.id == answer_id).first()
|
||
if not answer:
|
||
raise HTTPException(status_code=404, detail="Respuesta no encontrada")
|
||
|
||
# Subir imagen a S3/MinIO
|
||
file_extension = file.filename.split(".")[-1]
|
||
now = datetime.now()
|
||
folder = f"{now.year}/{now.month:02d}"
|
||
file_name = f"answer_{answer_id}_{uuid.uuid4().hex}.{file_extension}"
|
||
s3_key = f"{folder}/{file_name}"
|
||
s3_client.upload_fileobj(file.file, S3_IMAGE_BUCKET, s3_key, ExtraArgs={"ContentType": file.content_type})
|
||
# Generar URL pública (ajusta si usas presigned)
|
||
image_url = f"{S3_ENDPOINT}/{S3_IMAGE_BUCKET}/{s3_key}"
|
||
# Crear registro en BD
|
||
media_file = models.MediaFile(
|
||
answer_id=answer_id,
|
||
file_path=image_url,
|
||
file_type="image"
|
||
)
|
||
db.add(media_file)
|
||
db.commit()
|
||
db.refresh(media_file)
|
||
return media_file
|
||
|
||
|
||
# ============= AI ANALYSIS =============
|
||
@app.get("/api/ai/models", response_model=List[schemas.AIModelInfo])
|
||
def get_available_ai_models(current_user: models.User = Depends(get_current_user)):
|
||
"""Obtener lista de modelos de IA disponibles"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="Solo administradores pueden ver modelos de IA")
|
||
|
||
models_list = [
|
||
# OpenAI Models
|
||
{
|
||
"id": "gpt-4o",
|
||
"name": "GPT-4o (Recomendado)",
|
||
"provider": "openai",
|
||
"description": "Modelo multimodal más avanzado de OpenAI, rápido y preciso para análisis de imágenes"
|
||
},
|
||
{
|
||
"id": "gpt-4o-mini",
|
||
"name": "GPT-4o Mini",
|
||
"provider": "openai",
|
||
"description": "Versión compacta y económica de GPT-4o, ideal para análisis rápidos"
|
||
},
|
||
{
|
||
"id": "gpt-4-turbo",
|
||
"name": "GPT-4 Turbo",
|
||
"provider": "openai",
|
||
"description": "Modelo potente con capacidades de visión y contexto amplio"
|
||
},
|
||
{
|
||
"id": "gpt-4-vision-preview",
|
||
"name": "GPT-4 Vision (Preview)",
|
||
"provider": "openai",
|
||
"description": "Modelo específico para análisis de imágenes (versión previa)"
|
||
},
|
||
# Gemini Models - Actualizados a versiones 2.0, 2.5 y 3.0
|
||
{
|
||
"id": "gemini-3-pro-preview",
|
||
"name": "Gemini 3 Pro Preview (Último)",
|
||
"provider": "gemini",
|
||
"description": "Modelo de próxima generación en preview, máxima capacidad de análisis"
|
||
},
|
||
{
|
||
"id": "gemini-2.5-pro",
|
||
"name": "Gemini 2.5 Pro (Recomendado)",
|
||
"provider": "gemini",
|
||
"description": "Último modelo estable con excelente análisis visual y razonamiento avanzado"
|
||
},
|
||
{
|
||
"id": "gemini-2.5-flash",
|
||
"name": "Gemini 2.5 Flash",
|
||
"provider": "gemini",
|
||
"description": "Versión rápida del 2.5, ideal para inspecciones en tiempo real"
|
||
},
|
||
{
|
||
"id": "gemini-2.0-flash",
|
||
"name": "Gemini 2.0 Flash",
|
||
"provider": "gemini",
|
||
"description": "Modelo rápido y eficiente de la generación 2.0"
|
||
},
|
||
{
|
||
"id": "gemini-1.5-pro-latest",
|
||
"name": "Gemini 1.5 Pro Latest",
|
||
"provider": "gemini",
|
||
"description": "Versión estable 1.5 con contexto de 2M tokens"
|
||
},
|
||
{
|
||
"id": "gemini-1.5-flash-latest",
|
||
"name": "Gemini 1.5 Flash Latest",
|
||
"provider": "gemini",
|
||
"description": "Modelo 1.5 rápido para análisis básicos"
|
||
}
|
||
]
|
||
|
||
return models_list
|
||
|
||
|
||
@app.get("/api/ai/configuration", response_model=schemas.AIConfiguration)
|
||
def get_ai_configuration(
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Obtener configuración de IA actual"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="Solo administradores pueden ver configuración de IA")
|
||
|
||
config = db.query(models.AIConfiguration).filter(
|
||
models.AIConfiguration.is_active == True
|
||
).first()
|
||
|
||
if not config:
|
||
raise HTTPException(status_code=404, detail="No hay configuración de IA activa")
|
||
|
||
return config
|
||
|
||
|
||
@app.post("/api/ai/configuration", response_model=schemas.AIConfiguration)
|
||
def create_ai_configuration(
|
||
config: schemas.AIConfigurationCreate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Crear o actualizar configuración de IA"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="Solo administradores pueden configurar IA")
|
||
|
||
# Desactivar configuraciones anteriores
|
||
db.query(models.AIConfiguration).update({"is_active": False})
|
||
|
||
# Determinar modelo por defecto según el proveedor si no se especifica
|
||
model_name = config.model_name
|
||
if not model_name:
|
||
if config.provider == "openai":
|
||
model_name = "gpt-4o"
|
||
elif config.provider == "gemini":
|
||
model_name = "gemini-2.5-pro"
|
||
else:
|
||
model_name = "default"
|
||
|
||
# Crear nueva configuración
|
||
new_config = models.AIConfiguration(
|
||
provider=config.provider,
|
||
api_key=config.api_key,
|
||
model_name=model_name,
|
||
is_active=True
|
||
)
|
||
|
||
db.add(new_config)
|
||
db.commit()
|
||
db.refresh(new_config)
|
||
|
||
return new_config
|
||
|
||
|
||
@app.put("/api/ai/configuration/{config_id}", response_model=schemas.AIConfiguration)
|
||
def update_ai_configuration(
|
||
config_id: int,
|
||
config_update: schemas.AIConfigurationUpdate,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Actualizar configuración de IA existente"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="Solo administradores pueden actualizar configuración de IA")
|
||
|
||
config = db.query(models.AIConfiguration).filter(
|
||
models.AIConfiguration.id == config_id
|
||
).first()
|
||
|
||
if not config:
|
||
raise HTTPException(status_code=404, detail="Configuración no encontrada")
|
||
|
||
# Actualizar campos
|
||
for key, value in config_update.dict(exclude_unset=True).items():
|
||
setattr(config, key, value)
|
||
|
||
db.commit()
|
||
db.refresh(config)
|
||
|
||
return config
|
||
|
||
|
||
@app.delete("/api/ai/configuration/{config_id}")
|
||
def delete_ai_configuration(
|
||
config_id: int,
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""Eliminar configuración de IA"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(status_code=403, detail="Solo administradores pueden eliminar configuración de IA")
|
||
|
||
config = db.query(models.AIConfiguration).filter(
|
||
models.AIConfiguration.id == config_id
|
||
).first()
|
||
|
||
if not config:
|
||
raise HTTPException(status_code=404, detail="Configuración no encontrada")
|
||
|
||
db.delete(config)
|
||
db.commit()
|
||
|
||
return {"message": "Configuración eliminada correctamente"}
|
||
|
||
|
||
@app.post("/api/analyze-image")
|
||
async def analyze_image(
|
||
file: UploadFile = File(...),
|
||
question_id: int = Form(None),
|
||
inspection_id: int = Form(None),
|
||
custom_prompt: str = Form(None),
|
||
db: Session = Depends(get_db),
|
||
current_user: models.User = Depends(get_current_user)
|
||
):
|
||
"""
|
||
Analiza una imagen usando IA para sugerir respuestas
|
||
Usa la configuración de IA activa (OpenAI o Gemini)
|
||
Incluye contexto del vehículo si se proporciona inspection_id
|
||
"""
|
||
print("\n" + "="*80)
|
||
print("🔍 ANALYZE IMAGE - DEBUG")
|
||
print("="*80)
|
||
print(f"📥 Parámetros recibidos:")
|
||
print(f" - file: {file.filename}")
|
||
print(f" - question_id: {question_id}")
|
||
print(f" - inspection_id: {inspection_id}")
|
||
print(f" - custom_prompt (del Form): {custom_prompt[:100] if custom_prompt else 'NO RECIBIDO'}")
|
||
|
||
# Obtener configuración de IA activa
|
||
ai_config = db.query(models.AIConfiguration).filter(
|
||
models.AIConfiguration.is_active == True
|
||
).first()
|
||
|
||
if not ai_config:
|
||
return {
|
||
"status": "disabled",
|
||
"message": "No hay configuración de IA activa. Configure en Settings."
|
||
}
|
||
|
||
# Guardar imagen temporalmente
|
||
import base64
|
||
|
||
contents = await file.read()
|
||
image_b64 = base64.b64encode(contents).decode('utf-8')
|
||
|
||
# Obtener contexto de la pregunta si se proporciona
|
||
question_obj = None
|
||
if question_id:
|
||
question_obj = db.query(models.Question).filter(models.Question.id == question_id).first()
|
||
print(f"📋 Pregunta encontrada:")
|
||
print(f" - ID: {question_obj.id}")
|
||
print(f" - Texto: {question_obj.text}")
|
||
print(f" - ai_prompt en DB: {question_obj.ai_prompt[:100] if question_obj.ai_prompt else 'NO TIENE'}")
|
||
|
||
# Si no se proporciona custom_prompt en el Form, usar el de la pregunta
|
||
if not custom_prompt and question_obj and question_obj.ai_prompt:
|
||
custom_prompt = question_obj.ai_prompt
|
||
print(f"✅ Usando ai_prompt de la pregunta de la DB")
|
||
elif custom_prompt:
|
||
print(f"✅ Usando custom_prompt del Form")
|
||
else:
|
||
print(f"⚠️ NO HAY custom_prompt (ni del Form ni de la DB)")
|
||
|
||
print(f"📝 Custom prompt FINAL a usar: {custom_prompt[:150] if custom_prompt else 'NINGUNO'}...")
|
||
|
||
# Obtener contexto del vehículo si se proporciona inspection_id
|
||
vehicle_context = ""
|
||
if inspection_id:
|
||
inspection = db.query(models.Inspection).filter(models.Inspection.id == inspection_id).first()
|
||
if inspection:
|
||
print(f"🚗 Contexto del vehículo agregado: {inspection.vehicle_brand} {inspection.vehicle_model}")
|
||
vehicle_context = f"""
|
||
INFORMACIÓN DEL VEHÍCULO INSPECCIONADO:
|
||
- Marca: {inspection.vehicle_brand}
|
||
- Modelo: {inspection.vehicle_model}
|
||
- Placa: {inspection.vehicle_plate}
|
||
- Kilometraje: {inspection.vehicle_km} km
|
||
- Nº Pedido: {inspection.order_number}
|
||
- OR/Orden: {inspection.or_number}
|
||
"""
|
||
else:
|
||
print(f"⚠️ inspection_id {inspection_id} no encontrado en DB")
|
||
else:
|
||
print(f"⚠️ NO se proporcionó inspection_id, sin contexto de vehículo")
|
||
|
||
try:
|
||
# Construir prompt dinámico basado en la pregunta específica
|
||
if question_obj:
|
||
# Usar prompt personalizado si está disponible
|
||
if custom_prompt:
|
||
# Prompt personalizado - DIRECTO Y SIMPLE
|
||
system_prompt = f"""Eres un mecánico experto realizando una inspección vehicular.
|
||
|
||
{vehicle_context}
|
||
|
||
TAREA ESPECÍFICA:
|
||
{custom_prompt}
|
||
|
||
Responde SOLO en formato JSON válido (sin markdown, sin ```json):
|
||
{{
|
||
"status": "ok",
|
||
"observations": "Describe lo que observas en la imagen en relación a la tarea solicitada",
|
||
"recommendation": "Acción sugerida basada en lo observado",
|
||
"confidence": 0.85
|
||
}}
|
||
|
||
VALORES DE STATUS:
|
||
- "ok": Cumple con lo esperado según la tarea
|
||
- "minor": Presenta observaciones menores o advertencias
|
||
- "critical": Presenta problemas graves o no cumple con lo esperado
|
||
|
||
IMPORTANTE: Si la tarea requiere verificar funcionamiento (algo encendido, prendido, activo) pero la imagen muestra el componente apagado o en reposo, usa status "critical" e indica en "recommendation" que se necesita una foto con el componente funcionando o un video."""
|
||
|
||
user_message = f"Pregunta de inspección: {question_obj.text}\n\nAnaliza esta imagen según la tarea especificada."
|
||
else:
|
||
# Prompt altamente específico para la pregunta
|
||
question_text = question_obj.text
|
||
question_type = question_obj.type
|
||
section = question_obj.section
|
||
|
||
system_prompt = f"""Eres un mecánico experto realizando una inspección vehicular.
|
||
|
||
{vehicle_context}
|
||
|
||
PREGUNTA ESPECÍFICA A RESPONDER: "{question_text}"
|
||
Sección: {section}
|
||
|
||
Analiza la imagen ÚNICAMENTE para responder esta pregunta específica.
|
||
Sé directo y enfócate solo en lo que la pregunta solicita.
|
||
Considera el kilometraje y características del vehículo para contextualizar tu análisis.
|
||
|
||
VALIDACIÓN DE IMAGEN:
|
||
- Si la imagen NO corresponde al contexto de la pregunta, indica en "recommendation" que deben cambiar la foto
|
||
- Si la imagen es borrosa o no permite análisis, indica en "recommendation" que tomen otra foto más clara
|
||
|
||
Responde SOLO en formato JSON válido (sin markdown, sin ```json):
|
||
{{
|
||
"status": "ok",
|
||
"observations": "Respuesta técnica específica a: {question_text}",
|
||
"recommendation": "Acción técnica recomendada o mensaje si la foto no es apropiada",
|
||
"confidence": 0.85
|
||
}}
|
||
|
||
NOTA IMPORTANTE sobre el campo "status":
|
||
- Usa "ok" si el componente está en buen estado y pasa la inspección
|
||
- Usa "minor" si hay problemas leves que requieren atención pero no son críticos
|
||
- Usa "critical" si hay problemas graves que requieren reparación inmediata
|
||
|
||
RECUERDA:
|
||
- Responde SOLO lo que la pregunta pide
|
||
- No des información genérica del vehículo
|
||
- Sé específico y técnico"""
|
||
|
||
if vehicle_context:
|
||
user_message = f"Inspecciona esta imagen del vehículo y responde específicamente: {question_obj.text}. En tus observaciones, menciona si el estado es apropiado para el kilometraje y marca/modelo del vehículo."
|
||
else:
|
||
user_message = f"Inspecciona la imagen y responde específicamente: {question_obj.text}"
|
||
else:
|
||
# Fallback para análisis general
|
||
system_prompt = f"""Eres un experto mecánico automotriz.
|
||
|
||
{vehicle_context}
|
||
|
||
Analiza la imagen y proporciona:
|
||
1. Estado del componente (bueno/regular/malo)
|
||
2. Nivel de criticidad (ok/minor/critical)
|
||
3. Observaciones técnicas breves
|
||
4. Recomendación de acción
|
||
|
||
Responde SOLO en formato JSON válido (sin markdown, sin ```json):
|
||
{{
|
||
"status": "ok",
|
||
"observations": "descripción técnica del componente",
|
||
"recommendation": "acción sugerida",
|
||
"confidence": 0.85
|
||
}}
|
||
|
||
NOTA: "status" debe ser "ok" (bueno), "minor" (problemas leves) o "critical" (problemas graves)."""
|
||
user_message = "Analiza este componente del vehículo para la inspección general."
|
||
|
||
print(f"\n🤖 PROMPT ENVIADO AL AI:")
|
||
print(f"Provider: {ai_config.provider}")
|
||
print(f"Model: {ai_config.model_name}")
|
||
print(f"System prompt (primeros 200 chars): {system_prompt[:200]}...")
|
||
print(f"User message: {user_message}")
|
||
print("="*80 + "\n")
|
||
|
||
if ai_config.provider == "openai":
|
||
import openai
|
||
openai.api_key = ai_config.api_key
|
||
|
||
response = openai.ChatCompletion.create(
|
||
model=ai_config.model_name,
|
||
messages=[
|
||
{"role": "system", "content": system_prompt},
|
||
{
|
||
"role": "user",
|
||
"content": [
|
||
{
|
||
"type": "text",
|
||
"text": user_message
|
||
},
|
||
{
|
||
"type": "image_url",
|
||
"image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}
|
||
}
|
||
]
|
||
}
|
||
],
|
||
max_tokens=500
|
||
)
|
||
|
||
ai_response = response.choices[0].message.content
|
||
|
||
elif ai_config.provider == "gemini":
|
||
import google.generativeai as genai
|
||
from PIL import Image
|
||
from io import BytesIO
|
||
|
||
genai.configure(api_key=ai_config.api_key)
|
||
model = genai.GenerativeModel(ai_config.model_name)
|
||
|
||
# Convertir base64 a imagen PIL
|
||
image = Image.open(BytesIO(contents))
|
||
|
||
prompt = f"{system_prompt}\n\n{user_message}"
|
||
response = model.generate_content([prompt, image])
|
||
ai_response = response.text
|
||
|
||
else:
|
||
return {
|
||
"success": False,
|
||
"error": f"Provider {ai_config.provider} no soportado"
|
||
}
|
||
|
||
# Intentar parsear como JSON, si falla, usar texto plano
|
||
try:
|
||
import json
|
||
import re
|
||
|
||
# Limpiar markdown code blocks si existen
|
||
cleaned_response = ai_response.strip()
|
||
|
||
# Remover ```json ... ``` si existe
|
||
if cleaned_response.startswith('```'):
|
||
# Extraer contenido entre ``` markers
|
||
match = re.search(r'```(?:json)?\s*\n?(.*?)\n?```', cleaned_response, re.DOTALL)
|
||
if match:
|
||
cleaned_response = match.group(1).strip()
|
||
|
||
analysis = json.loads(cleaned_response)
|
||
except:
|
||
# Si no es JSON válido, crear estructura básica
|
||
analysis = {
|
||
"status": "ok",
|
||
"observations": ai_response,
|
||
"recommendation": "Revisar manualmente",
|
||
"confidence": 0.7
|
||
}
|
||
|
||
return {
|
||
"success": True,
|
||
"analysis": analysis,
|
||
"raw_response": ai_response,
|
||
"model": ai_config.model_name,
|
||
"provider": ai_config.provider
|
||
}
|
||
|
||
except Exception as e:
|
||
print(f"Error en análisis AI: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return {
|
||
"success": False,
|
||
"error": str(e),
|
||
"message": "Error analyzing image with AI. Please check AI configuration in Settings."
|
||
}
|
||
|
||
try:
|
||
import openai
|
||
openai.api_key = settings.OPENAI_API_KEY
|
||
|
||
# Prompt especializado para inspección vehicular
|
||
system_prompt = """Eres un experto mecánico automotriz. Analiza la imagen y proporciona:
|
||
1. Estado del componente (bueno/regular/malo)
|
||
2. Nivel de criticidad (ok/minor/critical)
|
||
3. Observaciones técnicas breves
|
||
4. Recomendación de acción
|
||
|
||
Responde en formato JSON:
|
||
{
|
||
"status": "ok|minor|critical",
|
||
"observations": "descripción técnica",
|
||
"recommendation": "acción sugerida",
|
||
"confidence": 0.0-1.0
|
||
}"""
|
||
|
||
response = openai.ChatCompletion.create(
|
||
model="gpt-4-vision-preview" if "gpt-4" in str(settings.OPENAI_API_KEY) else "gpt-4o",
|
||
messages=[
|
||
{
|
||
"role": "system",
|
||
"content": system_prompt
|
||
},
|
||
{
|
||
"role": "user",
|
||
"content": [
|
||
{
|
||
"type": "text",
|
||
"text": f"Analiza este componente del vehículo.\n{question_context}"
|
||
},
|
||
{
|
||
"type": "image_url",
|
||
"image_url": {
|
||
"url": f"data:image/jpeg;base64,{image_b64}"
|
||
}
|
||
}
|
||
]
|
||
}
|
||
],
|
||
max_tokens=500
|
||
)
|
||
|
||
ai_response = response.choices[0].message.content
|
||
|
||
# Intentar parsear como JSON, si falla, usar texto plano
|
||
try:
|
||
import json
|
||
analysis = json.loads(ai_response)
|
||
except:
|
||
# Si no es JSON válido, crear estructura básica
|
||
analysis = {
|
||
"status": "ok",
|
||
"observations": ai_response,
|
||
"recommendation": "Revisar manualmente",
|
||
"confidence": 0.7
|
||
}
|
||
|
||
return {
|
||
"success": True,
|
||
"analysis": analysis,
|
||
"raw_response": ai_response,
|
||
"model": "gpt-4-vision"
|
||
}
|
||
|
||
except Exception as e:
|
||
print(f"Error en análisis AI: {e}")
|
||
return {
|
||
"success": False,
|
||
"error": str(e),
|
||
"message": "Error analyzing image with AI"
|
||
}
|
||
|
||
|
||
# ============= REPORTS =============
|
||
@app.get("/api/reports/dashboard", response_model=schemas.DashboardData)
|
||
def get_dashboard_data(
|
||
start_date: Optional[str] = None,
|
||
end_date: Optional[str] = None,
|
||
mechanic_id: Optional[int] = None,
|
||
current_user: models.User = Depends(get_current_user),
|
||
db: Session = Depends(get_db)
|
||
):
|
||
"""Obtener datos del dashboard de informes"""
|
||
if current_user.role not in ["admin", "asesor"]:
|
||
raise HTTPException(status_code=403, detail="No tienes permisos para acceder a reportes")
|
||
|
||
# Construir query base
|
||
query = db.query(models.Inspection)
|
||
|
||
# Aplicar filtros de fecha
|
||
if start_date:
|
||
# Parsear fecha y establecer al inicio del día en UTC-3
|
||
from datetime import timezone
|
||
local_tz = timezone(timedelta(hours=-3))
|
||
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
|
||
if start.tzinfo is None:
|
||
start = start.replace(tzinfo=local_tz)
|
||
query = query.filter(models.Inspection.started_at >= start)
|
||
if end_date:
|
||
# Parsear fecha y establecer al final del día en UTC-3
|
||
from datetime import timezone
|
||
local_tz = timezone(timedelta(hours=-3))
|
||
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
|
||
if end.tzinfo is None:
|
||
end = end.replace(tzinfo=local_tz)
|
||
query = query.filter(models.Inspection.started_at <= end)
|
||
|
||
# Filtro por mecánico
|
||
if mechanic_id:
|
||
query = query.filter(models.Inspection.mechanic_id == mechanic_id)
|
||
|
||
# Solo inspecciones activas
|
||
query = query.filter(models.Inspection.is_active == True)
|
||
|
||
# ESTADÍSTICAS GENERALES
|
||
total = query.count()
|
||
completed = query.filter(models.Inspection.status == "completed").count()
|
||
pending = total - completed
|
||
|
||
# Score promedio
|
||
avg_score_result = query.filter(
|
||
models.Inspection.score.isnot(None),
|
||
models.Inspection.max_score.isnot(None),
|
||
models.Inspection.max_score > 0
|
||
).with_entities(
|
||
func.avg(models.Inspection.score * 100.0 / models.Inspection.max_score)
|
||
).scalar()
|
||
avg_score = round(avg_score_result, 2) if avg_score_result else 0.0
|
||
|
||
# Items señalados
|
||
flagged_items = db.query(func.count(models.Answer.id))\
|
||
.filter(models.Answer.is_flagged == True)\
|
||
.join(models.Inspection)\
|
||
.filter(models.Inspection.is_active == True)
|
||
|
||
if start_date:
|
||
from datetime import timezone
|
||
local_tz = timezone(timedelta(hours=-3))
|
||
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
|
||
if start.tzinfo is None:
|
||
start = start.replace(tzinfo=local_tz)
|
||
flagged_items = flagged_items.filter(models.Inspection.started_at >= start)
|
||
if end_date:
|
||
from datetime import timezone
|
||
local_tz = timezone(timedelta(hours=-3))
|
||
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
|
||
if end.tzinfo is None:
|
||
end = end.replace(tzinfo=local_tz)
|
||
flagged_items = flagged_items.filter(models.Inspection.started_at <= end)
|
||
if mechanic_id:
|
||
flagged_items = flagged_items.filter(models.Inspection.mechanic_id == mechanic_id)
|
||
|
||
total_flagged = flagged_items.scalar() or 0
|
||
|
||
stats = schemas.InspectionStats(
|
||
total_inspections=total,
|
||
completed_inspections=completed,
|
||
pending_inspections=pending,
|
||
completion_rate=round((completed / total * 100) if total > 0 else 0, 2),
|
||
avg_score=avg_score,
|
||
total_flagged_items=total_flagged
|
||
)
|
||
|
||
# RANKING DE MECÁNICOS
|
||
mechanic_stats = db.query(
|
||
models.User.id,
|
||
models.User.full_name,
|
||
func.count(models.Inspection.id).label('total'),
|
||
func.avg(
|
||
case(
|
||
(models.Inspection.max_score > 0, models.Inspection.score * 100.0 / models.Inspection.max_score),
|
||
else_=None
|
||
)
|
||
).label('avg_score'),
|
||
func.count(case((models.Inspection.status == 'completed', 1))).label('completed')
|
||
).join(models.Inspection, models.Inspection.mechanic_id == models.User.id)\
|
||
.filter(models.User.role.in_(['mechanic', 'mecanico']))\
|
||
.filter(models.User.is_active == True)\
|
||
.filter(models.Inspection.is_active == True)
|
||
|
||
if start_date:
|
||
from datetime import timezone
|
||
local_tz = timezone(timedelta(hours=-3))
|
||
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
|
||
if start.tzinfo is None:
|
||
start = start.replace(tzinfo=local_tz)
|
||
mechanic_stats = mechanic_stats.filter(models.Inspection.started_at >= start)
|
||
if end_date:
|
||
from datetime import timezone
|
||
local_tz = timezone(timedelta(hours=-3))
|
||
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
|
||
if end.tzinfo is None:
|
||
end = end.replace(tzinfo=local_tz)
|
||
mechanic_stats = mechanic_stats.filter(models.Inspection.started_at <= end)
|
||
|
||
mechanic_stats = mechanic_stats.group_by(models.User.id, models.User.full_name)\
|
||
.order_by(func.count(models.Inspection.id).desc())\
|
||
.all()
|
||
|
||
mechanic_ranking = [
|
||
schemas.MechanicRanking(
|
||
mechanic_id=m.id,
|
||
mechanic_name=m.full_name or "Sin nombre",
|
||
total_inspections=m.total,
|
||
avg_score=round(m.avg_score, 2) if m.avg_score else 0.0,
|
||
completion_rate=round((m.completed / m.total * 100) if m.total > 0 else 0, 2)
|
||
)
|
||
for m in mechanic_stats if m.full_name
|
||
]
|
||
|
||
# ESTADÍSTICAS POR CHECKLIST
|
||
checklist_stats_query = db.query(
|
||
models.Checklist.id,
|
||
models.Checklist.name,
|
||
func.count(models.Inspection.id).label('total'),
|
||
func.avg(
|
||
case(
|
||
(models.Inspection.max_score > 0, models.Inspection.score * 100.0 / models.Inspection.max_score),
|
||
else_=None
|
||
)
|
||
).label('avg_score')
|
||
).join(models.Inspection)\
|
||
.filter(models.Inspection.is_active == True)\
|
||
.filter(models.Checklist.is_active == True)
|
||
|
||
if start_date:
|
||
from datetime import timezone
|
||
local_tz = timezone(timedelta(hours=-3))
|
||
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
|
||
if start.tzinfo is None:
|
||
start = start.replace(tzinfo=local_tz)
|
||
checklist_stats_query = checklist_stats_query.filter(models.Inspection.started_at >= start)
|
||
if end_date:
|
||
from datetime import timezone
|
||
local_tz = timezone(timedelta(hours=-3))
|
||
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
|
||
if end.tzinfo is None:
|
||
end = end.replace(tzinfo=local_tz)
|
||
checklist_stats_query = checklist_stats_query.filter(models.Inspection.started_at <= end)
|
||
if mechanic_id:
|
||
checklist_stats_query = checklist_stats_query.filter(models.Inspection.mechanic_id == mechanic_id)
|
||
|
||
checklist_stats_query = checklist_stats_query.group_by(models.Checklist.id, models.Checklist.name)
|
||
checklist_stats_data = checklist_stats_query.all()
|
||
|
||
checklist_stats = [
|
||
schemas.ChecklistStats(
|
||
checklist_id=c.id,
|
||
checklist_name=c.name or "Sin nombre",
|
||
total_inspections=c.total,
|
||
avg_score=round(c.avg_score, 2) if c.avg_score else 0.0
|
||
)
|
||
for c in checklist_stats_data if c.name
|
||
]
|
||
|
||
# INSPECCIONES POR FECHA (últimos 30 días)
|
||
end_date_obj = datetime.fromisoformat(end_date) if end_date else datetime.now()
|
||
start_date_obj = datetime.fromisoformat(start_date) if start_date else end_date_obj - timedelta(days=30)
|
||
|
||
inspections_by_date_query = db.query(
|
||
func.date(models.Inspection.started_at).label('date'),
|
||
func.count(models.Inspection.id).label('count')
|
||
).filter(
|
||
models.Inspection.started_at.between(start_date_obj, end_date_obj),
|
||
models.Inspection.is_active == True
|
||
)
|
||
|
||
if mechanic_id:
|
||
inspections_by_date_query = inspections_by_date_query.filter(
|
||
models.Inspection.mechanic_id == mechanic_id
|
||
)
|
||
|
||
inspections_by_date_data = inspections_by_date_query.group_by(
|
||
func.date(models.Inspection.started_at)
|
||
).all()
|
||
|
||
inspections_by_date = {
|
||
str(d.date): d.count for d in inspections_by_date_data
|
||
}
|
||
|
||
# RATIO PASS/FAIL
|
||
pass_fail_data = db.query(
|
||
models.Answer.answer_value,
|
||
func.count(models.Answer.id).label('count')
|
||
).join(models.Inspection)\
|
||
.filter(models.Inspection.is_active == True)\
|
||
.filter(models.Answer.answer_value.in_(['pass', 'fail', 'good', 'bad', 'regular']))
|
||
|
||
if start_date:
|
||
from datetime import timezone
|
||
local_tz = timezone(timedelta(hours=-3))
|
||
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
|
||
if start.tzinfo is None:
|
||
start = start.replace(tzinfo=local_tz)
|
||
pass_fail_data = pass_fail_data.filter(models.Inspection.started_at >= start)
|
||
if end_date:
|
||
from datetime import timezone
|
||
local_tz = timezone(timedelta(hours=-3))
|
||
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
|
||
if end.tzinfo is None:
|
||
end = end.replace(tzinfo=local_tz)
|
||
pass_fail_data = pass_fail_data.filter(models.Inspection.started_at <= end)
|
||
if mechanic_id:
|
||
pass_fail_data = pass_fail_data.filter(models.Inspection.mechanic_id == mechanic_id)
|
||
|
||
pass_fail_data = pass_fail_data.group_by(models.Answer.answer_value).all()
|
||
|
||
pass_fail_ratio = {d.answer_value: d.count for d in pass_fail_data}
|
||
|
||
return schemas.DashboardData(
|
||
stats=stats,
|
||
mechanic_ranking=mechanic_ranking,
|
||
checklist_stats=checklist_stats,
|
||
inspections_by_date=inspections_by_date,
|
||
pass_fail_ratio=pass_fail_ratio
|
||
)
|
||
|
||
|
||
@app.get("/api/reports/inspections")
|
||
def get_inspections_report(
|
||
start_date: Optional[str] = None,
|
||
end_date: Optional[str] = None,
|
||
mechanic_id: Optional[int] = None,
|
||
checklist_id: Optional[int] = None,
|
||
status: Optional[str] = None,
|
||
limit: int = 100,
|
||
current_user: models.User = Depends(get_current_user),
|
||
db: Session = Depends(get_db)
|
||
):
|
||
"""Obtener lista de inspecciones con filtros"""
|
||
if current_user.role not in ["admin", "asesor"]:
|
||
raise HTTPException(status_code=403, detail="No tienes permisos para acceder a reportes")
|
||
|
||
# Query base con select_from explícito
|
||
query = db.query(
|
||
models.Inspection.id,
|
||
models.Inspection.vehicle_plate,
|
||
models.Inspection.checklist_id,
|
||
models.Checklist.name.label('checklist_name'),
|
||
models.User.full_name.label('mechanic_name'),
|
||
models.Inspection.status,
|
||
models.Inspection.score,
|
||
models.Inspection.max_score,
|
||
models.Inspection.started_at,
|
||
models.Inspection.completed_at,
|
||
func.coalesce(
|
||
func.count(case((models.Answer.is_flagged == True, 1))),
|
||
0
|
||
).label('flagged_items')
|
||
).select_from(models.Inspection)\
|
||
.join(models.Checklist, models.Inspection.checklist_id == models.Checklist.id)\
|
||
.join(models.User, models.Inspection.mechanic_id == models.User.id)\
|
||
.outerjoin(models.Answer, models.Answer.inspection_id == models.Inspection.id)\
|
||
.filter(models.Inspection.is_active == True)
|
||
|
||
# Aplicar filtros
|
||
if start_date:
|
||
from datetime import timezone
|
||
local_tz = timezone(timedelta(hours=-3))
|
||
start = datetime.fromisoformat(start_date).replace(hour=0, minute=0, second=0, microsecond=0)
|
||
if start.tzinfo is None:
|
||
start = start.replace(tzinfo=local_tz)
|
||
query = query.filter(models.Inspection.started_at >= start)
|
||
if end_date:
|
||
from datetime import timezone
|
||
local_tz = timezone(timedelta(hours=-3))
|
||
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59, microsecond=999999)
|
||
if end.tzinfo is None:
|
||
end = end.replace(tzinfo=local_tz)
|
||
query = query.filter(models.Inspection.started_at <= end)
|
||
if mechanic_id:
|
||
query = query.filter(models.Inspection.mechanic_id == mechanic_id)
|
||
if checklist_id:
|
||
query = query.filter(models.Inspection.checklist_id == checklist_id)
|
||
if status:
|
||
query = query.filter(models.Inspection.status == status)
|
||
|
||
# Group by y order
|
||
query = query.group_by(
|
||
models.Inspection.id,
|
||
models.Checklist.name,
|
||
models.User.full_name
|
||
).order_by(models.Inspection.started_at.desc())\
|
||
.limit(limit)
|
||
|
||
results = query.all()
|
||
|
||
return [
|
||
{
|
||
"id": r.id,
|
||
"vehicle_plate": r.vehicle_plate,
|
||
"checklist_id": r.checklist_id,
|
||
"checklist_name": r.checklist_name or "Sin nombre",
|
||
"mechanic_name": r.mechanic_name or "Sin nombre",
|
||
"status": r.status,
|
||
"score": r.score,
|
||
"max_score": r.max_score,
|
||
"flagged_items": r.flagged_items,
|
||
"started_at": r.started_at.isoformat() if r.started_at else None,
|
||
"completed_at": r.completed_at.isoformat() if r.completed_at else None
|
||
}
|
||
for r in results
|
||
]
|
||
|
||
|
||
@app.get("/api/inspections/{inspection_id}/pdf")
|
||
def export_inspection_to_pdf(
|
||
inspection_id: int,
|
||
current_user: models.User = Depends(get_current_user),
|
||
db: Session = Depends(get_db)
|
||
):
|
||
"""Descargar el PDF guardado en MinIO para la inspección"""
|
||
from fastapi.responses import StreamingResponse
|
||
import requests
|
||
# Obtener inspección
|
||
inspection = db.query(models.Inspection).filter(
|
||
models.Inspection.id == inspection_id
|
||
).first()
|
||
if not inspection:
|
||
raise HTTPException(status_code=404, detail="Inspección no encontrada")
|
||
if current_user.role not in ["admin", "asesor"] and inspection.mechanic_id != current_user.id:
|
||
raise HTTPException(status_code=403, detail="No tienes permisos para ver esta inspección")
|
||
# Si existe pdf_url, descargar desde MinIO y devolverlo
|
||
if inspection.pdf_url:
|
||
try:
|
||
pdf_resp = requests.get(inspection.pdf_url, stream=True)
|
||
if pdf_resp.status_code == 200:
|
||
filename = inspection.pdf_url.split("/")[-1]
|
||
return StreamingResponse(pdf_resp.raw, media_type="application/pdf", headers={
|
||
"Content-Disposition": f"attachment; filename={filename}"
|
||
})
|
||
else:
|
||
raise HTTPException(status_code=404, detail="No se pudo descargar el PDF desde MinIO")
|
||
except Exception as e:
|
||
raise HTTPException(status_code=500, detail=f"Error al descargar PDF: {e}")
|
||
else:
|
||
raise HTTPException(status_code=404, detail="La inspección no tiene PDF generado")
|
||
|
||
|
||
# ============= HEALTH CHECK =============
|
||
@app.get("/")
|
||
def root():
|
||
return {"message": "Checklist Inteligente API", "version": "1.0.0", "status": "running"}
|
||
|
||
|
||
@app.get("/health")
|
||
def health_check():
|
||
return {"status": "healthy"}
|