Autenticación mediante Tokens en FastAPI empleando SQLAlchemy

En la mayoría de los escenarios vamos a querer al menos proteger partes de la API; ya sea que emplees un motor de plantillas como Jinja o no, casi siempre va a ser necesario proteger los métodos de la API; para ello, la manera más básica es emplear un sistema de autenticación, ya sea en base a sesión o tokens de autenticación; con estos mecanismos, se le otorga derechos al usuario para que pueda acceder a los métodos privados de la API y podamos conocer en todo momento (realizada esta implementación) cuando el usuario manipula los datos y en qué nivel.

En este apartado veremos algunos métodos de autenticación que se integran directamente a la documentación automática y otros esquemas personalizados que constan de usuario y contraseña integrándose con la base de datos y la generación de tokens de autenticación o accesos.

Autenticación básica

El esquema que se puede considerar más básico de autenticación, viene siendo en donde las credenciales del usuario se manejan en base a un token que se establece en la cabecera de la petición, específicamente en la cabecera llamada Authorization para luego ser consultadas y evaluadas por la aplicación:

api.py

from fastapi.security import APIKeyHeader
from fastapi import Depends, FastAPI, HTTPException, status

API_KEY_TOKEN = "SECRET_PASSWORD"

api_key_header = APIKeyHeader(name="Token")
@app.get("/protected-route")
async def protected_route(token: str = Depends(api_key_header)):
    if token != API_KEY_TOKEN:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
    return {"hello": "FASTAPI"}

En el ejemplo anterior, se establece un token genérico mediante API_KEY_TOKEN y se verifica si el token suministrado en la cabecera de la petición es igual al del token; se utiliza la clase APIKeyHeader (la cual, devuelve un error de tipo 403 cuando el token no es suministrado) para recuperar un valor de un encabezado (en el ejemplo anterior la variable llamada token). Finalmente, se establece como una dependencia más de la petición.

Si vamos a la documentación, en el método creado anteriormente, veremos un candado, que indica que está protegido:

Si realizamos la petición sin autenticarnos, veremos un error 403 (el establecido anteriormente):

Code Details
403
Undocumented
Error: Forbidden

Response body
Download
{
  "detail": "Not authenticated"
}

Si damos un click sobre el candado y establecemos el token de manera correcta (un token válido):

La petición será procesada de manera exitosa con un código de estado de tipo 200.

El problema con este enfoque es que solamente podemos proteger una ruta; podemos crear una función que realice la verificación anterior:

from fastapi.security import APIKeyHeader
from fastapi import Depends, FastAPI, HTTPException, status

API_KEY_TOKEN = "SECRET_PASSWORD"

async def authenticate(token: str = Depends(APIKeyHeader(name="Token"))):
    if token != API_KEY_TOKEN:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
    return token
Y se inyecta en cada una de los métodos de la API en los cuales requerimos que el usuario esté autenticado; por ejemplo:
@app.get("/page/")
def page(db: Session = Depends(get_database_session), dependencies=Depends(authenticate)):
    print(getAll(db))
    #create_user(db)
    print(dependencies)
    return {"page": 1}

Estos ejemplos ofrecen un esquema de autenticación muy básico en donde el token recibido se verifica con una constante; no se manejan usuarios autenticados y no hay manera de saber que usuario está realizando la petición.

Autenticación por base de datos

La autenticación anterior puede servir para definir esquemas simples, pero, en la mayoría de los casos es necesaria manejar esquemas más completos, en los cuales tendremos múltiples usuarios con la combinación ya conocida de usuario/contraseña para poder ingresar a la aplicación, para estos casos, podemos emplear el siguiente esquema.

Para este desarrollo, vamos a necesitar instalar un paquete al proyecto con el cual, podremos convertir textos planos en hash, específicamente lo utilizaremos para generar un hash del password del usuario:

$ pip install 'passlib[bcrypt]'

A continuación, modificaremos el modelo de usuarios para especificar la columna del password (en formato hash) y creamos una entidad relacional al usuario con la cual, manejaremos los tokens de acceso:

database\models.py

class User(Base):
    ***
    hashed_password = Column(String(255))
    
class AccessToken(Base):
    __tablename__ = 'access_tokens'
    user_id = Column(Integer, ForeignKey('users.id'), primary_key=True)
    access_token = Column(String(255))
    expiration_date = Column(DateTime(timezone=True))
    user = relationship('User',lazy="joined")

Con cada una de las columnas de la relación anterior, tenemos:

  • user_id permite manejar la relación foránea con los usuarios es importante notar la clave primaria para esta columna, lo que significa que solamente puede existir un token de acceso por usuario.
  • access_token mantiene el token de acceso del usuario autenticado.
  • expiration_date especifica el tiempo de duración del token.

Recuerda borrar todas las tablas de la base de datos para crear la nueva relación y verse reflejado el cambio en el modelo de usuario.

Creamos un esquema de la nueva entidad y modificamos la entidad del usuario para indicar el campo de contraseña:

schemas.py

class User(BaseModel):
    name: str = Field(min_length=5)
    surname: str
    email: EmailStr
    website: str #HttpUrl
    class Config:
        from_attributes = True

class UserCreate(User):
    password: str

class UserDB(User):
    hashed_password: str 

class AccessToken(BaseModel):
    user_id: int
    access_token: str
    expiration_date: datetime
    class Config:
        from_attributes = True

Para el usuario, si se va a crear un usuario, entonces la contraseña esta en texto plano, ya que, es la introducida por el usuario (UserCreate.password), pero, cuando se trata del usuario obtenido desde la base de datos, la contraseña esta en formato hash (UserDB.hashed_password). 

Para gestionar la contraseña, usaremos un nuevo archivo:

authentication\password.py

import secrets
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"])

def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def generate_token() -> str:
    return secrets.token_urlsafe(32)

Explicación de las funciones anteriores

  • Con schemes=["bcrypt"] se especifica el algoritmo de encriptado a emplear.
  • Con la función get_password_hash() permite crear un hash de la contraseña en texto plano; esta función será empleada más adelante al momento de registrar un usuario.
  • con la función verify_password() permite verificar una contraseña; la característica principal de los hash es que los mismos no pueden revertirse a un texto plano una vez convertidas, por lo tanto, existe una función para comparar si un hash corresponde con un texto plano, en este caso, el texto plano corresponde a la contraseña en texto plano; esta función será empleada más adelante para realizar el login.
  • Con la función de generate_token() permite generar un token empleando la función de Python token_urlsafe() que permite generar tokens seguros de 32 bytes.

Para realizar el proceso de la autenticación del usuario y generar el token de acceso, usaremos un nuevo archivo:

authentication\authentication.py

from sqlalchemy.orm import Session
from datetime import datetime, timedelta

# from time
from database.models import User, AccessToken
from authentication.password import verify_password, generate_token

def authenticate(email: str, password: str, db: Session) -> User|None:
    user = db.query(User).filter(User.email == email).first()

    if user is None:
        return None
    
    if not verify_password(password, user.hashed_password):
        return None
    
    return user

def create_access_token(user:User, db: Session) -> AccessToken:
    tomorrow = datetime.now() + timedelta(days=1) # time.time + 60 * 60 * 24

    accessToken = AccessToken(user_id=user.id, expiration_date=tomorrow, access_token=generate_token())

    db.add(accessToken)
    db.commit()
    db.refresh(accessToken)

    return accessToken

Explicación de las funciones anteriores

  • La función de authenticate() no tiene mucha complicación, simplemente busca el usuario por el email y verifica la contraseña en texto plano contra el hash de la contraseña guardada en la base de datos.
  • La función de create_access_token() permite generar un token de acceso, para ello, se inserta un registro en la tabla de access_tokens especificando el id del usuario, la fecha (de mañana para indicar que la fecha de expiración del token) y generar el token, para ello se emplea la función de generate_token() creada en el apartado anterior.

Métodos para la API

Crearemos un nuevo archivo en donde implementamos las opciones de registrar un usuario y crear el token (login) del usuario:

user.py

from fastapi import APIRouter, HTTPException, status, Depends
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from schemes import UserCreate
from authentication import password, authentication
from database import database, models

user_router = APIRouter()

@user_router.post('/token')
def create_token(form_data : OAuth2PasswordRequestForm = Depends(OAuth2PasswordRequestForm), db: Session = Depends(database.get_database_session)):
    email = form_data.username
    password = form_data.password

    user = authentication.authenticate(email,password,db)

    if user is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    
    token = authentication.create_access_token(user, db)
    return {"access_token": token.access_token}

@user_router.post('/register', status_code=status.HTTP_201_CREATED)
def register(user: UserCreate, db:Session = Depends(database.get_database_session)): #  -> models.User

    user_exist = db.query(models.User).filter(models.User.email == user.email).first()
    if user_exist:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT,
                            detail="User email already exist")
    
    hashed_password = password.get_password_hash(user.password)
    print(hashed_password)

    userdb = models.User(name=user.name, 
                         email= user.email, 
                         surname = user.surname, 
                         website= user.website, 
                         hashed_password=hashed_password)
    
    db.add(userdb)
    db.commit()
    db.refresh(userdb)

    return userdb

 

Con la clase OAuth2PasswordRequestForm es inyectada como dependencia de la función para generar el token y permite declarar en un cuerpo de formulario con los siguientes campos:

  1. El nombre de usuario.
  2. La contraseña.
  3. Un campo de scope opcional.
  4. Una clave para el cliente pública y privada también opcional.

Claro está, pudieras emplear en su lugar las clases de Pydantic o las clases Body, Form o similares, algo como:

create_token(email: str = Form(), password: str = Form())

 Finalmente, son incluidas las rutas anteriores en la aplicación:

api.py

from user import user_router
***
app.include_router(user_router)
***

Y con esto tendremos:

Este material forma parte de mi curso y libro completo sobre FastAPI.

- Andrés Cruz

In english
Andrés Cruz

Desarrollo con Laravel, Django, Flask, CodeIgniter, HTML5, CSS3, MySQL, JavaScript, Vue, Android, iOS, Flutter

Andrés Cruz En Udemy

Acepto recibir anuncios de interes sobre este Blog.