esta todo ok

This commit is contained in:
2025-11-19 21:46:22 -03:00
parent 57ad12754f
commit 250758963c
7 changed files with 8 additions and 550 deletions

View File

@@ -27,19 +27,6 @@ app.add_middleware(
security = HTTPBearer()
# ============= PERMISSION HELPERS =============
def require_permission(user: models.User, permission: str):
"""Verifica que el usuario tenga un permiso específico"""
if not hasattr(user.role_obj, permission) or not getattr(user.role_obj, permission):
raise HTTPException(
status_code=403,
detail=f"No tienes permisos para esta acción (requiere: {permission})"
)
def has_permission(user: models.User, permission: str) -> bool:
"""Verifica si el usuario tiene un permiso específico"""
return hasattr(user.role_obj, permission) and getattr(user.role_obj, permission)
# Dependency para obtener usuario actual
def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
@@ -64,11 +51,8 @@ def get_current_user(
api_token.last_used_at = datetime.utcnow()
db.commit()
# Obtener usuario con rol
user = db.query(models.User).options(
joinedload(models.User.role_obj)
).filter(models.User.id == api_token.user_id).first()
# Obtener usuario
user = db.query(models.User).filter(models.User.id == api_token.user_id).first()
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@@ -88,10 +72,7 @@ def get_current_user(
user_id = int(payload.get("sub"))
print(f"Looking for user ID: {user_id}") # Debug
user = db.query(models.User).options(
joinedload(models.User.role_obj)
).filter(models.User.id == user_id).first()
user = db.query(models.User).filter(models.User.id == user_id).first()
if user is None:
print(f"User not found with ID: {user_id}") # Debug
raise HTTPException(status_code=404, detail="Usuario no encontrado")
@@ -113,15 +94,13 @@ def register(user: schemas.UserCreate, db: Session = Depends(get_db)):
username=user.username,
email=user.email,
full_name=user.full_name,
role_id=user.role_id,
role=user.role,
password_hash=hashed_password
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
db.refresh(db_user)
return db_user
@app.post("/api/auth/login", response_model=schemas.Token)

View File

@@ -3,26 +3,6 @@ from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.core.database import Base
class Role(Base):
__tablename__ = "roles"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(50), unique=True, nullable=False) # administrador, asesor, mecanico
display_name = Column(String(100), nullable=False) # Administrador, Asesor, Mecánico
description = Column(String(255))
# Permisos
can_manage_users = Column(Boolean, default=False)
can_manage_roles = Column(Boolean, default=False)
can_manage_checklists = Column(Boolean, default=False)
can_create_inspections = Column(Boolean, default=False)
can_view_all_inspections = Column(Boolean, default=False)
can_view_reports = Column(Boolean, default=False)
can_deactivate_inspections = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationships
users = relationship("User", back_populates="role_obj")
class User(Base):
__tablename__ = "users"
@@ -30,13 +10,12 @@ class User(Base):
username = Column(String(50), unique=True, index=True, nullable=False)
email = Column(String(100), unique=True, index=True)
password_hash = Column(String(255), nullable=False)
role_id = Column(Integer, ForeignKey("roles.id"), nullable=False)
role = Column(String(20), nullable=False) # admin, mechanic
full_name = Column(String(100))
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationships
role_obj = relationship("Role", back_populates="users")
checklists_created = relationship("Checklist", back_populates="creator")
inspections = relationship("Inspection", back_populates="mechanic")
api_tokens = relationship("APIToken", back_populates="user", cascade="all, delete-orphan")

View File

@@ -2,46 +2,12 @@ from pydantic import BaseModel, EmailStr, Field
from typing import Optional, List
from datetime import datetime
# Role Schemas
class RoleBase(BaseModel):
name: str
display_name: str
description: Optional[str] = None
can_manage_users: bool = False
can_manage_roles: bool = False
can_manage_checklists: bool = False
can_create_inspections: bool = False
can_view_all_inspections: bool = False
can_view_reports: bool = False
can_deactivate_inspections: bool = False
class RoleCreate(RoleBase):
pass
class RoleUpdate(BaseModel):
display_name: Optional[str] = None
description: Optional[str] = None
can_manage_users: Optional[bool] = None
can_manage_roles: Optional[bool] = None
can_manage_checklists: Optional[bool] = None
can_create_inspections: Optional[bool] = None
can_view_all_inspections: Optional[bool] = None
can_view_reports: Optional[bool] = None
can_deactivate_inspections: Optional[bool] = None
class Role(RoleBase):
id: int
created_at: datetime
class Config:
from_attributes = True
# User Schemas
class UserBase(BaseModel):
username: str
email: Optional[EmailStr] = None
full_name: Optional[str] = None
role_id: int = 3 # Default: mecanico
role: str = "mechanic"
class UserCreate(UserBase):
password: str
@@ -50,7 +16,7 @@ class UserUpdate(BaseModel):
username: Optional[str] = None
email: Optional[EmailStr] = None
full_name: Optional[str] = None
role_id: Optional[int] = None
role: Optional[str] = None
class UserPasswordUpdate(BaseModel):
current_password: str
@@ -63,13 +29,8 @@ class UserLogin(BaseModel):
username: str
password: str
class User(BaseModel):
class User(UserBase):
id: int
username: str
email: Optional[str] = None
full_name: Optional[str] = None
role_id: int
role: Role # Role object
is_active: bool
created_at: datetime

View File

@@ -1,128 +0,0 @@
"""
Script de migración para implementar sistema de roles
Crea tabla roles y migra usuarios existentes
"""
from sqlalchemy import create_engine, text
from app.core.config import settings
def migrate():
engine = create_engine(settings.DATABASE_URL)
with engine.connect() as conn:
print("🔄 Iniciando migración de roles...")
# 1. Crear tabla roles
print("📋 Creando tabla roles...")
conn.execute(text("""
CREATE TABLE IF NOT EXISTS roles (
id SERIAL PRIMARY KEY,
name VARCHAR(50) UNIQUE NOT NULL,
display_name VARCHAR(100) NOT NULL,
description VARCHAR(255),
can_manage_users BOOLEAN DEFAULT FALSE,
can_manage_roles BOOLEAN DEFAULT FALSE,
can_manage_checklists BOOLEAN DEFAULT FALSE,
can_create_inspections BOOLEAN DEFAULT FALSE,
can_view_all_inspections BOOLEAN DEFAULT FALSE,
can_view_reports BOOLEAN DEFAULT FALSE,
can_deactivate_inspections BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
)
"""))
conn.commit()
# 2. Insertar roles predefinidos
print("👥 Insertando roles predefinidos...")
conn.execute(text("""
INSERT INTO roles (name, display_name, description,
can_manage_users, can_manage_roles, can_manage_checklists,
can_create_inspections, can_view_all_inspections,
can_view_reports, can_deactivate_inspections)
VALUES
('administrador', 'Administrador', 'Acceso completo al sistema',
TRUE, TRUE, TRUE, TRUE, TRUE, TRUE, TRUE),
('asesor', 'Asesor', 'Acceso a reportes e informes',
FALSE, FALSE, FALSE, FALSE, TRUE, TRUE, FALSE),
('mecanico', 'Mecánico', 'Realizar inspecciones',
FALSE, FALSE, FALSE, TRUE, FALSE, FALSE, FALSE)
ON CONFLICT (name) DO NOTHING
"""))
conn.commit()
# 3. Agregar columna role_id a users (temporal)
print("🔧 Agregando columna role_id a usuarios...")
conn.execute(text("""
ALTER TABLE users ADD COLUMN IF NOT EXISTS role_id INTEGER
"""))
conn.commit()
# 4. Migrar datos: admin -> administrador (id=1)
print("🔄 Migrando usuarios admin -> administrador...")
conn.execute(text("""
UPDATE users
SET role_id = (SELECT id FROM roles WHERE name = 'administrador')
WHERE role = 'admin' AND role_id IS NULL
"""))
conn.commit()
# 5. Migrar datos: mechanic -> mecanico (id=3)
print("🔄 Migrando usuarios mechanic -> mecanico...")
conn.execute(text("""
UPDATE users
SET role_id = (SELECT id FROM roles WHERE name = 'mecanico')
WHERE role IN ('mechanic', 'mecanico') AND role_id IS NULL
"""))
conn.commit()
# 6. Verificar que todos tienen role_id
result = conn.execute(text("SELECT COUNT(*) FROM users WHERE role_id IS NULL"))
null_count = result.scalar()
if null_count > 0:
print(f"⚠️ Advertencia: {null_count} usuarios sin role_id asignado")
print(" Asignando rol de mecánico por defecto...")
conn.execute(text("""
UPDATE users
SET role_id = (SELECT id FROM roles WHERE name = 'mecanico')
WHERE role_id IS NULL
"""))
conn.commit()
# 7. Hacer role_id NOT NULL y crear foreign key
print("🔒 Aplicando restricciones...")
conn.execute(text("""
ALTER TABLE users ALTER COLUMN role_id SET NOT NULL
"""))
conn.execute(text("""
ALTER TABLE users
ADD CONSTRAINT fk_users_role_id
FOREIGN KEY (role_id) REFERENCES roles(id)
"""))
conn.commit()
# 8. Eliminar columna role antigua
print("🗑️ Eliminando columna role antigua...")
conn.execute(text("""
ALTER TABLE users DROP COLUMN IF EXISTS role
"""))
conn.commit()
# 9. Mostrar resumen
print("\n✅ Migración completada!")
print("\n📊 Resumen de roles:")
result = conn.execute(text("""
SELECT r.name, r.display_name, COUNT(u.id) as user_count
FROM roles r
LEFT JOIN users u ON u.role_id = r.id
GROUP BY r.id, r.name, r.display_name
ORDER BY r.id
"""))
for row in result:
print(f" {row[1]}: {row[2]} usuario(s)")
print("\n🎉 Sistema de roles implementado correctamente!")
if __name__ == "__main__":
migrate()

View File

@@ -1,100 +0,0 @@
# Endpoints para gestión de roles - Agregar después de los endpoints de usuarios
# ============= ROLE ENDPOINTS =============
@app.get("/api/roles", response_model=List[schemas.Role])
def get_roles(
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Lista todos los roles disponibles (cualquier usuario autenticado)"""
return db.query(models.Role).all()
@app.get("/api/roles/{role_id}", response_model=schemas.Role)
def get_role(
role_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Obtiene un rol específico"""
require_permission(current_user, "can_manage_roles")
role = db.query(models.Role).filter(models.Role.id == role_id).first()
if not role:
raise HTTPException(status_code=404, detail="Rol no encontrado")
return role
@app.post("/api/roles", response_model=schemas.Role)
def create_role(
role: schemas.RoleCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Crea un nuevo rol (solo administrador)"""
require_permission(current_user, "can_manage_roles")
# Verificar si el rol ya existe
existing = db.query(models.Role).filter(models.Role.name == role.name).first()
if existing:
raise HTTPException(status_code=400, detail="El rol ya existe")
db_role = models.Role(**role.dict())
db.add(db_role)
db.commit()
db.refresh(db_role)
return db_role
@app.put("/api/roles/{role_id}", response_model=schemas.Role)
def update_role(
role_id: int,
role_update: schemas.RoleUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Actualiza un rol existente (solo administrador)"""
require_permission(current_user, "can_manage_roles")
db_role = db.query(models.Role).filter(models.Role.id == role_id).first()
if not db_role:
raise HTTPException(status_code=404, detail="Rol no encontrado")
# No permitir editar roles predefinidos (1, 2, 3)
if role_id in [1, 2, 3]:
raise HTTPException(status_code=403, detail="No se pueden editar roles predefinidos")
# Actualizar campos
update_data = role_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_role, field, value)
db.commit()
db.refresh(db_role)
return db_role
@app.delete("/api/roles/{role_id}")
def delete_role(
role_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Elimina un rol (solo administrador, no permite eliminar roles predefinidos)"""
require_permission(current_user, "can_manage_roles")
# No permitir eliminar roles predefinidos
if role_id in [1, 2, 3]:
raise HTTPException(status_code=403, detail="No se pueden eliminar roles predefinidos")
db_role = db.query(models.Role).filter(models.Role.id == role_id).first()
if not db_role:
raise HTTPException(status_code=404, detail="Rol no encontrado")
# Verificar si hay usuarios con este rol
users_count = db.query(models.User).filter(models.User.role_id == role_id).count()
if users_count > 0:
raise HTTPException(
status_code=400,
detail=f"No se puede eliminar el rol porque tiene {users_count} usuario(s) asignado(s)"
)
db.delete(db_role)
db.commit()
return {"message": "Rol eliminado correctamente", "role_id": role_id}

View File

@@ -1,67 +0,0 @@
"""
Script para actualizar automáticamente las verificaciones de permisos en main.py
Reemplaza las verificaciones de role string por verificaciones basadas en permisos
"""
import re
def update_permissions():
with open('app/main.py', 'r', encoding='utf-8') as f:
content = f.read()
# Mapa de reemplazos: patrón -> reemplazo
replacements = [
# Gestión de usuarios
(
r'if current_user\.role != "admin":\s+raise HTTPException\(status_code=403, detail="No tienes permisos para ver usuarios"\)',
'require_permission(current_user, "can_manage_users")'
),
(
r'if current_user\.role != "admin":\s+raise HTTPException\(status_code=403, detail="No tienes permisos.*usuarios?"\)',
'require_permission(current_user, "can_manage_users")'
),
# Gestión de checklists
(
r'if current_user\.role != "admin":\s+raise HTTPException\(status_code=403, detail=".*checklist.*"\)',
'require_permission(current_user, "can_manage_checklists")'
),
# Desactivar inspecciones
(
r'if current_user\.role != "admin":\s+raise HTTPException\(status_code=403, detail=".*inactivar.*inspecc.*"\)',
'require_permission(current_user, "can_deactivate_inspections")'
),
# Ver todas las inspecciones (mechanic filter)
(
r'if current_user\.role == "mechanic":\s+query = query\.filter\(models\.Inspection\.mechanic_id == current_user\.id\)',
'if not has_permission(current_user, "can_view_all_inspections"):\n query = query.filter(models.Inspection.mechanic_id == current_user.id)'
),
# Crear inspecciones
(
r'# Crear usuario\s+hashed_password = get_password_hash\(user\.password\)\s+db_user = models\.User\(\s+username=user\.username,\s+email=user\.email,\s+full_name=user\.full_name,\s+role=user\.role,',
'# Crear usuario\n hashed_password = get_password_hash(user.password)\n db_user = models.User(\n username=user.username,\n email=user.email,\n full_name=user.full_name,\n role_id=user.role_id,'
),
]
# Aplicar reemplazos
for pattern, replacement in replacements:
content = re.sub(pattern, replacement, content, flags=re.MULTILINE | re.DOTALL)
# Reemplazos específicos adicionales
# Cambiar role por role_id en UserUpdate
content = content.replace(
'if user_update.role is not None:\n if current_user.role != "admin":\n raise HTTPException(status_code=403, detail="No tienes permisos para cambiar roles")\n db_user.role = user_update.role',
'if user_update.role_id is not None:\n require_permission(current_user, "can_manage_roles")\n db_user.role_id = user_update.role_id'
)
with open('app/main.py', 'w', encoding='utf-8') as f:
f.write(content)
print("✅ Archivo main.py actualizado con sistema de permisos")
print("⚠️ Revisar manualmente y ajustar según sea necesario")
if __name__ == "__main__":
update_permissions()