Authentication through Tokens in FastAPI using SQLAlchemy

In most scenarios we will want to protect at least part of the API. Whether or not you use a templating engine like Jinja, you will almost always need to protect your API methods. The most basic way to do this is with an authentication system, based either on sessions or on authentication tokens. With these mechanisms the user is granted rights to access the private methods of the API and, once the system is in place, we can know at all times when a user manipulates the data and at what level.

In this section we will see some authentication methods that integrate directly with the automatic documentation, as well as custom schemes based on a username and password that work with the database and generate authentication (access) tokens.

Basic authentication

The most basic authentication scheme that can be considered is one where the user's credentials are a token sent in a request header (in this example, a custom header called Token), which the application then reads and evaluates:

api.py

from fastapi.security import APIKeyHeader
from fastapi import Depends, FastAPI, HTTPException, status

API_KEY_TOKEN = "SECRET_PASSWORD"

app = FastAPI()

# Reads the value of the "Token" request header
api_key_header = APIKeyHeader(name="Token")

@app.get("/protected-route")
async def protected_route(token: str = Depends(api_key_header)):
    if token != API_KEY_TOKEN:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
    return {"hello": "FASTAPI"}

In the example above, a generic token is defined in API_KEY_TOKEN and compared with the token supplied in the request header. The APIKeyHeader class retrieves the header value (the token parameter in the example above) and returns a 403 error when the header is not supplied. Finally, it is declared as one more dependency of the route.

If we go to the documentation, in the method created previously, we will see a lock, which indicates that it is protected:

If we make the request without authenticating ourselves, we will see a 403 error (the one returned by APIKeyHeader when the header is missing):

Code: 403
Error: Forbidden

Response body
{
  "detail": "Not authenticated"
}

If we click on the lock and set a valid token, the request will be processed successfully with a 200 status code.
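Outside the interactive documentation, any HTTP client can send the same header. The following is a minimal sketch with Python's standard library, assuming the application is served locally at http://127.0.0.1:8000 (an assumed address; adjust it to your setup). The request is only built here; the commented lines at the end would actually send it:

```python
from urllib.request import Request

# Assumed local address of the running application
URL = "http://127.0.0.1:8000/protected-route"

# The header name must match the one given to APIKeyHeader(name="Token")
request = Request(URL, headers={"Token": "SECRET_PASSWORD"})

print(request.get_method())         # GET
print(request.get_header("Token"))  # SECRET_PASSWORD

# To send it against a running server:
# from urllib.request import urlopen
# with urlopen(request) as response:
#     print(response.status, response.read())
```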

The problem with this approach is that the check is written inside a single route. To reuse it in other routes, we can move it into a function:

from fastapi.security import APIKeyHeader
from fastapi import Depends, FastAPI, HTTPException, status

API_KEY_TOKEN = "SECRET_PASSWORD"

async def authenticate(token: str = Depends(APIKeyHeader(name="Token"))):
    if token != API_KEY_TOKEN:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
    return token

And it is injected into each of the API methods in which we require the user to be authenticated; for example:

@app.get("/page/")
def page(db: Session = Depends(get_database_session), dependencies=Depends(authenticate)):
    print(getAll(db))
    #create_user(db)
    print(dependencies)
    return {"page": 1}

These examples offer a very basic authentication scheme where the received token is verified against a constant; authenticated users are not handled and there is no way to know which user is making the request.
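As a side note on the comparison itself, the check against the constant could be done with a constant-time comparison, which avoids leaking information through response timing. This is a minimal sketch using only the standard library; the helper name is_valid_token is hypothetical and not part of the examples above:

```python
import secrets

API_KEY_TOKEN = "SECRET_PASSWORD"

def is_valid_token(token: str) -> bool:
    # compare_digest takes the same time regardless of where the values differ
    return secrets.compare_digest(token.encode(), API_KEY_TOKEN.encode())

print(is_valid_token("SECRET_PASSWORD"))  # True
print(is_valid_token("wrong"))            # False
```

Inside the authenticate() dependency, a check like this could take the place of the `token != API_KEY_TOKEN` condition.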

Database authentication

The previous authentication can be used to define simple schemes but, in most cases, a more complete one is needed, with multiple users who enter the application using the familiar username/password combination. For these cases, we can use the following scheme.

For this development, we need to install a package that converts plain text into hashes; specifically, we will use it to hash the user's password:

$ pip install 'passlib[bcrypt]'

Next, we will modify the users model to specify the password column (in hash format) and create a relational entity to the user with which we will manage the access tokens:

database\models.py

class User(Base):
    ***
    hashed_password = Column(String(255))
    
class AccessToken(Base):
    __tablename__ = 'access_tokens'
    user_id = Column(Integer, ForeignKey('users.id'), primary_key=True)
    access_token = Column(String(255))
    expiration_date = Column(DateTime(timezone=True))
    user = relationship('User',lazy="joined")

The columns of the previous model are:

  • user_id handles the foreign key relationship with users. Note that this column is also the primary key, which means that there can only be one access token per user.
  • access_token holds the access token of the authenticated user.
  • expiration_date specifies when the token expires.

Remember to delete all the tables in the database so that they are recreated with the new relationship and the change to the user model.

We create a schema of the new entity and modify the user entity to indicate the password field:

schemas.py

class User(BaseModel):
    name: str = Field(min_length=5)
    surname: str
    email: EmailStr
    website: str #HttpUrl
    class Config:
        from_attributes = True

class UserCreate(User):
    password: str

class UserDB(User):
    hashed_password: str 

class AccessToken(BaseModel):
    user_id: int
    access_token: str
    expiration_date: datetime
    class Config:
        from_attributes = True

When a user is created, the password arrives in plain text, since it is entered by the user (UserCreate.password); when the user is read from the database, the password is in hash format (UserDB.hashed_password).

To manage the password, we will use a new file:

authentication\password.py

import secrets
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"])

def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def generate_token() -> str:
    return secrets.token_urlsafe(32)

Explanation of the above functions

  • schemes=["bcrypt"] specifies the hashing algorithm to be used.
  • The get_password_hash() function creates a hash from the plain-text password; this function will be used later when registering a user.
  • The verify_password() function verifies a password. The main characteristic of hashes is that they cannot be reverted to plain text once computed; therefore, a function is needed to check whether a plain text (here, the password entered by the user) corresponds to a stored hash. This function will be used later to perform the login.
  • The generate_token() function generates a token using the Python function token_urlsafe(), which here produces a URL-safe text token from 32 random bytes.
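The behavior described in the list above can be tried out without installing anything. The sketch below uses hashlib.pbkdf2_hmac from the standard library as a stand-in for passlib/bcrypt, purely to illustrate the hash-then-verify flow; demo_hash and demo_verify are hypothetical names, and the real project keeps using pwd_context:

```python
import hashlib
import hmac
import secrets

def generate_token() -> str:
    # 32 random bytes, encoded as URL-safe Base64 text (43 characters)
    return secrets.token_urlsafe(32)

# Stand-in for passlib/bcrypt, only to illustrate the flow
def demo_hash(password: str, salt: bytes) -> bytes:
    return hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 100_000)

def demo_verify(password: str, salt: bytes, expected: bytes) -> bool:
    # Re-hash the candidate and compare; the stored hash is never reversed
    return hmac.compare_digest(demo_hash(password, salt), expected)

salt = secrets.token_bytes(16)
stored = demo_hash("my-password", salt)

print(len(generate_token()))                     # 43
print(demo_verify("my-password", salt, stored))  # True
print(demo_verify("other", salt, stored))        # False
```

Unlike this stand-in, a bcrypt hash produced by passlib embeds its own salt, which is why verify_password() only needs the plain text and the stored hash.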

To carry out the user authentication process and generate the access token, we will use a new file:

authentication\authentication.py

from sqlalchemy.orm import Session
from datetime import datetime, timedelta

from database.models import User, AccessToken
from authentication.password import verify_password, generate_token

def authenticate(email: str, password: str, db: Session) -> User|None:
    user = db.query(User).filter(User.email == email).first()

    if user is None:
        return None
    
    if not verify_password(password, user.hashed_password):
        return None
    
    return user

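def create_access_token(user: User, db: Session) -> AccessToken:
    tomorrow = datetime.now() + timedelta(days=1)

    # user_id is the primary key of access_tokens, so a second insert for the
    # same user would fail; reuse the existing row when there is one
    access_token = db.query(AccessToken).filter(AccessToken.user_id == user.id).first()
    if access_token is None:
        access_token = AccessToken(user_id=user.id)
        db.add(access_token)

    access_token.access_token = generate_token()
    access_token.expiration_date = tomorrow

    db.commit()
    db.refresh(access_token)

    return access_token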

Explanation of the above functions

  • The authenticate() function is not very complicated: it looks up the user by email and checks the plain-text password against the password hash stored in the database.
  • The create_access_token() function generates an access token: a record is stored in the access_tokens table with the user id, the expiration date (tomorrow) and the token itself, which is produced by the generate_token() function created earlier.
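The stored expiration_date only becomes useful when a later request checks it. A possible check could look like the sketch below, which uses a plain dataclass as a hypothetical stand-in for a row of the access_tokens table so that it runs without a database; StoredToken and is_token_valid are illustrative names that do not appear in the project:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

@dataclass
class StoredToken:
    # Hypothetical stand-in for a row of the access_tokens table
    user_id: int
    access_token: str
    expiration_date: datetime

def is_token_valid(stored: Optional[StoredToken], received: str, now: datetime) -> bool:
    # No row means there is no token to compare against
    if stored is None:
        return False
    # The received token must match the stored one
    if stored.access_token != received:
        return False
    # The token is only accepted before its expiration date
    return now < stored.expiration_date

now = datetime(2024, 1, 1, 12, 0)
token = StoredToken(user_id=1, access_token="abc", expiration_date=now + timedelta(days=1))

print(is_token_valid(token, "abc", now))                      # True
print(is_token_valid(token, "abc", now + timedelta(days=2)))  # False
print(is_token_valid(token, "xyz", now))                      # False
```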

Methods for the API

We will create a new file where we implement the options to register a user and create the user's token (login):

user.py

from fastapi import APIRouter, HTTPException, status, Depends
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from schemas import UserCreate
from authentication import password, authentication
from database import database, models

user_router = APIRouter()

@user_router.post('/token')
def create_token(form_data : OAuth2PasswordRequestForm = Depends(OAuth2PasswordRequestForm), db: Session = Depends(database.get_database_session)):
    email = form_data.username
    password = form_data.password

    user = authentication.authenticate(email,password,db)

    if user is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    
    token = authentication.create_access_token(user, db)
    return {"access_token": token.access_token}

@user_router.post('/register', status_code=status.HTTP_201_CREATED)
def register(user: UserCreate, db:Session = Depends(database.get_database_session)): #  -> models.User

    # Reject the registration if the email is already in use
    user_exist = db.query(models.User).filter(models.User.email == user.email).first()
    if user_exist:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT,
                            detail="User email already exists")
    
    # Only the hash of the password is stored
    hashed_password = password.get_password_hash(user.password)

    userdb = models.User(name=user.name, 
                         email= user.email, 
                         surname = user.surname, 
                         website= user.website, 
                         hashed_password=hashed_password)
    
    db.add(userdb)
    db.commit()
    db.refresh(userdb)

    return userdb

 

The OAuth2PasswordRequestForm class is injected as a dependency of the function that generates the token; it declares a form body with the following fields:

  1. username.
  2. password.
  3. An optional scope field.
  4. Optional client_id and client_secret fields.

Of course, you could use equivalent Pydantic classes or the Body or Form classes instead, something like:

create_token(email: str = Form(), password: str = Form())
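Because the /token route reads a form, a client has to send the credentials form-encoded (application/x-www-form-urlencoded) rather than as JSON. The sketch below shows what that body looks like for the OAuth2PasswordRequestForm version of the route, where the email travels in the username field; the credentials are made-up sample values:

```python
from urllib.parse import parse_qs, urlencode

# Sample credentials; "username" carries the email in this application
form_fields = {"username": "user@example.com", "password": "secret"}

# Body of the POST request sent to /token
body = urlencode(form_fields)
print(body)  # username=user%40example.com&password=secret

# Decoding the same body recovers the original fields
print(parse_qs(body)["username"][0])  # user@example.com
```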

Finally, the previous routes are included in the application:

api.py

from user import user_router
***
app.include_router(user_router)
***

With this, the application exposes the /token and /register routes.

This material is part of my complete course and book on FastAPI.

- Andrés Cruz
