modulo usuarios

This commit is contained in:
2025-11-19 02:30:22 -03:00
parent be10a888fb
commit ffe298a544
5 changed files with 336 additions and 22 deletions

View File

@@ -1,5 +1,4 @@
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
# Database
@@ -11,7 +10,7 @@ class Settings(BaseSettings):
ACCESS_TOKEN_EXPIRE_MINUTES: int = 10080 # 7 días
# OpenAI
OPENAI_API_KEY: Optional[str] = None
OPENAI_API_KEY: str | None = None
# Environment
ENVIRONMENT: str = "development"

View File

@@ -96,6 +96,230 @@ def get_me(current_user: models.User = Depends(get_current_user)):
return current_user
# ============= USER ENDPOINTS =============
@app.get("/api/users", response_model=List[schemas.User])
def get_users(
skip: int = 0,
limit: int = 100,
active_only: bool = True,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede ver todos los usuarios
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos para ver usuarios")
query = db.query(models.User)
if active_only:
query = query.filter(models.User.is_active == True)
return query.offset(skip).limit(limit).all()
@app.get("/api/users/{user_id}", response_model=schemas.User)
def get_user(
user_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede ver otros usuarios
if current_user.role != "admin" and current_user.id != user_id:
raise HTTPException(status_code=403, detail="No tienes permisos para ver este usuario")
user = db.query(models.User).filter(models.User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Usuario no encontrado")
return user
@app.post("/api/users", response_model=schemas.User)
def create_user(
user: schemas.UserCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede crear usuarios
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos para crear usuarios")
# Verificar si usuario existe
db_user = db.query(models.User).filter(models.User.username == user.username).first()
if db_user:
raise HTTPException(status_code=400, detail="Usuario ya existe")
# Verificar si email existe
if user.email:
db_email = db.query(models.User).filter(models.User.email == user.email).first()
if db_email:
raise HTTPException(status_code=400, detail="Email ya está en uso")
# Crear usuario
hashed_password = get_password_hash(user.password)
db_user = models.User(
username=user.username,
email=user.email,
full_name=user.full_name,
role=user.role,
password_hash=hashed_password,
is_active=True
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.put("/api/users/{user_id}", response_model=schemas.User)
def update_user(
user_id: int,
user_update: schemas.UserUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede actualizar otros usuarios
if current_user.role != "admin" and current_user.id != user_id:
raise HTTPException(status_code=403, detail="No tienes permisos para actualizar este usuario")
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="Usuario no encontrado")
# Actualizar campos
if user_update.email is not None:
# Verificar si email está en uso
existing = db.query(models.User).filter(
models.User.email == user_update.email,
models.User.id != user_id
).first()
if existing:
raise HTTPException(status_code=400, detail="Email ya está en uso")
db_user.email = user_update.email
if user_update.full_name is not None:
db_user.full_name = user_update.full_name
# Solo admin puede cambiar roles
if user_update.role is not None:
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos para cambiar roles")
db_user.role = user_update.role
db.commit()
db.refresh(db_user)
return db_user
@app.patch("/api/users/{user_id}/deactivate")
def deactivate_user(
user_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede inactivar usuarios
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos para inactivar usuarios")
# No permitir auto-inactivación
if current_user.id == user_id:
raise HTTPException(status_code=400, detail="No puedes inactivar tu propio usuario")
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="Usuario no encontrado")
db_user.is_active = False
db.commit()
return {"message": "Usuario inactivado correctamente", "user_id": user_id}
@app.patch("/api/users/{user_id}/activate")
def activate_user(
user_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede activar usuarios
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos para activar usuarios")
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="Usuario no encontrado")
db_user.is_active = True
db.commit()
return {"message": "Usuario activado correctamente", "user_id": user_id}
@app.patch("/api/users/{user_id}/password")
def change_user_password(
user_id: int,
password_update: schemas.AdminPasswordUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede cambiar contraseñas de otros usuarios
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos para cambiar contraseñas")
db_user = db.query(models.User).filter(models.User.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="Usuario no encontrado")
# Cambiar contraseña
db_user.password_hash = get_password_hash(password_update.new_password)
db.commit()
return {"message": "Contraseña actualizada correctamente"}
@app.patch("/api/users/me/password")
def change_my_password(
password_update: schemas.UserPasswordUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Verificar contraseña actual
if not verify_password(password_update.current_password, current_user.password_hash):
raise HTTPException(status_code=400, detail="Contraseña actual incorrecta")
# Cambiar contraseña
current_user.password_hash = get_password_hash(password_update.new_password)
db.commit()
return {"message": "Contraseña actualizada correctamente"}
@app.put("/api/users/me", response_model=schemas.User)
def update_my_profile(
user_update: schemas.UserUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Actualizar email
if user_update.email is not None:
# Verificar si email está en uso
existing = db.query(models.User).filter(
models.User.email == user_update.email,
models.User.id != current_user.id
).first()
if existing:
raise HTTPException(status_code=400, detail="Email ya está en uso")
current_user.email = user_update.email
# Actualizar nombre
if user_update.full_name is not None:
current_user.full_name = user_update.full_name
# No permitir cambio de rol desde perfil
db.commit()
db.refresh(current_user)
return current_user
# ============= CHECKLIST ENDPOINTS =============
@app.get("/api/checklists", response_model=List[schemas.Checklist])
def get_checklists(
@@ -233,11 +457,16 @@ def get_inspections(
limit: int = 100,
vehicle_plate: str = None,
status: str = None,
show_inactive: bool = False,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
query = db.query(models.Inspection)
# Por defecto, solo mostrar inspecciones activas
if not show_inactive:
query = query.filter(models.Inspection.is_active == True)
# Mecánicos solo ven sus inspecciones
if current_user.role == "mechanic":
query = query.filter(models.Inspection.mechanic_id == current_user.id)
@@ -346,6 +575,32 @@ def complete_inspection(
return inspection
@app.patch("/api/inspections/{inspection_id}/deactivate")
def deactivate_inspection(
inspection_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
# Solo admin puede inactivar
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos para inactivar inspecciones")
inspection = db.query(models.Inspection).filter(
models.Inspection.id == inspection_id
).first()
if not inspection:
raise HTTPException(status_code=404, detail="Inspección no encontrada")
inspection.is_active = False
inspection.status = "inactive"
db.commit()
db.refresh(inspection)
return {"message": "Inspección inactivada correctamente", "inspection_id": inspection_id}
# ============= ANSWER ENDPOINTS =============
@app.post("/api/answers", response_model=schemas.Answer)
def create_answer(

View File

@@ -87,7 +87,8 @@ class Inspection(Base):
flagged_items_count = Column(Integer, default=0)
# Estado
status = Column(String(20), default="draft") # draft, completed
status = Column(String(20), default="draft") # draft, completed, inactive
is_active = Column(Boolean, default=True)
# Firma
signature_data = Column(Text) # Base64 de la firma

View File

@@ -12,6 +12,18 @@ class UserBase(BaseModel):
class UserCreate(UserBase):
password: str
class UserUpdate(BaseModel):
email: Optional[EmailStr] = None
full_name: Optional[str] = None
role: Optional[str] = None
class UserPasswordUpdate(BaseModel):
current_password: str
new_password: str
class AdminPasswordUpdate(BaseModel):
new_password: str
class UserLogin(BaseModel):
username: str
password: str