from fastapi import FastAPI, Depends, HTTPException, status, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session, joinedload
from typing import List
import os
import shutil
from datetime import datetime, timedelta

from app.core.database import engine, get_db, Base
from app.core.security import verify_password, get_password_hash, create_access_token, decode_access_token
from app import models, schemas

# Create tables
Base.metadata.create_all(bind=engine)

app = FastAPI(title="Checklist Inteligente API", version="1.0.0")

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

security = HTTPBearer()


# Dependency that resolves the current user from the bearer token
def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db)
):
    """Return the ``models.User`` identified by the request's bearer token.

    Raises:
        HTTPException 401: the token cannot be decoded, or its ``sub`` claim
            is missing or not an integer.
        HTTPException 404: no user has the id stored in ``sub``.
        HTTPException 403: the user exists but has been deactivated.
    """
    invalid_token = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Token inválido o expirado"
    )

    payload = decode_access_token(credentials.credentials)
    if payload is None:
        raise invalid_token

    # A token without a usable "sub" claim is an invalid token, not a server
    # error, so map conversion failures to 401 instead of letting them escape.
    try:
        user_id = int(payload.get("sub"))
    except (AttributeError, TypeError, ValueError):
        raise invalid_token from None

    user = db.query(models.User).filter(models.User.id == user_id).first()
    if user is None:
        raise HTTPException(status_code=404, detail="Usuario no encontrado")

    # Tokens issued before a deactivation must stop working. Only an explicit
    # falsy flag blocks access; a NULL flag is treated as "not deactivated".
    if user.is_active is not None and not user.is_active:
        raise HTTPException(status_code=403, detail="Usuario inactivo")

    return user


# ============= AUTH ENDPOINTS =============

@app.post("/api/auth/register", response_model=schemas.User)
def register(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """Create a user account from the submitted data and return it.

    Raises:
        HTTPException 400: the username or the email is already taken.
    """
    # NOTE(review): this endpoint is unauthenticated and stores the
    # client-supplied role unchanged, so any caller can register with any
    # role (including "admin"). Confirm whether that is intended.

    # Reject duplicate usernames
    username_taken = db.query(models.User).filter(
        models.User.username == user.username
    ).first()
    if username_taken:
        raise HTTPException(status_code=400, detail="Usuario ya existe")

    # Reject duplicate emails (same rule the admin create-user endpoint applies)
    if user.email:
        email_taken = db.query(models.User).filter(
            models.User.email == user.email
        ).first()
        if email_taken:
            raise HTTPException(status_code=400, detail="Email ya está en uso")

    # Create the user; only the password hash is stored
    db_user = models.User(
        username=user.username,
        email=user.email,
        full_name=user.full_name,
        role=user.role,
        password_hash=get_password_hash(user.password)
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


@app.post("/api/auth/login", response_model=schemas.Token)
def login(user_login: schemas.UserLogin, db: Session = Depends(get_db)):
    """Check the credentials and return an access token plus the user.

    Raises:
        HTTPException 401: unknown username or wrong password.
        HTTPException 403: the credentials are valid but the account has
            been deactivated.
    """
    user = db.query(models.User).filter(
        models.User.username == user_login.username
    ).first()
    # Same error for "no such user" and "wrong password"
    if not user or not verify_password(user_login.password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Usuario o contraseña incorrectos"
        )

    # Deactivated accounts must not obtain new tokens
    if user.is_active is not None and not user.is_active:
        raise HTTPException(status_code=403, detail="Usuario inactivo")

    access_token = create_access_token(data={"sub": str(user.id), "role": user.role})
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "user": user
    }


@app.get("/api/auth/me", response_model=schemas.User)
def get_me(current_user: models.User = Depends(get_current_user)):
    """Return the authenticated user."""
    return current_user


# ============= USER ENDPOINTS =============

@app.get("/api/users", response_model=List[schemas.User])
def get_users(
    skip: int = 0,
    limit: int = 100,
    active_only: bool = True,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Return a page of users; admin only.

    ``active_only`` (default True) restricts the result to active users;
    ``skip`` and ``limit`` are applied as OFFSET/LIMIT.

    Raises:
        HTTPException 403: the caller is not an admin.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="No tienes permisos para ver usuarios")

    query = db.query(models.User)
    if active_only:
        # Column comparison builds a SQL expression, hence "== True"
        query = query.filter(models.User.is_active == True)  # noqa: E712
    return query.offset(skip).limit(limit).all()


@app.get("/api/users/{user_id}", response_model=schemas.User)
def get_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Return one user; admins may read anyone, other users only themselves.

    Raises:
        HTTPException 403: a non-admin asked for another user's record.
        HTTPException 404: no user has that id.
    """
    if current_user.role != "admin" and current_user.id != user_id:
        raise HTTPException(status_code=403, detail="No tienes permisos para ver este usuario")

    user = db.query(models.User).filter(models.User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="Usuario no encontrado")
    return user
@app.post("/api/users", response_model=schemas.User)
def create_user(
    user: schemas.UserCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Create an active user account; admin only.

    Raises:
        HTTPException 403: the caller is not an admin.
        HTTPException 400: the username or the email is already taken.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="No tienes permisos para crear usuarios")

    # Reject duplicate usernames
    db_user = db.query(models.User).filter(models.User.username == user.username).first()
    if db_user:
        raise HTTPException(status_code=400, detail="Usuario ya existe")

    # Reject duplicate emails
    if user.email:
        db_email = db.query(models.User).filter(models.User.email == user.email).first()
        if db_email:
            raise HTTPException(status_code=400, detail="Email ya está en uso")

    # Create the user; only the password hash is stored
    db_user = models.User(
        username=user.username,
        email=user.email,
        full_name=user.full_name,
        role=user.role,
        password_hash=get_password_hash(user.password),
        is_active=True
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


# The two "/api/users/me..." routes below MUST be registered before the
# "/api/users/{user_id}..." routes that share their HTTP method. Routes are
# matched in declaration order, so otherwise "me" is captured as user_id and
# rejected by integer validation before these handlers are ever tried.

@app.patch("/api/users/me/password")
def change_my_password(
    password_update: schemas.UserPasswordUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Change the caller's own password after verifying the current one.

    Raises:
        HTTPException 400: ``current_password`` does not match.
    """
    if not verify_password(password_update.current_password, current_user.password_hash):
        raise HTTPException(status_code=400, detail="Contraseña actual incorrecta")

    current_user.password_hash = get_password_hash(password_update.new_password)
    db.commit()
    return {"message": "Contraseña actualizada correctamente"}


@app.put("/api/users/me", response_model=schemas.User)
def update_my_profile(
    user_update: schemas.UserUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Update the caller's own email and/or full name.

    Any ``role`` value in the payload is ignored here: roles cannot be
    changed from the profile endpoint.

    Raises:
        HTTPException 400: the new email belongs to another user.
    """
    if user_update.email is not None:
        # The email must not belong to a different user
        existing = db.query(models.User).filter(
            models.User.email == user_update.email,
            models.User.id != current_user.id
        ).first()
        if existing:
            raise HTTPException(status_code=400, detail="Email ya está en uso")
        current_user.email = user_update.email

    if user_update.full_name is not None:
        current_user.full_name = user_update.full_name

    db.commit()
    db.refresh(current_user)
    return current_user


@app.put("/api/users/{user_id}", response_model=schemas.User)
def update_user(
    user_id: int,
    user_update: schemas.UserUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Update a user's email, full name and (admin only) role.

    Admins may update anyone; other users only their own record. Fields
    left as None in the payload are not touched.

    Raises:
        HTTPException 403: a non-admin targets another user, or a non-admin
            tries to set ``role``.
        HTTPException 404: no user has that id.
        HTTPException 400: the new email belongs to another user.
    """
    if current_user.role != "admin" and current_user.id != user_id:
        raise HTTPException(status_code=403, detail="No tienes permisos para actualizar este usuario")

    db_user = db.query(models.User).filter(models.User.id == user_id).first()
    if not db_user:
        raise HTTPException(status_code=404, detail="Usuario no encontrado")

    if user_update.email is not None:
        # The email must not belong to a different user
        existing = db.query(models.User).filter(
            models.User.email == user_update.email,
            models.User.id != user_id
        ).first()
        if existing:
            raise HTTPException(status_code=400, detail="Email ya está en uso")
        db_user.email = user_update.email

    if user_update.full_name is not None:
        db_user.full_name = user_update.full_name

    # Only admins may change roles
    if user_update.role is not None:
        if current_user.role != "admin":
            raise HTTPException(status_code=403, detail="No tienes permisos para cambiar roles")
        db_user.role = user_update.role

    db.commit()
    db.refresh(db_user)
    return db_user


@app.patch("/api/users/{user_id}/deactivate")
def deactivate_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Mark a user as inactive; admin only, and never the caller's own account.

    Raises:
        HTTPException 403: the caller is not an admin.
        HTTPException 400: the caller targets their own user.
        HTTPException 404: no user has that id.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="No tienes permisos para inactivar usuarios")

    # Self-deactivation is not allowed
    if current_user.id == user_id:
        raise HTTPException(status_code=400, detail="No puedes inactivar tu propio usuario")

    db_user = db.query(models.User).filter(models.User.id == user_id).first()
    if not db_user:
        raise HTTPException(status_code=404, detail="Usuario no encontrado")

    db_user.is_active = False
    db.commit()
    return {"message": "Usuario inactivado correctamente", "user_id": user_id}


@app.patch("/api/users/{user_id}/activate")
def activate_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Mark a user as active; admin only.

    Raises:
        HTTPException 403: the caller is not an admin.
        HTTPException 404: no user has that id.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="No tienes permisos para activar usuarios")

    db_user = db.query(models.User).filter(models.User.id == user_id).first()
    if not db_user:
        raise HTTPException(status_code=404, detail="Usuario no encontrado")

    db_user.is_active = True
    db.commit()
    return {"message": "Usuario activado correctamente", "user_id": user_id}


@app.patch("/api/users/{user_id}/password")
def change_user_password(
    user_id: int,
    password_update: schemas.AdminPasswordUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Set a new password for any user; admin only.

    Raises:
        HTTPException 403: the caller is not an admin.
        HTTPException 404: no user has that id.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="No tienes permisos para cambiar contraseñas")

    db_user = db.query(models.User).filter(models.User.id == user_id).first()
    if not db_user:
        raise HTTPException(status_code=404, detail="Usuario no encontrado")

    db_user.password_hash = get_password_hash(password_update.new_password)
    db.commit()
    return {"message": "Contraseña actualizada correctamente"}


# ============= CHECKLIST ENDPOINTS =============

@app.get("/api/checklists", response_model=List[schemas.Checklist])
def get_checklists(
    skip: int = 0,
    limit: int = 100,
    active_only: bool = False,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Return a page of checklists, optionally only the active ones."""
    query = db.query(models.Checklist)
    if active_only:
        # Column comparison builds a SQL expression, hence "== True"
        query = query.filter(models.Checklist.is_active == True)  # noqa: E712
    return query.offset(skip).limit(limit).all()


# NOTE(review): unlike every other endpoint in this section, this one has no
# get_current_user dependency and is reachable without a token. Confirm
# whether that is intentional.
@app.get("/api/checklists/{checklist_id}", response_model=schemas.ChecklistWithQuestions)
def get_checklist(checklist_id: int, db: Session = Depends(get_db)):
    """Return one checklist with its questions eagerly loaded.

    Raises:
        HTTPException 404: no checklist has that id.
    """
    checklist = db.query(models.Checklist).options(
        joinedload(models.Checklist.questions)
    ).filter(models.Checklist.id == checklist_id).first()

    if not checklist:
        raise HTTPException(status_code=404, detail="Checklist no encontrado")
    return checklist


@app.post("/api/checklists", response_model=schemas.Checklist)
def create_checklist(
    checklist: schemas.ChecklistCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Create a checklist owned by the calling admin.

    Raises:
        HTTPException 403: the caller is not an admin.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="No autorizado")

    db_checklist = models.Checklist(**checklist.dict(), created_by=current_user.id)
    db.add(db_checklist)
    db.commit()
    db.refresh(db_checklist)
    return db_checklist


@app.put("/api/checklists/{checklist_id}", response_model=schemas.Checklist)
def update_checklist(
    checklist_id: int,
    checklist: schemas.ChecklistUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Apply the fields present in the payload to a checklist; admin only.

    Raises:
        HTTPException 403: the caller is not an admin.
        HTTPException 404: no checklist has that id.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="No autorizado")

    db_checklist = db.query(models.Checklist).filter(models.Checklist.id == checklist_id).first()
    if not db_checklist:
        raise HTTPException(status_code=404, detail="Checklist no encontrado")

    # exclude_unset: only fields the client actually sent are written
    for key, value in checklist.dict(exclude_unset=True).items():
        setattr(db_checklist, key, value)

    db.commit()
    db.refresh(db_checklist)
    return db_checklist


# ============= QUESTION ENDPOINTS =============

def _adjust_checklist_max_score(db: Session, checklist_id, delta) -> None:
    """Add ``delta`` to a checklist's max_score (never below 0), if it exists."""
    if not delta or checklist_id is None:
        return
    checklist = db.query(models.Checklist).filter(
        models.Checklist.id == checklist_id
    ).first()
    if checklist:
        checklist.max_score = max(0, (checklist.max_score or 0) + delta)


@app.post("/api/questions", response_model=schemas.Question)
def create_question(
    question: schemas.QuestionCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Create a question and add its points to the checklist's max_score.

    Raises:
        HTTPException 403: the caller is not an admin.
        HTTPException 404: the target checklist does not exist.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="No autorizado")

    # Refuse to create a question that would point at a missing checklist
    checklist = db.query(models.Checklist).filter(
        models.Checklist.id == question.checklist_id
    ).first()
    if not checklist:
        raise HTTPException(status_code=404, detail="Checklist no encontrado")

    db_question = models.Question(**question.dict())
    db.add(db_question)

    # Keep the checklist's max_score in step with its questions
    checklist.max_score = (checklist.max_score or 0) + (question.points or 0)

    db.commit()
    db.refresh(db_question)
    return db_question


@app.put("/api/questions/{question_id}", response_model=schemas.Question)
def update_question(
    question_id: int,
    question: schemas.QuestionUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Apply the fields present in the payload to a question; admin only.

    If the update changes the question's points or moves it to another
    checklist, the affected checklists' max_score is adjusted to match.

    Raises:
        HTTPException 403: the caller is not an admin.
        HTTPException 404: no question has that id.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="No autorizado")

    db_question = db.query(models.Question).filter(models.Question.id == question_id).first()
    if not db_question:
        raise HTTPException(status_code=404, detail="Pregunta no encontrada")

    # Snapshot the values max_score was computed from before overwriting them
    old_points = db_question.points or 0
    old_checklist_id = db_question.checklist_id

    for key, value in question.dict(exclude_unset=True).items():
        setattr(db_question, key, value)

    new_points = db_question.points or 0
    new_checklist_id = db_question.checklist_id
    if new_checklist_id != old_checklist_id:
        _adjust_checklist_max_score(db, old_checklist_id, -old_points)
        _adjust_checklist_max_score(db, new_checklist_id, new_points)
    else:
        _adjust_checklist_max_score(db, old_checklist_id, new_points - old_points)

    db.commit()
    db.refresh(db_question)
    return db_question


@app.delete("/api/questions/{question_id}")
def delete_question(
    question_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Delete a question and remove its points from the checklist's max_score.

    Raises:
        HTTPException 403: the caller is not an admin.
        HTTPException 404: no question has that id.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="No autorizado")

    db_question = db.query(models.Question).filter(models.Question.id == question_id).first()
    if not db_question:
        raise HTTPException(status_code=404, detail="Pregunta no encontrada")

    # Undo what create_question added to the checklist total
    _adjust_checklist_max_score(db, db_question.checklist_id, -(db_question.points or 0))

    db.delete(db_question)
    db.commit()
    return {"message": "Pregunta eliminada"}


# ============= INSPECTION ENDPOINTS =============

def _ensure_inspection_access(inspection, current_user) -> None:
    """Raise 403 when a mechanic touches an inspection assigned to someone else."""
    if current_user.role == "mechanic" and inspection.mechanic_id != current_user.id:
        raise HTTPException(
            status_code=403,
            detail="No tienes permisos para acceder a esta inspección"
        )


@app.get("/api/inspections", response_model=List[schemas.Inspection])
def get_inspections(
    skip: int = 0,
    limit: int = 100,
    vehicle_plate: str = None,
    status: str = None,  # query-parameter name; shadows fastapi.status inside this function only
    show_inactive: bool = False,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Return a page of inspections, newest first.

    Inactive inspections are hidden unless ``show_inactive`` is true.
    Mechanics only see their own inspections. ``vehicle_plate`` is a
    substring filter; ``status`` is an exact match.
    """
    query = db.query(models.Inspection)

    # By default only active inspections are listed
    if not show_inactive:
        query = query.filter(models.Inspection.is_active == True)  # noqa: E712

    # Mechanics only see their own inspections
    if current_user.role == "mechanic":
        query = query.filter(models.Inspection.mechanic_id == current_user.id)

    if vehicle_plate:
        query = query.filter(models.Inspection.vehicle_plate.contains(vehicle_plate))

    if status:
        query = query.filter(models.Inspection.status == status)

    return query.order_by(models.Inspection.created_at.desc()).offset(skip).limit(limit).all()


@app.get("/api/inspections/{inspection_id}", response_model=schemas.InspectionDetail)
def get_inspection(
    inspection_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Return one inspection with checklist, mechanic, answers and media loaded.

    Raises:
        HTTPException 404: no inspection has that id.
        HTTPException 403: a mechanic asks for another mechanic's inspection.
    """
    inspection = db.query(models.Inspection).options(
        joinedload(models.Inspection.checklist).joinedload(models.Checklist.questions),
        joinedload(models.Inspection.mechanic),
        joinedload(models.Inspection.answers).joinedload(models.Answer.question),
        joinedload(models.Inspection.answers).joinedload(models.Answer.media_files)
    ).filter(models.Inspection.id == inspection_id).first()

    if not inspection:
        raise HTTPException(status_code=404, detail="Inspección no encontrada")

    # Same visibility rule the list endpoint applies to mechanics
    _ensure_inspection_access(inspection, current_user)
    return inspection


@app.post("/api/inspections", response_model=schemas.Inspection)
def create_inspection(
    inspection: schemas.InspectionCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Create an inspection assigned to the caller.

    The checklist's current max_score is copied onto the inspection.

    Raises:
        HTTPException 404: the checklist does not exist.
    """
    checklist = db.query(models.Checklist).filter(
        models.Checklist.id == inspection.checklist_id
    ).first()
    if not checklist:
        raise HTTPException(status_code=404, detail="Checklist no encontrado")

    db_inspection = models.Inspection(
        **inspection.dict(),
        mechanic_id=current_user.id,
        max_score=checklist.max_score
    )
    db.add(db_inspection)
    db.commit()
    db.refresh(db_inspection)
    return db_inspection


@app.put("/api/inspections/{inspection_id}", response_model=schemas.Inspection)
def update_inspection(
    inspection_id: int,
    inspection: schemas.InspectionUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Apply the fields present in the payload to an inspection.

    Raises:
        HTTPException 404: no inspection has that id.
        HTTPException 403: a mechanic targets another mechanic's inspection.
    """
    db_inspection = db.query(models.Inspection).filter(
        models.Inspection.id == inspection_id
    ).first()
    if not db_inspection:
        raise HTTPException(status_code=404, detail="Inspección no encontrada")

    _ensure_inspection_access(db_inspection, current_user)

    # exclude_unset: only fields the client actually sent are written
    for key, value in inspection.dict(exclude_unset=True).items():
        setattr(db_inspection, key, value)

    db.commit()
    db.refresh(db_inspection)
    return db_inspection


@app.post("/api/inspections/{inspection_id}/complete", response_model=schemas.Inspection)
def complete_inspection(
    inspection_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Compute the final score from the answers and mark the inspection completed.

    Sets score, percentage (0 when max_score is not positive),
    flagged_items_count, status="completed" and completed_at.

    Raises:
        HTTPException 404: no inspection has that id.
        HTTPException 403: a mechanic targets another mechanic's inspection.
    """
    inspection = db.query(models.Inspection).filter(
        models.Inspection.id == inspection_id
    ).first()
    if not inspection:
        raise HTTPException(status_code=404, detail="Inspección no encontrada")

    _ensure_inspection_access(inspection, current_user)

    answers = db.query(models.Answer).filter(models.Answer.inspection_id == inspection_id).all()
    # Treat missing values as 0 so one NULL column cannot crash the request
    total_score = sum(a.points_earned or 0 for a in answers)
    flagged_count = sum(1 for a in answers if a.is_flagged)
    max_score = inspection.max_score or 0

    inspection.score = total_score
    inspection.percentage = (total_score / max_score * 100) if max_score > 0 else 0
    inspection.flagged_items_count = flagged_count
    inspection.status = "completed"
    inspection.completed_at = datetime.utcnow()

    db.commit()
    db.refresh(inspection)
    return inspection


@app.patch("/api/inspections/{inspection_id}/deactivate")
def deactivate_inspection(
    inspection_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Mark an inspection inactive (is_active=False, status="inactive"); admin only.

    Raises:
        HTTPException 403: the caller is not an admin.
        HTTPException 404: no inspection has that id.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="No tienes permisos para inactivar inspecciones")

    inspection = db.query(models.Inspection).filter(
        models.Inspection.id == inspection_id
    ).first()
    if not inspection:
        raise HTTPException(status_code=404, detail="Inspección no encontrada")

    inspection.is_active = False
    inspection.status = "inactive"

    db.commit()
    db.refresh(inspection)
    return {"message": "Inspección inactivada correctamente", "inspection_id": inspection_id}
# ============= ANSWER ENDPOINTS =============

def _points_for_status(answer_status, question_points):
    """Return the points earned for an answer status: full, half (truncated) or 0."""
    if answer_status == "ok":
        return question_points
    if answer_status == "warning":
        return int(question_points * 0.5)
    return 0


@app.post("/api/answers", response_model=schemas.Answer)
def create_answer(
    answer: schemas.AnswerCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Store an answer, scoring it from the question's points and the status.

    "ok" earns the question's full points, "warning" half (truncated to an
    int), and any other status 0.

    Raises:
        HTTPException 404: the referenced question does not exist.
    """
    question = db.query(models.Question).filter(models.Question.id == answer.question_id).first()
    if not question:
        raise HTTPException(status_code=404, detail="Pregunta no encontrada")

    db_answer = models.Answer(
        **answer.dict(),
        points_earned=_points_for_status(answer.status, question.points)
    )
    db.add(db_answer)
    db.commit()
    db.refresh(db_answer)
    return db_answer


@app.put("/api/answers/{answer_id}", response_model=schemas.Answer)
def update_answer(
    answer_id: int,
    answer: schemas.AnswerUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Apply the fields present in the payload to an answer.

    When the status changes, points_earned is recomputed from the question
    before the payload fields are written.

    Raises:
        HTTPException 404: the answer, or the question it refers to, does
            not exist.
    """
    db_answer = db.query(models.Answer).filter(models.Answer.id == answer_id).first()
    if not db_answer:
        raise HTTPException(status_code=404, detail="Respuesta no encontrada")

    # Recompute the score only when the status actually changes
    if answer.status and answer.status != db_answer.status:
        question = db.query(models.Question).filter(
            models.Question.id == db_answer.question_id
        ).first()
        if not question:
            raise HTTPException(status_code=404, detail="Pregunta no encontrada")
        db_answer.points_earned = _points_for_status(answer.status, question.points)

    # exclude_unset: only fields the client actually sent are written
    for key, value in answer.dict(exclude_unset=True).items():
        setattr(db_answer, key, value)

    db.commit()
    db.refresh(db_answer)
    return db_answer


# ============= MEDIA FILE ENDPOINTS =============

@app.post("/api/answers/{answer_id}/upload", response_model=schemas.MediaFile)
async def upload_photo(
    answer_id: int,
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Save an uploaded file under uploads/inspection_<id>/ and record it.

    The stored name is ``answer_<id>_<timestamp>.<ext>``. Only the extension
    is taken from the client's filename, and it is reduced to ASCII
    letters/digits so it cannot inject path components; ``bin`` is used when
    no usable extension is present.

    Raises:
        HTTPException 404: no answer has that id.
    """
    answer = db.query(models.Answer).filter(models.Answer.id == answer_id).first()
    if not answer:
        raise HTTPException(status_code=404, detail="Respuesta no encontrada")

    # Create the target directory if needed
    upload_dir = f"uploads/inspection_{answer.inspection_id}"
    os.makedirs(upload_dir, exist_ok=True)

    # The client controls file.filename (it may even be missing), so never
    # use it verbatim when building a filesystem path.
    raw_extension = os.path.splitext(os.path.basename(file.filename or ""))[1]
    file_extension = "".join(
        ch for ch in raw_extension if ch.isascii() and ch.isalnum()
    )[:10] or "bin"

    file_name = f"answer_{answer_id}_{datetime.now().timestamp()}.{file_extension}"
    file_path = os.path.join(upload_dir, file_name)

    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    # Create the database record
    media_file = models.MediaFile(
        answer_id=answer_id,
        file_path=file_path,
        file_type="image"
    )
    db.add(media_file)
    db.commit()
    db.refresh(media_file)
    return media_file


# ============= AI ANALYSIS =============

@app.get("/api/ai/models", response_model=List[schemas.AIModelInfo])
def get_available_ai_models(current_user: models.User = Depends(get_current_user)):
    """Return the static list of selectable AI models; admin only.

    Raises:
        HTTPException 403: the caller is not an admin.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="Solo administradores pueden ver modelos de IA")

    models_list = [
        # OpenAI models
        {
            "id": "gpt-4o",
            "name": "GPT-4o (Recomendado)",
            "provider": "openai",
            "description": "Modelo multimodal más avanzado de OpenAI, rápido y preciso para análisis de imágenes"
        },
        {
            "id": "gpt-4o-mini",
            "name": "GPT-4o Mini",
            "provider": "openai",
            "description": "Versión compacta y económica de GPT-4o, ideal para análisis rápidos"
        },
        {
            "id": "gpt-4-turbo",
            "name": "GPT-4 Turbo",
            "provider": "openai",
            "description": "Modelo potente con capacidades de visión y contexto amplio"
        },
        {
            "id": "gpt-4-vision-preview",
            "name": "GPT-4 Vision (Preview)",
            "provider": "openai",
            "description": "Modelo específico para análisis de imágenes (versión previa)"
        },
        # Gemini models
        {
            "id": "gemini-3-pro-preview",
            "name": "Gemini 3 Pro Preview (Último)",
            "provider": "gemini",
            "description": "Modelo de próxima generación en preview, máxima capacidad de análisis"
        },
        {
            "id": "gemini-2.5-pro",
            "name": "Gemini 2.5 Pro (Recomendado)",
            "provider": "gemini",
            "description": "Último modelo estable con excelente análisis visual y razonamiento avanzado"
        },
        {
            "id": "gemini-2.5-flash",
            "name": "Gemini 2.5 Flash",
            "provider": "gemini",
            "description": "Versión rápida del 2.5, ideal para inspecciones en tiempo real"
        },
        {
            "id": "gemini-2.0-flash",
            "name": "Gemini 2.0 Flash",
            "provider": "gemini",
            "description": "Modelo rápido y eficiente de la generación 2.0"
        },
        {
            "id": "gemini-1.5-pro-latest",
            "name": "Gemini 1.5 Pro Latest",
            "provider": "gemini",
            "description": "Versión estable 1.5 con contexto de 2M tokens"
        },
        {
            "id": "gemini-1.5-flash-latest",
            "name": "Gemini 1.5 Flash Latest",
            "provider": "gemini",
            "description": "Modelo 1.5 rápido para análisis básicos"
        },
    ]

    return models_list


@app.get("/api/ai/configuration", response_model=schemas.AIConfiguration)
def get_ai_configuration(
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Return the active AI configuration; admin only.

    Raises:
        HTTPException 403: the caller is not an admin.
        HTTPException 404: no configuration is marked active.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="Solo administradores pueden ver configuración de IA")

    config = db.query(models.AIConfiguration).filter(
        models.AIConfiguration.is_active == True  # noqa: E712 (SQL expression)
    ).first()
    if not config:
        raise HTTPException(status_code=404, detail="No hay configuración de IA activa")
    return config


@app.post("/api/ai/configuration", response_model=schemas.AIConfiguration)
def create_ai_configuration(
    config: schemas.AIConfigurationCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Create a new active AI configuration, deactivating all existing ones.

    Raises:
        HTTPException 403: the caller is not an admin.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="Solo administradores pueden configurar IA")

    # Deactivate every existing configuration; committed together with the insert
    db.query(models.AIConfiguration).update({"is_active": False})

    new_config = models.AIConfiguration(
        provider=config.provider,
        api_key=config.api_key,
        model_name=config.model_name,
        is_active=True
    )
    db.add(new_config)
    db.commit()
    db.refresh(new_config)
    return new_config


@app.put("/api/ai/configuration/{config_id}", response_model=schemas.AIConfiguration)
def update_ai_configuration(
    config_id: int,
    config_update: schemas.AIConfigurationUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Apply the fields present in the payload to an AI configuration; admin only.

    Raises:
        HTTPException 403: the caller is not an admin.
        HTTPException 404: no configuration has that id.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="Solo administradores pueden actualizar configuración de IA")

    config = db.query(models.AIConfiguration).filter(
        models.AIConfiguration.id == config_id
    ).first()
    if not config:
        raise HTTPException(status_code=404, detail="Configuración no encontrada")

    # exclude_unset: only fields the client actually sent are written
    for key, value in config_update.dict(exclude_unset=True).items():
        setattr(config, key, value)

    db.commit()
    db.refresh(config)
    return config


@app.delete("/api/ai/configuration/{config_id}")
def delete_ai_configuration(
    config_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Delete an AI configuration; admin only.

    Raises:
        HTTPException 403: the caller is not an admin.
        HTTPException 404: no configuration has that id.
    """
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="Solo administradores pueden eliminar configuración de IA")

    config = db.query(models.AIConfiguration).filter(
        models.AIConfiguration.id == config_id
    ).first()
    if not config:
        raise HTTPException(status_code=404, detail="Configuración no encontrada")

    db.delete(config)
    db.commit()
    return {"message": "Configuración eliminada correctamente"}


def _build_ai_prompts(question_obj):
    """Return ``(system_prompt, user_message)`` for a question, or generic ones if None."""
    if question_obj:
        question_text = question_obj.text
        section = question_obj.section

        system_prompt = f"""Eres un mecánico experto realizando una inspección vehicular.

PREGUNTA ESPECÍFICA A RESPONDER: "{question_text}"
Sección: {section}

Analiza la imagen ÚNICAMENTE para responder esta pregunta específica.
Sé directo y enfócate solo en lo que la pregunta solicita.

Responde en formato JSON:
{{
    "status": "ok|minor|critical",
    "observations": "Respuesta específica a: {question_text}",
    "recommendation": "Acción si aplica",
    "confidence": 0.0-1.0
}}

IMPORTANTE:
- Responde SOLO lo que la pregunta pide
- No des información genérica del vehículo
- Sé específico y técnico
- Si la pregunta es pass/fail, indica claramente si pasa o falla
- Si la pregunta es bueno/regular/malo, indica el estado específico del componente"""

        user_message = f"Inspecciona la imagen y responde específicamente: {question_text}"
        return system_prompt, user_message

    # Generic analysis when no question context is available
    system_prompt = """Eres un experto mecánico automotriz. Analiza la imagen y proporciona:
1. Estado del componente (bueno/regular/malo)
2. Nivel de criticidad (ok/minor/critical)
3. Observaciones técnicas breves
4. Recomendación de acción

Responde en formato JSON:
{
    "status": "ok|minor|critical",
    "observations": "descripción técnica",
    "recommendation": "acción sugerida",
    "confidence": 0.0-1.0
}"""
    user_message = "Analiza este componente del vehículo para la inspección general."
    return system_prompt, user_message


def _parse_ai_analysis(ai_response):
    """Parse the model reply as JSON, falling back to a plain-text structure."""
    import json
    import re

    cleaned_response = (ai_response or "").strip()

    # Models often wrap JSON in a markdown code fence; unwrap it first
    if cleaned_response.startswith('```'):
        match = re.search(r'```(?:json)?\s*\n?(.*?)\n?```', cleaned_response, re.DOTALL)
        if match:
            cleaned_response = match.group(1).strip()

    try:
        return json.loads(cleaned_response)
    except ValueError:
        # json.JSONDecodeError is a ValueError: the reply is not valid JSON,
        # so hand the raw text back inside the expected structure.
        return {
            "status": "ok",
            "observations": ai_response,
            "recommendation": "Revisar manualmente",
            "confidence": 0.7
        }


@app.post("/api/analyze-image")
async def analyze_image(
    file: UploadFile = File(...),
    question_id: int = None,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Analyze an uploaded image with the active AI provider (OpenAI or Gemini).

    When ``question_id`` matches a question, the prompt is focused on that
    question; otherwise a generic inspection prompt is used.

    Returns a dict, never an HTTP error:
      * ``{"status": "disabled", ...}`` when no AI configuration is active;
      * ``{"success": False, "error": ...}`` for an unsupported provider or
        any exception raised while calling the provider;
      * ``{"success": True, "analysis", "raw_response", "model", "provider"}``
        otherwise, where ``analysis`` is the parsed JSON reply or a
        plain-text fallback structure.
    """
    import base64
    import logging

    ai_config = db.query(models.AIConfiguration).filter(
        models.AIConfiguration.is_active == True  # noqa: E712 (SQL expression)
    ).first()

    if not ai_config:
        return {
            "status": "disabled",
            "message": "No hay configuración de IA activa. Configure en Settings."
        }

    # The image is kept in memory only; nothing is written to disk
    contents = await file.read()
    image_b64 = base64.b64encode(contents).decode('utf-8')

    # Use the upload's declared type for the data URL when it is an image
    # type; otherwise keep the previous hard-coded JPEG default.
    content_type = file.content_type or ""
    mime_type = content_type if content_type.startswith("image/") else "image/jpeg"

    # Load the question context if one was requested
    question_obj = None
    if question_id:
        question_obj = db.query(models.Question).filter(models.Question.id == question_id).first()

    try:
        system_prompt, user_message = _build_ai_prompts(question_obj)

        if ai_config.provider == "openai":
            import openai

            # NOTE(review): openai.ChatCompletion looks like the pre-1.0
            # client interface; confirm the pinned openai version supports it.
            openai.api_key = ai_config.api_key
            response = openai.ChatCompletion.create(
                model=ai_config.model_name,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": user_message},
                            {
                                "type": "image_url",
                                "image_url": {"url": f"data:{mime_type};base64,{image_b64}"}
                            }
                        ]
                    }
                ],
                max_tokens=500
            )
            ai_response = response.choices[0].message.content

        elif ai_config.provider == "gemini":
            import google.generativeai as genai
            from PIL import Image
            from io import BytesIO

            genai.configure(api_key=ai_config.api_key)
            model = genai.GenerativeModel(ai_config.model_name)

            # Open the raw uploaded bytes as a PIL image
            image = Image.open(BytesIO(contents))

            prompt = f"{system_prompt}\n\n{user_message}"
            response = model.generate_content([prompt, image])
            ai_response = response.text

        else:
            return {
                "success": False,
                "error": f"Provider {ai_config.provider} no soportado"
            }

        return {
            "success": True,
            "analysis": _parse_ai_analysis(ai_response),
            "raw_response": ai_response,
            "model": ai_config.model_name,
            "provider": ai_config.provider
        }

    except Exception as e:
        # Boundary handler: provider/SDK failures are logged with traceback
        # and reported to the client as a payload rather than a 500.
        logging.getLogger(__name__).exception("Error en análisis AI")
        return {
            "success": False,
            "error": str(e),
            "message": "Error analyzing image with AI. Please check AI configuration in Settings."
        }


# ============= HEALTH CHECK =============

@app.get("/")
def root():
    """Return the API name, version and running status."""
    return {"message": "Checklist Inteligente API", "version": "1.0.0", "status": "running"}


@app.get("/health")
def health_check():
    """Return a static health payload."""
    return {"status": "healthy"}