Se termina con Tokens y Modulo Usuarios

This commit is contained in:
2025-11-19 09:52:51 -03:00
parent e2783015e3
commit 3a905a4d02
4 changed files with 190 additions and 1 deletions

View File

@@ -3,6 +3,7 @@ from typing import Optional
from jose import JWTError, jwt from jose import JWTError, jwt
from passlib.context import CryptContext from passlib.context import CryptContext
from app.core.config import settings from app.core.config import settings
import secrets
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@@ -33,3 +34,7 @@ def decode_access_token(token: str):
except JWTError as e: except JWTError as e:
print(f"JWT decode error: {e}") # Debug print(f"JWT decode error: {e}") # Debug
return None return None
def generate_api_token() -> str:
"""Genera un token API aleatorio seguro"""
return f"syntria_{secrets.token_urlsafe(32)}"

View File

@@ -33,6 +33,35 @@ def get_current_user(
db: Session = Depends(get_db) db: Session = Depends(get_db)
): ):
token = credentials.credentials token = credentials.credentials
# Verificar si es un API token (comienza con "syntria_")
if token.startswith("syntria_"):
api_token = db.query(models.APIToken).filter(
models.APIToken.token == token,
models.APIToken.is_active == True
).first()
if not api_token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="API Token inválido o inactivo"
)
# Actualizar último uso
api_token.last_used_at = datetime.utcnow()
db.commit()
# Obtener usuario
user = db.query(models.User).filter(models.User.id == api_token.user_id).first()
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Usuario inválido o inactivo"
)
return user
# Si no es API token, es JWT token
payload = decode_access_token(token) payload = decode_access_token(token)
print(f"Token payload: {payload}") # Debug print(f"Token payload: {payload}") # Debug
if payload is None: if payload is None:
@@ -101,7 +130,7 @@ def get_me(current_user: models.User = Depends(get_current_user)):
def get_users( def get_users(
skip: int = 0, skip: int = 0,
limit: int = 100, limit: int = 100,
active_only: bool = True, active_only: bool = False,
db: Session = Depends(get_db), db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user) current_user: models.User = Depends(get_current_user)
): ):
@@ -186,6 +215,16 @@ def update_user(
raise HTTPException(status_code=404, detail="Usuario no encontrado") raise HTTPException(status_code=404, detail="Usuario no encontrado")
# Actualizar campos # Actualizar campos
if user_update.username is not None:
# Verificar si username está en uso
existing = db.query(models.User).filter(
models.User.username == user_update.username,
models.User.id != user_id
).first()
if existing:
raise HTTPException(status_code=400, detail="Nombre de usuario ya está en uso")
db_user.username = user_update.username
if user_update.email is not None: if user_update.email is not None:
# Verificar si email está en uso # Verificar si email está en uso
existing = db.query(models.User).filter( existing = db.query(models.User).filter(
@@ -320,6 +359,116 @@ def update_my_profile(
return current_user return current_user
# ============= API TOKENS ENDPOINTS =============
@app.get("/api/users/me/tokens", response_model=List[schemas.APIToken])
def get_my_tokens(
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Obtener todos mis API tokens"""
tokens = db.query(models.APIToken).filter(
models.APIToken.user_id == current_user.id
).all()
return tokens
@app.post("/api/users/me/tokens", response_model=schemas.APITokenWithValue)
def create_my_token(
token_create: schemas.APITokenCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Generar un nuevo API token"""
from app.core.security import generate_api_token
# Generar token único
token_value = generate_api_token()
# Crear registro
api_token = models.APIToken(
user_id=current_user.id,
token=token_value,
description=token_create.description,
is_active=True
)
db.add(api_token)
db.commit()
db.refresh(api_token)
# Retornar con el token completo (solo esta vez)
return schemas.APITokenWithValue(
id=api_token.id,
token=api_token.token,
description=api_token.description,
is_active=api_token.is_active,
last_used_at=api_token.last_used_at,
created_at=api_token.created_at
)
@app.delete("/api/users/me/tokens/{token_id}")
def delete_my_token(
token_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Revocar uno de mis API tokens"""
api_token = db.query(models.APIToken).filter(
models.APIToken.id == token_id,
models.APIToken.user_id == current_user.id
).first()
if not api_token:
raise HTTPException(status_code=404, detail="Token no encontrado")
api_token.is_active = False
db.commit()
return {"message": "Token revocado correctamente", "token_id": token_id}
@app.get("/api/users/{user_id}/tokens", response_model=List[schemas.APIToken])
def get_user_tokens(
user_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Obtener tokens de un usuario (solo admin)"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos")
tokens = db.query(models.APIToken).filter(
models.APIToken.user_id == user_id
).all()
return tokens
@app.delete("/api/users/{user_id}/tokens/{token_id}")
def delete_user_token(
user_id: int,
token_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
"""Revocar token de un usuario (solo admin)"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="No tienes permisos")
api_token = db.query(models.APIToken).filter(
models.APIToken.id == token_id,
models.APIToken.user_id == user_id
).first()
if not api_token:
raise HTTPException(status_code=404, detail="Token no encontrado")
api_token.is_active = False
db.commit()
return {"message": "Token revocado correctamente", "token_id": token_id}
# ============= CHECKLIST ENDPOINTS ============= # ============= CHECKLIST ENDPOINTS =============
@app.get("/api/checklists", response_model=List[schemas.Checklist]) @app.get("/api/checklists", response_model=List[schemas.Checklist])
def get_checklists( def get_checklists(

View File

@@ -18,6 +18,22 @@ class User(Base):
# Relationships # Relationships
checklists_created = relationship("Checklist", back_populates="creator") checklists_created = relationship("Checklist", back_populates="creator")
inspections = relationship("Inspection", back_populates="mechanic") inspections = relationship("Inspection", back_populates="mechanic")
api_tokens = relationship("APIToken", back_populates="user", cascade="all, delete-orphan")
class APIToken(Base):
__tablename__ = "api_tokens"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
token = Column(String(100), unique=True, index=True, nullable=False)
description = Column(String(200))
is_active = Column(Boolean, default=True)
last_used_at = Column(DateTime(timezone=True))
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationship
user = relationship("User", back_populates="api_tokens")
class Checklist(Base): class Checklist(Base):

View File

@@ -13,6 +13,7 @@ class UserCreate(UserBase):
password: str password: str
class UserUpdate(BaseModel): class UserUpdate(BaseModel):
username: Optional[str] = None
email: Optional[EmailStr] = None email: Optional[EmailStr] = None
full_name: Optional[str] = None full_name: Optional[str] = None
role: Optional[str] = None role: Optional[str] = None
@@ -42,6 +43,24 @@ class Token(BaseModel):
user: User user: User
# API Token Schemas
class APITokenCreate(BaseModel):
description: Optional[str] = None
class APIToken(BaseModel):
id: int
description: Optional[str] = None
is_active: bool
last_used_at: Optional[datetime] = None
created_at: datetime
class Config:
from_attributes = True
class APITokenWithValue(APIToken):
token: str
# Checklist Schemas # Checklist Schemas
class ChecklistBase(BaseModel): class ChecklistBase(BaseModel):
name: str name: str