• nMoMo’s
  • Code Snippets
  • Contact
  • Hedgehog Gallery

nMoMo's

Stories about programming and Hedgehog.

  1. nMoMo's>
  2. TIPS>
  3. Python

FastAPI を使ってWEBアプリを作ってみる その7

2021-02-21Python,TIPSDocker,FastAPI,JWT,login,users

  • Twitter
  • Facebook
  • Pocket
  • LINE
  • LINE
  • RSS

前回の投稿では、ユーザーがサインアップできるようにユーザモデルを実装しました。
現在、ユーザーのパスワードはプレーンテキストで保存されており、ソルトについてもハードコーディングしています。

この投稿ではこれらを解決し適切な暗号化を施した上でユーザにログイン機能を提供します。
前置きをしておくと今回は少し長めになるのでコードだけ見たい方は最下部にあるGitHubリンクから入手することができます。

過去の投稿はこちらから辿ることができます。

FastAPI を使ってWEBアプリを作ってみる その1FastAPIとDockerでHelloWorld
FastAPI を使ってWEBアプリを作ってみる その2AlembicとPostgreSQLでDB Migrate
FastAPI を使ってWEBアプリを作ってみる その3APIエンドポイントをPostgreSQLに接続
FastAPI を使ってWEBアプリを作ってみる その4PytestとDockerでテスト構築
FastAPI を使ってWEBアプリを作ってみる その5RESTAPIエンドポイントを実装
FastAPI を使ってWEBアプリを作ってみる その6ユーザーモデルの作成
FastAPI を使ってWEBアプリを作ってみる その7今ここ
Contents
  • 1. セキュリティ関連の新しいパッケージ
  • 2. 認証サービスの設定
  • 3. ユーザ登録のリファクタリング
  • 4. Json Web Token
  • 5. トークンモデルの作成
  • 6. ユーザとトークンヘッダのPytest テストフィクスチャ構築
  • 7. まとめ

セキュリティ関連の新しいパッケージ

コードを記述していく前に今回利用する新しいパッケージをインストールしておきます。

fastapi==0.63.0
uvicorn==0.13.3
pydantic==1.7.3
email-validator==1.1.2

databases[postgresql]==0.4.1
SQLAlchemy==1.3.23
alembic==1.5.4
psycopg2==2.8.6

pyjwt==2.0.1
passlib[bcrypt]==1.7.4

pytest==6.2.2
pytest-asyncio==0.14.0
httpx==0.16.1
asgi-lifespan==1.0.1
docker==4.4.3

2つの新しいパッケージをインストールします。
※ bcryptのビルドにはCの処理系が必要なため使用しているディストリビューションによってはDockerfileにbuild-base libffi-dev を追加してください。

pyjwt – ユーザーの認証に使われるJSON Web Tokensのエンコードとデコードに使われます。
JWTを認証に使用するかどうかの議論はこの記事の範囲外なのでここでは触れません。
OAuth, SAMLをはじめとした他の形式の認証は FastAPI でも利用可能なので、このアプローチに違和感を感じる方は他の方法を試すことをお勧めします

Security - FastAPI

FastAPI framework, high performance, easy to learn, fast to code, ready for production

 https://fastapi.tiangolo.com/tutorial/security/

passlib – 平文でのパスワードの保存を避けるために使用するパスワードハッシュライブラリです。
前の記事で述べたようにパスワードをプレーンテキストで保存すること絶対に避けるべきです。

コンテナのリビルドをおこない依存関係をインストールします。

$ docker-compose build

コンテナが構築されるのを待つ間、セキュリティについて少し話します。

そもそもなぜパスワードをハッシュ化する必要があるのでしょうか?
これは逆の場合の問題を考える方が簡単です。
もし私たちが平文でパスワードをデータベースに保存していたら、チームの運用・管理メンバーは誰でも好きなときに全ユーザのパスワードを見ることができてしまいます。

仮に一緒に働くメンバー全員を完全に信頼することができたとしても、インターネット越しにアプリケーションにアクセスしてくる全てのトラフィックを信用することは不可能です。
もしアプリケーションがハッキングされた場合、すべてのユーザのパスワードは、追加作業なしにハッカーがそのまま利用できるようになります。
多くの人が複数のアプリケーションで同じパスワードを再利用している実態を考えるとこれは悪夢です。

この問題に対処するためにすべてのユーザパスワードに固有のハッシュを生成し、そのハッシュ化されたパスワードをデータベースに保存するようにします。
ユーザーがログインしようとすると、同じアルゴリズムを使って提出されたパスワードをハッシュ化しそのユーザーのためにデータベースに保存されているものと出力を照合します。
このアプローチを使うことでユーザのパスワードを難読化、安全にユーザを認証する方法を提供できるようになります。

しかしこの場合にも課題があります。
ハッカーはレインボーテーブルと呼ばれる、ハッシュから平文を得るために事前に計算したハッシュテーブルを使い攻撃してきます。
これに対抗するためにハッシュアルゴリズムに独自のソルトを追加します。

ユーザがサインアップするたびに、新しいソルトがランダムに生成されてデータベースに保存されます。
ユーザが認証するとソルトとパスワードが結合されて暗号化ハッシュ関数で処理されます。

この方法はハッカーが一度に1つのパスワードのみを解読することを強制することで、マルチターゲット攻撃のバッチ処理の利点を緩和します。
これにより、ハッカーはレインボーテーブルに得られる利点がなくなります。

認証に成功したユーザには、そのユーザを一意に識別するための情報を含むエンコードされたステートレスなトークンが送信されます。
認証以降の全てのリクエストはヘッダーにこのトークンを含めなければなりません。
またトークンは一定期間後に有効期限が切れるので、ユーザは永遠に認証されるわけではありません。
開発者はアプリケーションごとに最適な時間帯を正確に決定することができます。

認証サービスの設定

まずは認証関連のロジックを処理する AuthService クラスを作成することから始めます。
クラスを作成せず関数でも同様のタスクを担うことが可能ですが、今後の記事でさまざまなロジックを追加していくことを予定しているためクラスを作成し、都度リファクタしていくことにしています。

$ mkdir /backend/app/services
$ touch /backend/app/services/__init__.py
$ touch /backend/app/services/authentication.py
import bcrypt
from app.models.user import UserPasswordUpdate
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class AuthException(BaseException):
    pass


class AuthService:
    def create_salt_and_hashed_password(
            self, *, plaintext_password: str) -> UserPasswordUpdate:
        salt = self.generate_salt()
        hashed_password = self.hash_password(password=plaintext_password, salt=salt)
        return UserPasswordUpdate(salt=salt, password=hashed_password)

    def generate_salt(self) -> str:
        return bcrypt.gensalt().decode()

    def hash_password(self, *, password: str, salt: str) -> str:
        return pwd_context.hash(password + salt)

このユーティリティクラスはパスワードをソルトしたり、平文のパスワードをハッシュ化したものに変換したりするのに必要なメソッドを提供します。

initファイルでインスタンスを作成しアプリケーション全体で使用できるようにしておきます。

from app.services.authentication import AuthService


auth_service = AuthService()

ロジックのコードを書いていく前に上記で作成したサービスを試してみましょう。
コンテナに入りPythonのreplで実行してみます。

/backend # python
Python 3.9.1 (default, Dec 17 2020, 01:59:58) 
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.

>>> from app.services import auth_service 
>>> auth_service.generate_salt()
'$2b$12$6wn52AyWtHOXI1IKNNqCXe'

>>> auth_service.hash_password(password="password", salt="123")
'$2b$12$3OvH/BonjlS217.Z8Qs8zutMf.o9cVfOhE1CEsLEXo3dxlRpVjz7C'

>>> auth_service.create_salt_and_hashed_password(plaintext_password="password")
UserPasswordUpdate(password='$2b$12$to66CZSfSACEKLTlt6ZL0.oSHuJqEeaDIhtA33BLAYO.aAsMK2poW', salt='$2b$12$AywZiFPxtPlVTL7Tphp0Je')

>>> auth_service.hash_password(password="password", salt="123")
'$2b$12$u.MOo9pYlQ/3oo5Q5/SkV.oOjS8jUXhxTrqEe6EAEkDU0Jp9PBvf6'

auth_service によって生成されるすべてのソルトとハッシュ化されたパスワードは"$2b$12$"で始まります。
2b は ident であり新しいハッシュを作成する際に使用する bcrypt アルゴリズムのバージョンを指定しています。
12$ はパスワードのハッシュ化に何回ラウンドを使うかを指定します (厳密にはログラウンドなので実際には2の12乗を使います)

bcryptはパスワードがハッシュ化されるたびに自動的にソルトを生成し、それをハッシュテーブルに保存しています。
つまり、明示的に独自のソルトを作成したりユーザと一緒にデータベースに保存したりする必要はありません。
ただしほんの少しオーバーヘッドが発生します。

詳細についてはこちらのQAを確認してください。

ユーザ登録のリファクタリング

パスワードのハッシュ化アルゴリズムをユーザ登録に組み込む準備ができたので まずは新しいテストを追加しましょう。

from app.services import auth_service
...省略

class TestUserRegistration:
    ...省略

    async def test_users_saved_password_is_hashed_and_has_salt(
        self,
        app: FastAPI,
        client: AsyncClient,
        db: Database,
    ) -> None:
        user_repo = UsersRepository(db)
        new_user = {
            "email": "nmomo@mail.com",
            "username": "nmomo",
            "password": "nmomoishedgehog"}

        res = await client.post(
            app.url_path_for("users:register-new-user"),
            json={"new_user": new_user})
        assert res.status_code == HTTP_201_CREATED

        user_in_db = await user_repo.get_user_by_email(email=new_user["email"])
        assert user_in_db is not None
        assert user_in_db.salt is not None and user_in_db.salt != "123"
        assert user_in_db.password != new_user["password"]
        assert auth_service.verify_password(
            password=new_user["password"],
            salt=user_in_db.salt,
            hashed_pw=user_in_db.password,
        )

新しいユーザーを作成しそのユーザーがデータベースに存在することを確認します。
その後、データベースに保存しているソルトが現在ハードコードされている「123」ではないことを確認します。

また、データベースに保存されているパスワードがユーザが最初にサーバに送信したパスワードではないことも確認します。
最後に verify_password関数(まだ書いていません)で送信されたパスワードのハッシュ化されたバージョンが保存されているパスワードと一致することを保証します。

ここで一度テストを実行し失敗させてください。
このエラーは前回ハードコードした “123 " のソルトを削除することで簡単に修正できます。

UsersRepositoryを更新しましょう:

from app.db.repositories.base import BaseRepository
from app.models.user import UserCreate, UserInDB
from app.services import auth_service  # 追加
from databases import Database  # 追加
from fastapi import HTTPException, status
from pydantic import EmailStr

GET_USER_BY_EMAIL_QUERY = """
    SELECT
        id, username, email, email_verified, password,
        salt, is_active, is_superuser, created_at, updated_at
    FROM
        users
    WHERE
        email = :email;
"""
GET_USER_BY_USERNAME_QUERY = """
    SELECT
        id, username, email, email_verified, password,
        salt, is_active, is_superuser, created_at, updated_at
    FROM
        users
    WHERE
        username = :username;
"""
REGISTER_NEW_USER_QUERY = """
    INSERT INTO users
        (username, email, password, salt)
    VALUES
        (:username, :email, :password, :salt)
    RETURNING
        id, username, email, email_verified, password,
        salt, is_active, is_superuser, created_at, updated_at;
"""


class UsersRepository(BaseRepository):
    def __init__(self, db: Database) -> None:
        super().__init__(db)
        self.auth_service = auth_service

    async def get_user_by_email(self, *, email: EmailStr) -> UserInDB:
        user_record = await self.db.fetch_one(
            query=GET_USER_BY_EMAIL_QUERY,
            values={"email": email})
        if not user_record:
            return None
        return UserInDB(**user_record)

    async def get_user_by_username(self, *, username: str) -> UserInDB:
        user_record = await self.db.fetch_one(
            query=GET_USER_BY_USERNAME_QUERY,
            values={"username": username})
        if not user_record:
            return None
        return UserInDB(**user_record)

    async def register_new_user(self, *, new_user: UserCreate) -> UserInDB:
        if await self.get_user_by_email(email=new_user.email):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="このメールアドレスはすでに登録されています")

        if await self.get_user_by_username(username=new_user.username):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="このユーザ名はすでに登録されています"
            )

        # 以下更新箇所
        user_password_update = self.auth_service.create_salt_and_hashed_password(
            plaintext_password=new_user.password)
        new_user_params = new_user.copy(update=user_password_update.dict())
        created_user = await self.db.fetch_one(
            query=REGISTER_NEW_USER_QUERY,
            values=new_user_params.dict())
        # ここまで
        return UserInDB(**created_user)

auth_service を UsersRepository にアタッチし、create_salt_and_hashed_passwordメソッドをユーザの平文パスワードに適用しています。
メソッドから返された UserPasswordUpdate を取得したら、モデルをdictにエクスポートしてREGISTER_NEW_USER_QUERY に渡す前に new_user を更新させています。

次に verify_password メソッドを実装しましょう。

import bcrypt
from app.models.user import UserPasswordUpdate
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class AuthException(BaseException):
    pass


class AuthService:
    def create_salt_and_hashed_password(
            self, *, plaintext_password: str) -> UserPasswordUpdate:
        salt = self.generate_salt()
        hashed_password = self.hash_password(password=plaintext_password, salt=salt)
        return UserPasswordUpdate(salt=salt, password=hashed_password)

    def generate_salt(self) -> str:
        return bcrypt.gensalt().decode()

    def hash_password(self, *, password: str, salt: str) -> str:
        return pwd_context.hash(password + salt)

    # 追加
    def verify_password(self, *, password: str, salt: str, hashed_pw: str) -> bool:
        return pwd_context.verify(password + salt, hashed_pw)

verify_password メソッドは、以前に定義したパスワードのコンテキストを使用してデータベースに保存されているハッシュ化されたパスワードとパスワード/ソルトを検証します。
これでテストを実行するとパスできるはずです、試してみてください。

これで認証システムの基礎ができました。
次はJWTを使用しユーザのログインを維持させる仕組みを実装しましょう。

Json Web Token

以前Djangoの投稿でもJWTについて簡単な説明をしていますが改めてポイントだけ記載するとJWTはピリオドで区切られた3つの部分を含むエンコードされた文字列です。
これらの3つの部分はそれぞれヘッダー、ペイロード、署名と呼ばれています。

ユーザのユーザ名(またはメールアドレス)はペイロードでエンコードされ、APIを介したすべてのリクエストでユーザを識別するために使用されます。
エンコードアルゴリズムはヘッダに格納され、署名は他の2つの部分を組み合わせた文字列で構成され base64 でエンコードされた後、セキュアシークレットを使用して署名されます。

Django REST FrameworkでのJWT Authentication

前回まではDjangoとVue.jsを使用したWEBアプリの実装について紹介していましたが、一旦今回は途中では ...

 https://nmomos.com/tips/2019/07/24/django-jwt-authentication...

トークンモデルの作成

認証トークンの形状を検証するために使用するPydanticモデルを定義しましょう:

$ touch /backend/app/models/token.py
from datetime import datetime, timedelta

from app.core.config import ACCESS_TOKEN_EXPIRE_MINUTES, JWT_AUDIENCE
from app.models.core import CoreModel
from pydantic import EmailStr


class JWTMeta(CoreModel):
    iss: str = "hedgehog-reservation.com"
    aud: str = JWT_AUDIENCE
    iat: float = datetime.timestamp(datetime.utcnow())
    exp: float = datetime.timestamp(
        datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))


class JWTCreds(CoreModel):
    sub: EmailStr
    username: str


class JWTPayload(JWTMeta, JWTCreds):
    pass


class AccessToken(CoreModel):
    access_token: str
    token_type: str

JWTにエンコードするペイロードを構成するために複数のモデルを使用しています。
まず JWTMeta ではペイロードに必要な属性のほとんどを持たせています。

  • iss – トークンの発行者
  • aud – このトークンは誰のためのものか
  • iat – このトークンがいつ発行されたか
  • exp – このトークンの有効期限

これらすべてにデフォルト値を設定していますが必要に応じて任意の属性をカスタマイズすることもできます。

次に JWTCreds にはユーザーを識別するために使用するフィールド(この場合はEメールとユーザー名)が格納されます。
JWTを構築する際はこの2つのモデルを組み合わせ、結果をエンコードしてトークンのペイロードとして使用します。

このエンコードされた文字列がアクセストークンに添付され、認証に成功するとユーザーに送信されます。
AccessToken の token_type は将来的に認証システムを変更できるように拡張性を持たせるために定義しました。

そしてアプリケーション全体でJWTの扱いを標準化するために config から新しい値をインポートしています。まず最初に必要なのはシークレットキーです。

シークレットキーを生成する方法はたくさんありますが一番簡単なのはターミナルで以下を実行することです:

$ openssl rand -hex 32
570a0fc12129c...省略

570a0fc12129c…. のような文字列が得られますのでこれをシークレットキーに使用しましょう。

backend/app/core/config.pyファイルを開きSECRET_KEYのデフォルトにしたCHANGEMEを削除します。
他にも必要なJWT固有のオプションを追加してみましょう。

from databases import DatabaseURL
from starlette.config import Config
from starlette.datastructures import Secret


config = Config(".env")

PROJECT_NAME = "Hedgehog Reservation"
VERSION = "1.0.0"
API_PREFIX = "/api"

SECRET_KEY = config("SECRET_KEY", cast=Secret)
ACCESS_TOKEN_EXPIRE_MINUTES = config(
    "ACCESS_TOKEN_EXPIRE_MINUTES",
    cast=int,
    default=7 * 24 * 60
)
JWT_ALGORITHM = config("JWT_ALGORITHM", cast=str, default="HS256")
JWT_AUDIENCE = config("JWT_AUDIENCE", cast=str, default="hedgehog-reservation:auth")
JWT_TOKEN_PREFIX = config("JWT_TOKEN_PREFIX", cast=str, default="Bearer")

POSTGRES_USER = config("POSTGRES_USER", cast=str)
POSTGRES_PASSWORD = config("POSTGRES_PASSWORD", cast=Secret)
POSTGRES_SERVER = config("POSTGRES_SERVER", cast=str, default="db")
POSTGRES_PORT = config("POSTGRES_PORT", cast=str, default="5432")
POSTGRES_DB = config("POSTGRES_DB", cast=str)

DATABASE_URL = config(
    "DATABASE_URL",
    cast=DatabaseURL,
    default=f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_SERVER}:{POSTGRES_PORT}/{POSTGRES_DB}"
)

デフォルトのSECRET_KEYを削除しました。
つまり .envファイルにない場合はエラーになりますので先ほど openssl が生成してくれた秘密鍵を.envファイルに追加してください。

トークンのエンコードに使用するjwtアルゴリズムと認証済みリクエストのためにユーザーが送信する認証ヘッダに表示するトークンのプレフィックスを定義しました。

一度コンテナを終了し、再度立ち上げてこれらの変数をコンテナに読み込ませてから次のステップへ進んでください。

ユーザとトークンヘッダのPytest テストフィクスチャ構築

この工程は少し複雑な作業になります。
JWTを取り入れることで起きるユーザー関連のテストに生じた皺寄せを解消するために conftest.pyファイルを新しいフィクスチャで更新しましょう。

...省略
from app.models.hedgehog import HedgehogCreate, HedgehogInDB
from app.models.user import UserCreate, UserInDB
...省略

@pytest.fixture
async def test_user(db: Database) -> UserInDB:
    new_user = UserCreate(
        email="nmomos@mail.com",
        username="nmomoishedgehog",
        password="nmomosissocute",
    )
    user_repo = UsersRepository(db)
    existing_user = await user_repo.get_user_by_email(email=new_user.email)
    if existing_user:
        return existing_user
    return await user_repo.register_new_user(new_user=new_user)

test_user フィクスチャは必要なテストデータを指定して UserCreate モデルをインスタンス化し、同じメールアドレスを持つユーザがデータベースに存在するかどうかをチェックしてレコードが見つかった場合はそのユーザを返します。
見つからなければ新しいユーザーを作成して UserInDB モデルを返します。

なぜユーザーの存在をチェックをフィクスチャ内でするのかというと、テスト用のpostgresコンテナはスコープをテストセッションに指定しています。
従って存在確認をせずにこの test_user フィクスチャを複数回実行した場合、pytest はpostgresのユニーク制約に起因するエラーをスローします。

それではトークン周りのテストケースを作っていきます:

...省略
from typing import List, Union, Type, Optional
import pytest
import jwt
from pydantic import ValidationError
from starlette.datastructures import Secret
from app.core.config import SECRET_KEY, JWT_ALGORITHM, JWT_AUDIENCE, JWT_TOKEN_PREFIX, ACCESS_TOKEN_EXPIRE_MINUTES
from app.models.token import JWTMeta, JWTCreds, JWTPayload
...省略

class TestAuthTokens:
    async def test_can_create_access_token_successfully(
        self, app: FastAPI, client: AsyncClient, test_user: UserInDB
    ) -> None:
        access_token = auth_service.create_access_token_for_user(
            user=test_user,
            secret_key=str(SECRET_KEY),
            audience=JWT_AUDIENCE,
            expires_in=ACCESS_TOKEN_EXPIRE_MINUTES,
        )
        creds = jwt.decode(access_token, str(SECRET_KEY), audience=JWT_AUDIENCE, algorithms=[JWT_ALGORITHM])
        assert creds.get("username") is not None
        assert creds["username"] == test_user.username
        assert creds["aud"] == JWT_AUDIENCE
    async def test_token_missing_user_is_invalid(self, app: FastAPI, client: AsyncClient) -> None:
        access_token = auth_service.create_access_token_for_user(
            user=None,
            secret_key=str(SECRET_KEY),
            audience=JWT_AUDIENCE,
            expires_in=ACCESS_TOKEN_EXPIRE_MINUTES,
        )
        with pytest.raises(jwt.PyJWTError):
            jwt.decode(access_token, str(SECRET_KEY), audience=JWT_AUDIENCE, algorithms=[JWT_ALGORITHM])
    @pytest.mark.parametrize(
        "secret_key, jwt_audience, exception",
        (
            ("wrong-secret", JWT_AUDIENCE, jwt.InvalidSignatureError),
            (None, JWT_AUDIENCE, jwt.InvalidSignatureError),
            (SECRET_KEY, "othersite:auth", jwt.InvalidAudienceError),
            (SECRET_KEY, None, ValidationError),
        )
    )
    async def test_invalid_token_content_raises_error(
        self,
        app: FastAPI,
        client: AsyncClient,
        test_user: UserInDB,
        secret_key: Union[str, Secret],
        jwt_audience: str,
        exception: Type[BaseException],
    ) -> None:
        with pytest.raises(exception):
            access_token = auth_service.create_access_token_for_user(
                user=test_user,
                secret_key=str(secret_key),
                audience=jwt_audience,
                expires_in=ACCESS_TOKEN_EXPIRE_MINUTES,
            )
            jwt.decode(access_token, str(SECRET_KEY), audience=JWT_AUDIENCE, algorithms=[JWT_ALGORITHM])

まず、AuthServiceクラスを使ってユーザのアクセストークンを作成できるかどうかをテストしています。pyjwtパッケージを使ってトークンをデコードし、トークンを作成したユーザのユーザ名が含まれていることを確認します。

次に、トークンにユーザがエンコードされていない場合のテストを行い、ペイロードに何も含まれていないことを確認します。

最後に、無効なシークレットとオーディエンスを create_access_token_for_user メソッドに渡し、 pyjwtパッケージが PyJWTError を発生させることを確認します。

ここでテストを実行すると create_access_token_for_user メソッドが見つからないというエラーを返しますので対応します:

...省略
from datetime import datetime, timedelta

import jwt
from app.core.config import (ACCESS_TOKEN_EXPIRE_MINUTES, JWT_ALGORITHM,
                             JWT_AUDIENCE, JWT_TOKEN_PREFIX, SECRET_KEY)
from app.models.token import JWTCreds, JWTMeta, JWTPayload
from app.models.user import UserInDB, UserPasswordUpdate
...省略

class AuthService:
    ...省略

    def create_access_token_for_user(
        self,
        *,
        user: UserInDB,
        secret_key: str = str(SECRET_KEY),
        audience: str = JWT_AUDIENCE,
        expires_in: int = ACCESS_TOKEN_EXPIRE_MINUTES,
    ) -> str:
        if not user or not isinstance(user, UserInDB):
            return None
        jwt_meta = JWTMeta(
            aud=audience,
            iat=datetime.timestamp(datetime.utcnow()),
            exp=datetime.timestamp(datetime.utcnow() + timedelta(minutes=expires_in)),
        )
        jwt_creds = JWTCreds(sub=user.email, username=user.username)
        token_payload = JWTPayload(
            **jwt_meta.dict(),
            **jwt_creds.dict(),
        )
        access_token = jwt.encode(
            token_payload.dict(),
            secret_key,
            algorithm=JWT_ALGORITHM)
        return access_token

ユーザーが渡されていない場合や、ユーザーが UserInDB のインスタンスでない場合は None を返します。

問題がなければ、トークンのmetaとcredsを作成しペイロードを構成し、JWT_ALGORITHM と SECRET_KEY を文字列にキャストしてペイロードをトークンにエンコードした文字列を返します。

これでテストを実行するとパスするようになったはずです。
トークンのロジックができたので UserPublic モデルを更新してオプションのアクセストークンも格納するようにします:

...省略
from app.models.token import AccessToken
...省略

class UserPublic(IDModelMixin, DateTimeModelMixin, UserBase):
    access_token: Optional[AccessToken]

これでユーザーが登録するとすぐにトークンと一緒に作成したユーザーを返すことができるようになりましたので register_new_user ルートを以下のように更新します:

...省略
from app.models.token import AccessToken
from app.services import auth_service
...省略

@router.post("/",
             response_model=UserPublic,
             name="users:register-new-user",
             status_code=HTTP_201_CREATED)
async def register_new_user(
    new_user: UserCreate = Body(..., embed=True),
    user_repo: UsersRepository = Depends(get_repository(UsersRepository)),
) -> UserPublic:
    created_user = await user_repo.register_new_user(new_user=new_user)
    access_token = AccessToken(
        access_token=auth_service.create_access_token_for_user(
            user=created_user), token_type="bearer")
    return UserPublic(**created_user.dict(), access_token=access_token)

これで有効なアクセストークンを返すようになりました。
UserPublic モデルに追加の属性を追加したため、テストに合格しなくなったためこれを修正します:

...省略
from app.models.user import UserCreate, UserInDB, UserPublic
...省略

class TestUserRegistration:
    async def test_users_can_register_successfully(
        self, app: FastAPI, client: AsyncClient, db: Database,
    ) -> None:
        user_repo = UsersRepository(db)
        new_user = {
            "email": "foo@bar.com",
            "username": "foobar",
            "password": "bazquxquux"}

        user_in_db = await user_repo.get_user_by_email(email=new_user["email"])
        assert user_in_db is None

        res = await client.post(
            app.url_path_for("users:register-new-user"),
            json={"new_user": new_user})
        assert res.status_code == HTTP_201_CREATED

        user_in_db = await user_repo.get_user_by_email(email=new_user["email"])
        assert user_in_db is not None
        assert user_in_db.email == new_user["email"]
        assert user_in_db.username == new_user["username"]

        created_user = UserPublic(**res.json()).dict(exclude={"access_token"})
        assert created_user == user_in_db.dict(exclude={"password", "salt"})
    ...省略

これでテストを実行するとすべて合格するようになりました!

まとめ

今回の投稿では認証周りの基礎ロジックを実装しました。
次回の投稿ではこの認証機構を使用したエンドポイントの保護を実装していきます。

作成したコードはGitHubにアップしています。
part7ブランチが対応しています。

GitHub - Nmomos/hedgehog-reservation at part7

FastAPI を使ってWEBアプリを作ってみる. Contribute to Nmomos/hedgehog-reservation development by ...

 https://github.com/Nmomos/hedgehog-reservation/tree/part7

2021-02-21Python,TIPSDocker,FastAPI,JWT,login,users

Posted by Kenny


よろしければシェアお願いします

  • Twitter
  • Facebook
  • Pocket
  • LINE
  • LINE
  • RSS
FastAPI を使ってWEBアプリを作ってみる その8
Next
FastAPI を使ってWEBアプリを作ってみる その6
Prev

関連記事

Vue.jsアプリケーションをKubernetes Clasterで開発する

この投稿ではVue.jsアプリケーションをKubernetesへデプロイしホット ...

FastAPI を使ってWEBアプリを作ってみる その4

前回の投稿ではリポジトリパターンの導入と依存性注入を行い、APIエンドポイントを ...

FastAPI を使ってWEBアプリを作ってみる その3

前回の投稿ではPostgreSQLコンテナを立ち上げてAlembicからマイグレ ...

FastAPI を使ってWEBアプリを作ってみる その5

前回の投稿ではユニットテストを行うための土台作りをし、いくつかのテストケースを実 ...

FastAPI を使ってWEBアプリを作ってみる その6

前回の投稿ではhedgehogsリソースを操作する基本的なAPIエンドポイントを ...

この記事のトラックバックURL

Copyright © 2025 nMoMo's All Rights Reserved.

WordPress Luxeritas Theme is provided by "Thought is free".