esta todo ok
This commit is contained in:
@@ -27,6 +27,19 @@ app.add_middleware(
|
||||
|
||||
security = HTTPBearer()
|
||||
|
||||
# ============= PERMISSION HELPERS =============
|
||||
def require_permission(user: models.User, permission: str):
|
||||
"""Verifica que el usuario tenga un permiso específico"""
|
||||
if not hasattr(user.role_obj, permission) or not getattr(user.role_obj, permission):
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail=f"No tienes permisos para esta acción (requiere: {permission})"
|
||||
)
|
||||
|
||||
def has_permission(user: models.User, permission: str) -> bool:
|
||||
"""Verifica si el usuario tiene un permiso específico"""
|
||||
return hasattr(user.role_obj, permission) and getattr(user.role_obj, permission)
|
||||
|
||||
# Dependency para obtener usuario actual
|
||||
def get_current_user(
|
||||
credentials: HTTPAuthorizationCredentials = Depends(security),
|
||||
@@ -51,8 +64,11 @@ def get_current_user(
|
||||
api_token.last_used_at = datetime.utcnow()
|
||||
db.commit()
|
||||
|
||||
# Obtener usuario
|
||||
user = db.query(models.User).filter(models.User.id == api_token.user_id).first()
|
||||
# Obtener usuario con rol
|
||||
user = db.query(models.User).options(
|
||||
joinedload(models.User.role_obj)
|
||||
).filter(models.User.id == api_token.user_id).first()
|
||||
|
||||
if not user or not user.is_active:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
@@ -72,7 +88,10 @@ def get_current_user(
|
||||
|
||||
user_id = int(payload.get("sub"))
|
||||
print(f"Looking for user ID: {user_id}") # Debug
|
||||
user = db.query(models.User).filter(models.User.id == user_id).first()
|
||||
user = db.query(models.User).options(
|
||||
joinedload(models.User.role_obj)
|
||||
).filter(models.User.id == user_id).first()
|
||||
|
||||
if user is None:
|
||||
print(f"User not found with ID: {user_id}") # Debug
|
||||
raise HTTPException(status_code=404, detail="Usuario no encontrado")
|
||||
@@ -94,13 +113,15 @@ def register(user: schemas.UserCreate, db: Session = Depends(get_db)):
|
||||
username=user.username,
|
||||
email=user.email,
|
||||
full_name=user.full_name,
|
||||
role=user.role,
|
||||
role_id=user.role_id,
|
||||
password_hash=hashed_password
|
||||
)
|
||||
db.add(db_user)
|
||||
db.commit()
|
||||
db.refresh(db_user)
|
||||
return db_user
|
||||
db.refresh(db_user)
|
||||
return db_user
|
||||
|
||||
|
||||
@app.post("/api/auth/login", response_model=schemas.Token)
|
||||
|
||||
@@ -3,6 +3,26 @@ from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.sql import func
|
||||
from app.core.database import Base
|
||||
|
||||
class Role(Base):
|
||||
__tablename__ = "roles"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
name = Column(String(50), unique=True, nullable=False) # administrador, asesor, mecanico
|
||||
display_name = Column(String(100), nullable=False) # Administrador, Asesor, Mecánico
|
||||
description = Column(String(255))
|
||||
# Permisos
|
||||
can_manage_users = Column(Boolean, default=False)
|
||||
can_manage_roles = Column(Boolean, default=False)
|
||||
can_manage_checklists = Column(Boolean, default=False)
|
||||
can_create_inspections = Column(Boolean, default=False)
|
||||
can_view_all_inspections = Column(Boolean, default=False)
|
||||
can_view_reports = Column(Boolean, default=False)
|
||||
can_deactivate_inspections = Column(Boolean, default=False)
|
||||
created_at = Column(DateTime(timezone=True), server_default=func.now())
|
||||
|
||||
# Relationships
|
||||
users = relationship("User", back_populates="role_obj")
|
||||
|
||||
class User(Base):
|
||||
__tablename__ = "users"
|
||||
|
||||
@@ -10,12 +30,13 @@ class User(Base):
|
||||
username = Column(String(50), unique=True, index=True, nullable=False)
|
||||
email = Column(String(100), unique=True, index=True)
|
||||
password_hash = Column(String(255), nullable=False)
|
||||
role = Column(String(20), nullable=False) # admin, mechanic
|
||||
role_id = Column(Integer, ForeignKey("roles.id"), nullable=False)
|
||||
full_name = Column(String(100))
|
||||
is_active = Column(Boolean, default=True)
|
||||
created_at = Column(DateTime(timezone=True), server_default=func.now())
|
||||
|
||||
# Relationships
|
||||
role_obj = relationship("Role", back_populates="users")
|
||||
checklists_created = relationship("Checklist", back_populates="creator")
|
||||
inspections = relationship("Inspection", back_populates="mechanic")
|
||||
api_tokens = relationship("APIToken", back_populates="user", cascade="all, delete-orphan")
|
||||
|
||||
@@ -2,12 +2,46 @@ from pydantic import BaseModel, EmailStr, Field
|
||||
from typing import Optional, List
|
||||
from datetime import datetime
|
||||
|
||||
# Role Schemas
|
||||
class RoleBase(BaseModel):
|
||||
name: str
|
||||
display_name: str
|
||||
description: Optional[str] = None
|
||||
can_manage_users: bool = False
|
||||
can_manage_roles: bool = False
|
||||
can_manage_checklists: bool = False
|
||||
can_create_inspections: bool = False
|
||||
can_view_all_inspections: bool = False
|
||||
can_view_reports: bool = False
|
||||
can_deactivate_inspections: bool = False
|
||||
|
||||
class RoleCreate(RoleBase):
|
||||
pass
|
||||
|
||||
class RoleUpdate(BaseModel):
|
||||
display_name: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
can_manage_users: Optional[bool] = None
|
||||
can_manage_roles: Optional[bool] = None
|
||||
can_manage_checklists: Optional[bool] = None
|
||||
can_create_inspections: Optional[bool] = None
|
||||
can_view_all_inspections: Optional[bool] = None
|
||||
can_view_reports: Optional[bool] = None
|
||||
can_deactivate_inspections: Optional[bool] = None
|
||||
|
||||
class Role(RoleBase):
|
||||
id: int
|
||||
created_at: datetime
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
# User Schemas
|
||||
class UserBase(BaseModel):
|
||||
username: str
|
||||
email: Optional[EmailStr] = None
|
||||
full_name: Optional[str] = None
|
||||
role: str = "mechanic"
|
||||
role_id: int = 3 # Default: mecanico
|
||||
|
||||
class UserCreate(UserBase):
|
||||
password: str
|
||||
@@ -16,7 +50,7 @@ class UserUpdate(BaseModel):
|
||||
username: Optional[str] = None
|
||||
email: Optional[EmailStr] = None
|
||||
full_name: Optional[str] = None
|
||||
role: Optional[str] = None
|
||||
role_id: Optional[int] = None
|
||||
|
||||
class UserPasswordUpdate(BaseModel):
|
||||
current_password: str
|
||||
@@ -29,8 +63,13 @@ class UserLogin(BaseModel):
|
||||
username: str
|
||||
password: str
|
||||
|
||||
class User(UserBase):
|
||||
class User(BaseModel):
|
||||
id: int
|
||||
username: str
|
||||
email: Optional[str] = None
|
||||
full_name: Optional[str] = None
|
||||
role_id: int
|
||||
role: Role # Role object
|
||||
is_active: bool
|
||||
created_at: datetime
|
||||
|
||||
|
||||
128
backend/migrate_roles.py
Normal file
128
backend/migrate_roles.py
Normal file
@@ -0,0 +1,128 @@
|
||||
"""
|
||||
Script de migración para implementar sistema de roles
|
||||
Crea tabla roles y migra usuarios existentes
|
||||
"""
|
||||
from sqlalchemy import create_engine, text
|
||||
from app.core.config import settings
|
||||
|
||||
def migrate():
|
||||
engine = create_engine(settings.DATABASE_URL)
|
||||
|
||||
with engine.connect() as conn:
|
||||
print("🔄 Iniciando migración de roles...")
|
||||
|
||||
# 1. Crear tabla roles
|
||||
print("📋 Creando tabla roles...")
|
||||
conn.execute(text("""
|
||||
CREATE TABLE IF NOT EXISTS roles (
|
||||
id SERIAL PRIMARY KEY,
|
||||
name VARCHAR(50) UNIQUE NOT NULL,
|
||||
display_name VARCHAR(100) NOT NULL,
|
||||
description VARCHAR(255),
|
||||
can_manage_users BOOLEAN DEFAULT FALSE,
|
||||
can_manage_roles BOOLEAN DEFAULT FALSE,
|
||||
can_manage_checklists BOOLEAN DEFAULT FALSE,
|
||||
can_create_inspections BOOLEAN DEFAULT FALSE,
|
||||
can_view_all_inspections BOOLEAN DEFAULT FALSE,
|
||||
can_view_reports BOOLEAN DEFAULT FALSE,
|
||||
can_deactivate_inspections BOOLEAN DEFAULT FALSE,
|
||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
|
||||
)
|
||||
"""))
|
||||
conn.commit()
|
||||
|
||||
# 2. Insertar roles predefinidos
|
||||
print("👥 Insertando roles predefinidos...")
|
||||
conn.execute(text("""
|
||||
INSERT INTO roles (name, display_name, description,
|
||||
can_manage_users, can_manage_roles, can_manage_checklists,
|
||||
can_create_inspections, can_view_all_inspections,
|
||||
can_view_reports, can_deactivate_inspections)
|
||||
VALUES
|
||||
('administrador', 'Administrador', 'Acceso completo al sistema',
|
||||
TRUE, TRUE, TRUE, TRUE, TRUE, TRUE, TRUE),
|
||||
('asesor', 'Asesor', 'Acceso a reportes e informes',
|
||||
FALSE, FALSE, FALSE, FALSE, TRUE, TRUE, FALSE),
|
||||
('mecanico', 'Mecánico', 'Realizar inspecciones',
|
||||
FALSE, FALSE, FALSE, TRUE, FALSE, FALSE, FALSE)
|
||||
ON CONFLICT (name) DO NOTHING
|
||||
"""))
|
||||
conn.commit()
|
||||
|
||||
# 3. Agregar columna role_id a users (temporal)
|
||||
print("🔧 Agregando columna role_id a usuarios...")
|
||||
conn.execute(text("""
|
||||
ALTER TABLE users ADD COLUMN IF NOT EXISTS role_id INTEGER
|
||||
"""))
|
||||
conn.commit()
|
||||
|
||||
# 4. Migrar datos: admin -> administrador (id=1)
|
||||
print("🔄 Migrando usuarios admin -> administrador...")
|
||||
conn.execute(text("""
|
||||
UPDATE users
|
||||
SET role_id = (SELECT id FROM roles WHERE name = 'administrador')
|
||||
WHERE role = 'admin' AND role_id IS NULL
|
||||
"""))
|
||||
conn.commit()
|
||||
|
||||
# 5. Migrar datos: mechanic -> mecanico (id=3)
|
||||
print("🔄 Migrando usuarios mechanic -> mecanico...")
|
||||
conn.execute(text("""
|
||||
UPDATE users
|
||||
SET role_id = (SELECT id FROM roles WHERE name = 'mecanico')
|
||||
WHERE role IN ('mechanic', 'mecanico') AND role_id IS NULL
|
||||
"""))
|
||||
conn.commit()
|
||||
|
||||
# 6. Verificar que todos tienen role_id
|
||||
result = conn.execute(text("SELECT COUNT(*) FROM users WHERE role_id IS NULL"))
|
||||
null_count = result.scalar()
|
||||
|
||||
if null_count > 0:
|
||||
print(f"⚠️ Advertencia: {null_count} usuarios sin role_id asignado")
|
||||
print(" Asignando rol de mecánico por defecto...")
|
||||
conn.execute(text("""
|
||||
UPDATE users
|
||||
SET role_id = (SELECT id FROM roles WHERE name = 'mecanico')
|
||||
WHERE role_id IS NULL
|
||||
"""))
|
||||
conn.commit()
|
||||
|
||||
# 7. Hacer role_id NOT NULL y crear foreign key
|
||||
print("🔒 Aplicando restricciones...")
|
||||
conn.execute(text("""
|
||||
ALTER TABLE users ALTER COLUMN role_id SET NOT NULL
|
||||
"""))
|
||||
conn.execute(text("""
|
||||
ALTER TABLE users
|
||||
ADD CONSTRAINT fk_users_role_id
|
||||
FOREIGN KEY (role_id) REFERENCES roles(id)
|
||||
"""))
|
||||
conn.commit()
|
||||
|
||||
# 8. Eliminar columna role antigua
|
||||
print("🗑️ Eliminando columna role antigua...")
|
||||
conn.execute(text("""
|
||||
ALTER TABLE users DROP COLUMN IF EXISTS role
|
||||
"""))
|
||||
conn.commit()
|
||||
|
||||
# 9. Mostrar resumen
|
||||
print("\n✅ Migración completada!")
|
||||
print("\n📊 Resumen de roles:")
|
||||
|
||||
result = conn.execute(text("""
|
||||
SELECT r.name, r.display_name, COUNT(u.id) as user_count
|
||||
FROM roles r
|
||||
LEFT JOIN users u ON u.role_id = r.id
|
||||
GROUP BY r.id, r.name, r.display_name
|
||||
ORDER BY r.id
|
||||
"""))
|
||||
|
||||
for row in result:
|
||||
print(f" {row[1]}: {row[2]} usuario(s)")
|
||||
|
||||
print("\n🎉 Sistema de roles implementado correctamente!")
|
||||
|
||||
if __name__ == "__main__":
|
||||
migrate()
|
||||
100
backend/role_endpoints.txt
Normal file
100
backend/role_endpoints.txt
Normal file
@@ -0,0 +1,100 @@
|
||||
# Endpoints para gestión de roles - Agregar después de los endpoints de usuarios
|
||||
|
||||
# ============= ROLE ENDPOINTS =============
|
||||
@app.get("/api/roles", response_model=List[schemas.Role])
|
||||
def get_roles(
|
||||
db: Session = Depends(get_db),
|
||||
current_user: models.User = Depends(get_current_user)
|
||||
):
|
||||
"""Lista todos los roles disponibles (cualquier usuario autenticado)"""
|
||||
return db.query(models.Role).all()
|
||||
|
||||
@app.get("/api/roles/{role_id}", response_model=schemas.Role)
|
||||
def get_role(
|
||||
role_id: int,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: models.User = Depends(get_current_user)
|
||||
):
|
||||
"""Obtiene un rol específico"""
|
||||
require_permission(current_user, "can_manage_roles")
|
||||
|
||||
role = db.query(models.Role).filter(models.Role.id == role_id).first()
|
||||
if not role:
|
||||
raise HTTPException(status_code=404, detail="Rol no encontrado")
|
||||
return role
|
||||
|
||||
@app.post("/api/roles", response_model=schemas.Role)
|
||||
def create_role(
|
||||
role: schemas.RoleCreate,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: models.User = Depends(get_current_user)
|
||||
):
|
||||
"""Crea un nuevo rol (solo administrador)"""
|
||||
require_permission(current_user, "can_manage_roles")
|
||||
|
||||
# Verificar si el rol ya existe
|
||||
existing = db.query(models.Role).filter(models.Role.name == role.name).first()
|
||||
if existing:
|
||||
raise HTTPException(status_code=400, detail="El rol ya existe")
|
||||
|
||||
db_role = models.Role(**role.dict())
|
||||
db.add(db_role)
|
||||
db.commit()
|
||||
db.refresh(db_role)
|
||||
return db_role
|
||||
|
||||
@app.put("/api/roles/{role_id}", response_model=schemas.Role)
|
||||
def update_role(
|
||||
role_id: int,
|
||||
role_update: schemas.RoleUpdate,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: models.User = Depends(get_current_user)
|
||||
):
|
||||
"""Actualiza un rol existente (solo administrador)"""
|
||||
require_permission(current_user, "can_manage_roles")
|
||||
|
||||
db_role = db.query(models.Role).filter(models.Role.id == role_id).first()
|
||||
if not db_role:
|
||||
raise HTTPException(status_code=404, detail="Rol no encontrado")
|
||||
|
||||
# No permitir editar roles predefinidos (1, 2, 3)
|
||||
if role_id in [1, 2, 3]:
|
||||
raise HTTPException(status_code=403, detail="No se pueden editar roles predefinidos")
|
||||
|
||||
# Actualizar campos
|
||||
update_data = role_update.dict(exclude_unset=True)
|
||||
for field, value in update_data.items():
|
||||
setattr(db_role, field, value)
|
||||
|
||||
db.commit()
|
||||
db.refresh(db_role)
|
||||
return db_role
|
||||
|
||||
@app.delete("/api/roles/{role_id}")
|
||||
def delete_role(
|
||||
role_id: int,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: models.User = Depends(get_current_user)
|
||||
):
|
||||
"""Elimina un rol (solo administrador, no permite eliminar roles predefinidos)"""
|
||||
require_permission(current_user, "can_manage_roles")
|
||||
|
||||
# No permitir eliminar roles predefinidos
|
||||
if role_id in [1, 2, 3]:
|
||||
raise HTTPException(status_code=403, detail="No se pueden eliminar roles predefinidos")
|
||||
|
||||
db_role = db.query(models.Role).filter(models.Role.id == role_id).first()
|
||||
if not db_role:
|
||||
raise HTTPException(status_code=404, detail="Rol no encontrado")
|
||||
|
||||
# Verificar si hay usuarios con este rol
|
||||
users_count = db.query(models.User).filter(models.User.role_id == role_id).count()
|
||||
if users_count > 0:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"No se puede eliminar el rol porque tiene {users_count} usuario(s) asignado(s)"
|
||||
)
|
||||
|
||||
db.delete(db_role)
|
||||
db.commit()
|
||||
return {"message": "Rol eliminado correctamente", "role_id": role_id}
|
||||
67
backend/update_permissions.py
Normal file
67
backend/update_permissions.py
Normal file
@@ -0,0 +1,67 @@
|
||||
"""
|
||||
Script para actualizar automáticamente las verificaciones de permisos en main.py
|
||||
Reemplaza las verificaciones de role string por verificaciones basadas en permisos
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
def update_permissions():
|
||||
with open('app/main.py', 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
# Mapa de reemplazos: patrón -> reemplazo
|
||||
replacements = [
|
||||
# Gestión de usuarios
|
||||
(
|
||||
r'if current_user\.role != "admin":\s+raise HTTPException\(status_code=403, detail="No tienes permisos para ver usuarios"\)',
|
||||
'require_permission(current_user, "can_manage_users")'
|
||||
),
|
||||
(
|
||||
r'if current_user\.role != "admin":\s+raise HTTPException\(status_code=403, detail="No tienes permisos.*usuarios?"\)',
|
||||
'require_permission(current_user, "can_manage_users")'
|
||||
),
|
||||
|
||||
# Gestión de checklists
|
||||
(
|
||||
r'if current_user\.role != "admin":\s+raise HTTPException\(status_code=403, detail=".*checklist.*"\)',
|
||||
'require_permission(current_user, "can_manage_checklists")'
|
||||
),
|
||||
|
||||
# Desactivar inspecciones
|
||||
(
|
||||
r'if current_user\.role != "admin":\s+raise HTTPException\(status_code=403, detail=".*inactivar.*inspecc.*"\)',
|
||||
'require_permission(current_user, "can_deactivate_inspections")'
|
||||
),
|
||||
|
||||
# Ver todas las inspecciones (mechanic filter)
|
||||
(
|
||||
r'if current_user\.role == "mechanic":\s+query = query\.filter\(models\.Inspection\.mechanic_id == current_user\.id\)',
|
||||
'if not has_permission(current_user, "can_view_all_inspections"):\n query = query.filter(models.Inspection.mechanic_id == current_user.id)'
|
||||
),
|
||||
|
||||
# Crear inspecciones
|
||||
(
|
||||
r'# Crear usuario\s+hashed_password = get_password_hash\(user\.password\)\s+db_user = models\.User\(\s+username=user\.username,\s+email=user\.email,\s+full_name=user\.full_name,\s+role=user\.role,',
|
||||
'# Crear usuario\n hashed_password = get_password_hash(user.password)\n db_user = models.User(\n username=user.username,\n email=user.email,\n full_name=user.full_name,\n role_id=user.role_id,'
|
||||
),
|
||||
]
|
||||
|
||||
# Aplicar reemplazos
|
||||
for pattern, replacement in replacements:
|
||||
content = re.sub(pattern, replacement, content, flags=re.MULTILINE | re.DOTALL)
|
||||
|
||||
# Reemplazos específicos adicionales
|
||||
# Cambiar role por role_id en UserUpdate
|
||||
content = content.replace(
|
||||
'if user_update.role is not None:\n if current_user.role != "admin":\n raise HTTPException(status_code=403, detail="No tienes permisos para cambiar roles")\n db_user.role = user_update.role',
|
||||
'if user_update.role_id is not None:\n require_permission(current_user, "can_manage_roles")\n db_user.role_id = user_update.role_id'
|
||||
)
|
||||
|
||||
with open('app/main.py', 'w', encoding='utf-8') as f:
|
||||
f.write(content)
|
||||
|
||||
print("✅ Archivo main.py actualizado con sistema de permisos")
|
||||
print("⚠️ Revisar manualmente y ajustar según sea necesario")
|
||||
|
||||
if __name__ == "__main__":
|
||||
update_permissions()
|
||||
Reference in New Issue
Block a user