Base Principal del Proyecto

This commit is contained in:
2025-11-18 13:09:42 -03:00
commit be30b3ca18
24 changed files with 2018 additions and 0 deletions

1
backend/app/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Backend app package

View File

@@ -0,0 +1 @@
# Core package

View File

@@ -0,0 +1,27 @@
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
# Database
DATABASE_URL: str
# Security
SECRET_KEY: str
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 10080 # 7 días
# OpenAI
OPENAI_API_KEY: Optional[str] = None
# Environment
ENVIRONMENT: str = "development"
# Uploads
UPLOAD_DIR: str = "uploads"
MAX_FILE_SIZE: int = 10 * 1024 * 1024 # 10MB
class Config:
env_file = ".env"
case_sensitive = True
settings = Settings()

View File

@@ -0,0 +1,21 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
engine = create_engine(
settings.DATABASE_URL,
pool_pre_ping=True,
echo=settings.ENVIRONMENT == "development"
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

View File

@@ -0,0 +1,30 @@
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def decode_access_token(token: str):
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
return None

454
backend/app/main.py Normal file
View File

@@ -0,0 +1,454 @@
from fastapi import FastAPI, Depends, HTTPException, status, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session, joinedload
from typing import List
import os
import shutil
from datetime import datetime, timedelta
from app.core.database import engine, get_db, Base
from app.core.security import verify_password, get_password_hash, create_access_token, decode_access_token
from app import models, schemas
# Crear tablas
Base.metadata.create_all(bind=engine)
app = FastAPI(title="Checklist Inteligente API", version="1.0.0")
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173", "http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
security = HTTPBearer()
# Dependency para obtener usuario actual
def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
token = credentials.credentials
payload = decode_access_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido o expirado"
)
user = db.query(models.User).filter(models.User.id == payload.get("sub")).first()
if user is None:
raise HTTPException(status_code=404, detail="Usuario no encontrado")
return user
# ============= AUTH ENDPOINTS =============
@app.post("/api/auth/register", response_model=schemas.User)
def register(user: schemas.UserCreate, db: Session = Depends(get_db)):
# Verificar si usuario existe
db_user = db.query(models.User).filter(models.User.username == user.username).first()
if db_user:
raise HTTPException(status_code=400, detail="Usuario ya existe")
# Crear usuario
hashed_password = get_password_hash(user.password)
db_user = models.User(
username=user.username,
email=user.email,
full_name=user.full_name,
role=user.role,
password_hash=hashed_password
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.post("/api/auth/login", response_model=schemas.Token)
def login(user_login: schemas.UserLogin, db: Session = Depends(get_db)):
user = db.query(models.User).filter(models.User.username == user_login.username).first()
if not user or not verify_password(user_login.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Usuario o contraseña incorrectos"
)
access_token = create_access_token(data={"sub": user.id, "role": user.role})
return {
"access_token": access_token,
"token_type": "bearer",
"user": user
}
@app.get("/api/auth/me", response_model=schemas.User)
def get_me(current_user: models.User = Depends(get_current_user)):
return current_user
# ============= CHECKLIST ENDPOINTS =============
@app.get("/api/checklists", response_model=List[schemas.Checklist])
def get_checklists(
skip: int = 0,
limit: int = 100,
active_only: bool = False,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
query = db.query(models.Checklist)
if active_only:
query = query.filter(models.Checklist.is_active == True)
return query.offset(skip).limit(limit).all()
@app.get("/api/checklists/{checklist_id}", response_model=schemas.ChecklistWithQuestions)
def get_checklist(checklist_id: int, db: Session = Depends(get_db)):
checklist = db.query(models.Checklist).options(
joinedload(models.Checklist.questions)
).filter(models.Checklist.id == checklist_id).first()
if not checklist:
raise HTTPException(status_code=404, detail="Checklist no encontrado")
return checklist
@app.post("/api/checklists", response_model=schemas.Checklist)
def create_checklist(
checklist: schemas.ChecklistCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No autorizado")
db_checklist = models.Checklist(**checklist.dict(), created_by=current_user.id)
db.add(db_checklist)
db.commit()
db.refresh(db_checklist)
return db_checklist
@app.put("/api/checklists/{checklist_id}", response_model=schemas.Checklist)
def update_checklist(
checklist_id: int,
checklist: schemas.ChecklistUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No autorizado")
db_checklist = db.query(models.Checklist).filter(models.Checklist.id == checklist_id).first()
if not db_checklist:
raise HTTPException(status_code=404, detail="Checklist no encontrado")
for key, value in checklist.dict(exclude_unset=True).items():
setattr(db_checklist, key, value)
db.commit()
db.refresh(db_checklist)
return db_checklist
# ============= QUESTION ENDPOINTS =============
@app.post("/api/questions", response_model=schemas.Question)
def create_question(
question: schemas.QuestionCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No autorizado")
db_question = models.Question(**question.dict())
db.add(db_question)
# Actualizar max_score del checklist
checklist = db.query(models.Checklist).filter(
models.Checklist.id == question.checklist_id
).first()
if checklist:
checklist.max_score += question.points
db.commit()
db.refresh(db_question)
return db_question
@app.put("/api/questions/{question_id}", response_model=schemas.Question)
def update_question(
question_id: int,
question: schemas.QuestionUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No autorizado")
db_question = db.query(models.Question).filter(models.Question.id == question_id).first()
if not db_question:
raise HTTPException(status_code=404, detail="Pregunta no encontrada")
for key, value in question.dict(exclude_unset=True).items():
setattr(db_question, key, value)
db.commit()
db.refresh(db_question)
return db_question
@app.delete("/api/questions/{question_id}")
def delete_question(
question_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No autorizado")
db_question = db.query(models.Question).filter(models.Question.id == question_id).first()
if not db_question:
raise HTTPException(status_code=404, detail="Pregunta no encontrada")
db.delete(db_question)
db.commit()
return {"message": "Pregunta eliminada"}
# ============= INSPECTION ENDPOINTS =============
@app.get("/api/inspections", response_model=List[schemas.Inspection])
def get_inspections(
skip: int = 0,
limit: int = 100,
vehicle_plate: str = None,
status: str = None,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
query = db.query(models.Inspection)
# Mecánicos solo ven sus inspecciones
if current_user.role == "mechanic":
query = query.filter(models.Inspection.mechanic_id == current_user.id)
if vehicle_plate:
query = query.filter(models.Inspection.vehicle_plate.contains(vehicle_plate))
if status:
query = query.filter(models.Inspection.status == status)
return query.order_by(models.Inspection.created_at.desc()).offset(skip).limit(limit).all()
@app.get("/api/inspections/{inspection_id}", response_model=schemas.InspectionDetail)
def get_inspection(
inspection_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
inspection = db.query(models.Inspection).options(
joinedload(models.Inspection.checklist),
joinedload(models.Inspection.mechanic),
joinedload(models.Inspection.answers).joinedload(models.Answer.question),
joinedload(models.Inspection.answers).joinedload(models.Answer.media_files)
).filter(models.Inspection.id == inspection_id).first()
if not inspection:
raise HTTPException(status_code=404, detail="Inspección no encontrada")
return inspection
@app.post("/api/inspections", response_model=schemas.Inspection)
def create_inspection(
inspection: schemas.InspectionCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Obtener max_score del checklist
checklist = db.query(models.Checklist).filter(
models.Checklist.id == inspection.checklist_id
).first()
if not checklist:
raise HTTPException(status_code=404, detail="Checklist no encontrado")
db_inspection = models.Inspection(
**inspection.dict(),
mechanic_id=current_user.id,
max_score=checklist.max_score
)
db.add(db_inspection)
db.commit()
db.refresh(db_inspection)
return db_inspection
@app.put("/api/inspections/{inspection_id}", response_model=schemas.Inspection)
def update_inspection(
inspection_id: int,
inspection: schemas.InspectionUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
db_inspection = db.query(models.Inspection).filter(
models.Inspection.id == inspection_id
).first()
if not db_inspection:
raise HTTPException(status_code=404, detail="Inspección no encontrada")
for key, value in inspection.dict(exclude_unset=True).items():
setattr(db_inspection, key, value)
db.commit()
db.refresh(db_inspection)
return db_inspection
@app.post("/api/inspections/{inspection_id}/complete", response_model=schemas.Inspection)
def complete_inspection(
inspection_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
inspection = db.query(models.Inspection).filter(
models.Inspection.id == inspection_id
).first()
if not inspection:
raise HTTPException(status_code=404, detail="Inspección no encontrada")
# Calcular score
answers = db.query(models.Answer).filter(models.Answer.inspection_id == inspection_id).all()
total_score = sum(a.points_earned for a in answers)
flagged_count = sum(1 for a in answers if a.is_flagged)
inspection.score = total_score
inspection.percentage = (total_score / inspection.max_score * 100) if inspection.max_score > 0 else 0
inspection.flagged_items_count = flagged_count
inspection.status = "completed"
inspection.completed_at = datetime.utcnow()
db.commit()
db.refresh(inspection)
return inspection
# ============= ANSWER ENDPOINTS =============
@app.post("/api/answers", response_model=schemas.Answer)
def create_answer(
answer: schemas.AnswerCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Obtener la pregunta para saber los puntos
question = db.query(models.Question).filter(models.Question.id == answer.question_id).first()
if not question:
raise HTTPException(status_code=404, detail="Pregunta no encontrada")
# Calcular puntos según status
points_earned = 0
if answer.status == "ok":
points_earned = question.points
elif answer.status == "warning":
points_earned = int(question.points * 0.5)
db_answer = models.Answer(
**answer.dict(),
points_earned=points_earned
)
db.add(db_answer)
db.commit()
db.refresh(db_answer)
return db_answer
@app.put("/api/answers/{answer_id}", response_model=schemas.Answer)
def update_answer(
answer_id: int,
answer: schemas.AnswerUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
db_answer = db.query(models.Answer).filter(models.Answer.id == answer_id).first()
if not db_answer:
raise HTTPException(status_code=404, detail="Respuesta no encontrada")
# Recalcular puntos si cambió el status
if answer.status and answer.status != db_answer.status:
question = db.query(models.Question).filter(
models.Question.id == db_answer.question_id
).first()
if answer.status == "ok":
db_answer.points_earned = question.points
elif answer.status == "warning":
db_answer.points_earned = int(question.points * 0.5)
else:
db_answer.points_earned = 0
for key, value in answer.dict(exclude_unset=True).items():
setattr(db_answer, key, value)
db.commit()
db.refresh(db_answer)
return db_answer
# ============= MEDIA FILE ENDPOINTS =============
@app.post("/api/answers/{answer_id}/upload", response_model=schemas.MediaFile)
async def upload_photo(
answer_id: int,
file: UploadFile = File(...),
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Verificar que la respuesta existe
answer = db.query(models.Answer).filter(models.Answer.id == answer_id).first()
if not answer:
raise HTTPException(status_code=404, detail="Respuesta no encontrada")
# Crear directorio si no existe
upload_dir = f"uploads/inspection_{answer.inspection_id}"
os.makedirs(upload_dir, exist_ok=True)
# Guardar archivo
file_extension = file.filename.split(".")[-1]
file_name = f"answer_{answer_id}_{datetime.now().timestamp()}.{file_extension}"
file_path = os.path.join(upload_dir, file_name)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Crear registro en BD
media_file = models.MediaFile(
answer_id=answer_id,
file_path=file_path,
file_type="image"
)
db.add(media_file)
db.commit()
db.refresh(media_file)
return media_file
# ============= HEALTH CHECK =============
@app.get("/")
def root():
return {"message": "Checklist Inteligente API", "version": "1.0.0", "status": "running"}
@app.get("/health")
def health_check():
return {"status": "healthy"}

146
backend/app/models.py Normal file
View File

@@ -0,0 +1,146 @@
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text, JSON, Float
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.core.database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
email = Column(String(100), unique=True, index=True)
password_hash = Column(String(255), nullable=False)
role = Column(String(20), nullable=False) # admin, mechanic
full_name = Column(String(100))
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationships
checklists_created = relationship("Checklist", back_populates="creator")
inspections = relationship("Inspection", back_populates="mechanic")
class Checklist(Base):
__tablename__ = "checklists"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(200), nullable=False)
description = Column(Text)
ai_mode = Column(String(20), default="off") # off, assisted, copilot
scoring_enabled = Column(Boolean, default=True)
max_score = Column(Integer, default=0)
logo_url = Column(String(500))
is_active = Column(Boolean, default=True)
created_by = Column(Integer, ForeignKey("users.id"))
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
creator = relationship("User", back_populates="checklists_created")
questions = relationship("Question", back_populates="checklist", cascade="all, delete-orphan")
inspections = relationship("Inspection", back_populates="checklist")
class Question(Base):
__tablename__ = "questions"
id = Column(Integer, primary_key=True, index=True)
checklist_id = Column(Integer, ForeignKey("checklists.id"), nullable=False)
section = Column(String(100)) # Sistema eléctrico, Frenos, etc
text = Column(Text, nullable=False)
type = Column(String(30), nullable=False) # pass_fail, good_bad, text, etc
points = Column(Integer, default=1)
options = Column(JSON) # Para multiple choice
order = Column(Integer, default=0)
allow_photos = Column(Boolean, default=True)
max_photos = Column(Integer, default=3)
requires_comment_on_fail = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationships
checklist = relationship("Checklist", back_populates="questions")
answers = relationship("Answer", back_populates="question")
class Inspection(Base):
__tablename__ = "inspections"
id = Column(Integer, primary_key=True, index=True)
checklist_id = Column(Integer, ForeignKey("checklists.id"), nullable=False)
mechanic_id = Column(Integer, ForeignKey("users.id"), nullable=False)
# Datos de la OR
or_number = Column(String(50))
work_order_number = Column(String(50))
# Datos del vehículo
vehicle_plate = Column(String(20), nullable=False, index=True)
vehicle_brand = Column(String(50))
vehicle_model = Column(String(100))
vehicle_km = Column(Integer)
client_name = Column(String(200))
# Scoring
score = Column(Integer, default=0)
max_score = Column(Integer, default=0)
percentage = Column(Float, default=0.0)
flagged_items_count = Column(Integer, default=0)
# Estado
status = Column(String(20), default="draft") # draft, completed
# Firma
signature_data = Column(Text) # Base64 de la firma
signed_at = Column(DateTime(timezone=True))
# Timestamps
started_at = Column(DateTime(timezone=True), server_default=func.now())
completed_at = Column(DateTime(timezone=True))
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
checklist = relationship("Checklist", back_populates="inspections")
mechanic = relationship("User", back_populates="inspections")
answers = relationship("Answer", back_populates="inspection", cascade="all, delete-orphan")
class Answer(Base):
__tablename__ = "answers"
id = Column(Integer, primary_key=True, index=True)
inspection_id = Column(Integer, ForeignKey("inspections.id"), nullable=False)
question_id = Column(Integer, ForeignKey("questions.id"), nullable=False)
answer_value = Column(Text) # La respuesta del mecánico
status = Column(String(20), default="ok") # ok, warning, critical, info
points_earned = Column(Integer, default=0)
comment = Column(Text) # Comentarios adicionales
ai_analysis = Column(JSON) # Análisis de IA si aplica
is_flagged = Column(Boolean, default=False) # Si requiere atención
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
inspection = relationship("Inspection", back_populates="answers")
question = relationship("Question", back_populates="answers")
media_files = relationship("MediaFile", back_populates="answer", cascade="all, delete-orphan")
class MediaFile(Base):
__tablename__ = "media_files"
id = Column(Integer, primary_key=True, index=True)
answer_id = Column(Integer, ForeignKey("answers.id"), nullable=False)
file_path = Column(String(500), nullable=False)
file_type = Column(String(20), default="image") # image, video
caption = Column(Text)
order = Column(Integer, default=0)
uploaded_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationships
answer = relationship("Answer", back_populates="media_files")

177
backend/app/schemas.py Normal file
View File

@@ -0,0 +1,177 @@
from pydantic import BaseModel, EmailStr, Field
from typing import Optional, List
from datetime import datetime
# User Schemas
class UserBase(BaseModel):
username: str
email: Optional[EmailStr] = None
full_name: Optional[str] = None
role: str = "mechanic"
class UserCreate(UserBase):
password: str
class UserLogin(BaseModel):
username: str
password: str
class User(UserBase):
id: int
is_active: bool
created_at: datetime
class Config:
from_attributes = True
class Token(BaseModel):
access_token: str
token_type: str
user: User
# Checklist Schemas
class ChecklistBase(BaseModel):
name: str
description: Optional[str] = None
ai_mode: str = "off"
scoring_enabled: bool = True
logo_url: Optional[str] = None
class ChecklistCreate(ChecklistBase):
pass
class ChecklistUpdate(ChecklistBase):
is_active: Optional[bool] = None
class Checklist(ChecklistBase):
id: int
max_score: int
is_active: bool
created_by: int
created_at: datetime
class Config:
from_attributes = True
# Question Schemas
class QuestionBase(BaseModel):
section: Optional[str] = None
text: str
type: str
points: int = 1
options: Optional[dict] = None
order: int = 0
allow_photos: bool = True
max_photos: int = 3
requires_comment_on_fail: bool = False
class QuestionCreate(QuestionBase):
checklist_id: int
class QuestionUpdate(QuestionBase):
pass
class Question(QuestionBase):
id: int
checklist_id: int
created_at: datetime
class Config:
from_attributes = True
# Inspection Schemas
class InspectionBase(BaseModel):
or_number: Optional[str] = None
work_order_number: Optional[str] = None
vehicle_plate: str
vehicle_brand: Optional[str] = None
vehicle_model: Optional[str] = None
vehicle_km: Optional[int] = None
client_name: Optional[str] = None
class InspectionCreate(InspectionBase):
checklist_id: int
class InspectionUpdate(BaseModel):
vehicle_brand: Optional[str] = None
vehicle_model: Optional[str] = None
vehicle_km: Optional[int] = None
signature_data: Optional[str] = None
status: Optional[str] = None
class Inspection(InspectionBase):
id: int
checklist_id: int
mechanic_id: int
score: int
max_score: int
percentage: float
flagged_items_count: int
status: str
started_at: datetime
completed_at: Optional[datetime] = None
class Config:
from_attributes = True
# Answer Schemas
class AnswerBase(BaseModel):
answer_value: str
status: str = "ok"
comment: Optional[str] = None
is_flagged: bool = False
class AnswerCreate(AnswerBase):
inspection_id: int
question_id: int
class AnswerUpdate(AnswerBase):
pass
class Answer(AnswerBase):
id: int
inspection_id: int
question_id: int
points_earned: int
ai_analysis: Optional[dict] = None
created_at: datetime
class Config:
from_attributes = True
# MediaFile Schemas
class MediaFileBase(BaseModel):
caption: Optional[str] = None
order: int = 0
class MediaFileCreate(MediaFileBase):
file_type: str = "image"
class MediaFile(MediaFileBase):
id: int
answer_id: int
file_path: str
file_type: str
uploaded_at: datetime
class Config:
from_attributes = True
# Response Schemas
class ChecklistWithQuestions(Checklist):
questions: List[Question] = []
class InspectionDetail(Inspection):
checklist: Checklist
mechanic: User
answers: List[Answer] = []
class AnswerWithMedia(Answer):
media_files: List[MediaFile] = []
question: Question