FastAPI を使ってWEBアプリを作ってみる その7
前回の投稿では、ユーザーがサインアップできるようにユーザモデルを実装しました。
現在、ユーザーのパスワードはプレーンテキストで保存されており、ソルトについてもハードコーディングしています。
この投稿ではこれらを解決し適切な暗号化を施した上でユーザにログイン機能を提供します。
前置きをしておくと今回は少し長めになるのでコードだけ見たい方は最下部にあるGitHubリンクから入手することができます。
過去の投稿はこちらから辿ることができます。
FastAPI を使ってWEBアプリを作ってみる その1 | FastAPIとDockerでHelloWorld |
FastAPI を使ってWEBアプリを作ってみる その2 | AlembicとPostgreSQLでDB Migrate |
FastAPI を使ってWEBアプリを作ってみる その3 | APIエンドポイントをPostgreSQLに接続 |
FastAPI を使ってWEBアプリを作ってみる その4 | PytestとDockerでテスト構築 |
FastAPI を使ってWEBアプリを作ってみる その5 | RESTAPIエンドポイントを実装 |
FastAPI を使ってWEBアプリを作ってみる その6 | ユーザーモデルの作成 |
FastAPI を使ってWEBアプリを作ってみる その7 | 今ここ |
セキュリティ関連の新しいパッケージ
コードを記述していく前に今回利用する新しいパッケージをインストールしておきます。
fastapi==0.63.0
uvicorn==0.13.3
pydantic==1.7.3
email-validator==1.1.2
databases[postgresql]==0.4.1
SQLAlchemy==1.3.23
alembic==1.5.4
psycopg2==2.8.6
pyjwt==2.0.1
passlib[bcrypt]==1.7.4
pytest==6.2.2
pytest-asyncio==0.14.0
httpx==0.16.1
asgi-lifespan==1.0.1
docker==4.4.3
2つの新しいパッケージをインストールします。
※ bcryptのビルドにはCの処理系が必要なため使用しているディストリビューションによってはDockerfileにbuild-base libffi-dev
を追加してください。
pyjwt – ユーザーの認証に使われるJSON Web Tokensのエンコードとデコードに使われます。
JWTを認証に使用するかどうかの議論はこの記事の範囲外なのでここでは触れません。
OAuth, SAMLをはじめとした他の形式の認証は FastAPI でも利用可能なので、このアプローチに違和感を感じる方は他の方法を試すことをお勧めします
passlib – 平文でのパスワードの保存を避けるために使用するパスワードハッシュライブラリです。
前の記事で述べたようにパスワードをプレーンテキストで保存すること絶対に避けるべきです。
コンテナのリビルドをおこない依存関係をインストールします。
$ docker-compose build
コンテナが構築されるのを待つ間、セキュリティについて少し話します。
そもそもなぜパスワードをハッシュ化する必要があるのでしょうか?
これは逆の場合の問題を考える方が簡単です。
もし私たちが平文でパスワードをデータベースに保存していたら、チームの運用・管理メンバーは誰でも好きなときに全ユーザのパスワードを見ることができてしまいます。
仮に一緒に働くメンバー全員を完全に信頼することができたとしても、インターネット越しにアプリケーションにアクセスしてくる全てのトラフィックを信用することは不可能です。
もしアプリケーションがハッキングされた場合、すべてのユーザのパスワードは、追加作業なしにハッカーがそのまま利用できるようになります。
多くの人が複数のアプリケーションで同じパスワードを再利用している実態を考えるとこれは悪夢です。
この問題に対処するためにすべてのユーザパスワードに固有のハッシュを生成し、そのハッシュ化されたパスワードをデータベースに保存するようにします。
ユーザーがログインしようとすると、同じアルゴリズムを使って提出されたパスワードをハッシュ化しそのユーザーのためにデータベースに保存されているものと出力を照合します。
このアプローチを使うことでユーザのパスワードを難読化、安全にユーザを認証する方法を提供できるようになります。
しかしこの場合にも課題があります。
ハッカーはレインボーテーブルと呼ばれる、ハッシュから平文を得るために事前に計算したハッシュテーブルを使い攻撃してきます。
これに対抗するためにハッシュアルゴリズムに独自のソルトを追加します。
ユーザがサインアップするたびに、新しいソルトがランダムに生成されてデータベースに保存されます。
ユーザが認証するとソルトとパスワードが結合されて暗号化ハッシュ関数で処理されます。
この方法はハッカーが一度に1つのパスワードのみを解読することを強制することで、マルチターゲット攻撃のバッチ処理の利点を緩和します。
これにより、ハッカーはレインボーテーブルに得られる利点がなくなります。
認証に成功したユーザには、そのユーザを一意に識別するための情報を含むエンコードされたステートレスなトークンが送信されます。
認証以降の全てのリクエストはヘッダーにこのトークンを含めなければなりません。
またトークンは一定期間後に有効期限が切れるので、ユーザは永遠に認証されるわけではありません。
開発者はアプリケーションごとに最適な時間帯を正確に決定することができます。
認証サービスの設定
まずは認証関連のロジックを処理する AuthService クラスを作成することから始めます。
クラスを作成せず関数でも同様のタスクを担うことが可能ですが、今後の記事でさまざまなロジックを追加していくことを予定しているためクラスを作成し、都度リファクタしていくことにしています。
$ mkdir /backend/app/services
$ touch /backend/app/services/__init__.py
$ touch /backend/app/services/authentication.py
import bcrypt
from app.models.user import UserPasswordUpdate
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class AuthException(BaseException):
pass
class AuthService:
def create_salt_and_hashed_password(
self, *, plaintext_password: str) -> UserPasswordUpdate:
salt = self.generate_salt()
hashed_password = self.hash_password(password=plaintext_password, salt=salt)
return UserPasswordUpdate(salt=salt, password=hashed_password)
def generate_salt(self) -> str:
return bcrypt.gensalt().decode()
def hash_password(self, *, password: str, salt: str) -> str:
return pwd_context.hash(password + salt)
このユーティリティクラスはパスワードをソルトしたり、平文のパスワードをハッシュ化したものに変換したりするのに必要なメソッドを提供します。
initファイルでインスタンスを作成しアプリケーション全体で使用できるようにしておきます。
from app.services.authentication import AuthService
auth_service = AuthService()
ロジックのコードを書いていく前に上記で作成したサービスを試してみましょう。
コンテナに入りPythonのreplで実行してみます。
/backend # python
Python 3.9.1 (default, Dec 17 2020, 01:59:58)
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from app.services import auth_service
>>> auth_service.generate_salt()
'$2b$12$6wn52AyWtHOXI1IKNNqCXe'
>>> auth_service.hash_password(password="password", salt="123")
'$2b$12$3OvH/BonjlS217.Z8Qs8zutMf.o9cVfOhE1CEsLEXo3dxlRpVjz7C'
>>> auth_service.create_salt_and_hashed_password(plaintext_password="password")
UserPasswordUpdate(password='$2b$12$to66CZSfSACEKLTlt6ZL0.oSHuJqEeaDIhtA33BLAYO.aAsMK2poW', salt='$2b$12$AywZiFPxtPlVTL7Tphp0Je')
>>> auth_service.hash_password(password="password", salt="123")
'$2b$12$u.MOo9pYlQ/3oo5Q5/SkV.oOjS8jUXhxTrqEe6EAEkDU0Jp9PBvf6'
auth_service
によって生成されるすべてのソルトとハッシュ化されたパスワードは"$2b$12$"で始まります。
2b は ident であり新しいハッシュを作成する際に使用する bcrypt アルゴリズムのバージョンを指定しています。
12$ はパスワードのハッシュ化に何回ラウンドを使うかを指定します (厳密にはログラウンドなので実際には2の12乗を使います)
bcryptはパスワードがハッシュ化されるたびに自動的にソルトを生成し、それをハッシュテーブルに保存しています。
つまり、明示的に独自のソルトを作成したりユーザと一緒にデータベースに保存したりする必要はありません。
ただしほんの少しオーバーヘッドが発生します。
詳細についてはこちらのQAを確認してください。
ユーザ登録のリファクタリング
パスワードのハッシュ化アルゴリズムをユーザ登録に組み込む準備ができたので まずは新しいテストを追加しましょう。
from app.services import auth_service
...省略
class TestUserRegistration:
...省略
async def test_users_saved_password_is_hashed_and_has_salt(
self,
app: FastAPI,
client: AsyncClient,
db: Database,
) -> None:
user_repo = UsersRepository(db)
new_user = {
"email": "nmomo@mail.com",
"username": "nmomo",
"password": "nmomoishedgehog"}
res = await client.post(
app.url_path_for("users:register-new-user"),
json={"new_user": new_user})
assert res.status_code == HTTP_201_CREATED
user_in_db = await user_repo.get_user_by_email(email=new_user["email"])
assert user_in_db is not None
assert user_in_db.salt is not None and user_in_db.salt != "123"
assert user_in_db.password != new_user["password"]
assert auth_service.verify_password(
password=new_user["password"],
salt=user_in_db.salt,
hashed_pw=user_in_db.password,
)
新しいユーザーを作成しそのユーザーがデータベースに存在することを確認します。
その後、データベースに保存しているソルトが現在ハードコードされている「123」ではないことを確認します。
また、データベースに保存されているパスワードがユーザが最初にサーバに送信したパスワードではないことも確認します。
最後に verify_password関数(まだ書いていません)で送信されたパスワードのハッシュ化されたバージョンが保存されているパスワードと一致することを保証します。
ここで一度テストを実行し失敗させてください。
このエラーは前回ハードコードした “123 " のソルトを削除することで簡単に修正できます。
UsersRepositoryを更新しましょう:
from app.db.repositories.base import BaseRepository
from app.models.user import UserCreate, UserInDB
from app.services import auth_service # 追加
from databases import Database # 追加
from fastapi import HTTPException, status
from pydantic import EmailStr
GET_USER_BY_EMAIL_QUERY = """
SELECT
id, username, email, email_verified, password,
salt, is_active, is_superuser, created_at, updated_at
FROM
users
WHERE
email = :email;
"""
GET_USER_BY_USERNAME_QUERY = """
SELECT
id, username, email, email_verified, password,
salt, is_active, is_superuser, created_at, updated_at
FROM
users
WHERE
username = :username;
"""
REGISTER_NEW_USER_QUERY = """
INSERT INTO users
(username, email, password, salt)
VALUES
(:username, :email, :password, :salt)
RETURNING
id, username, email, email_verified, password,
salt, is_active, is_superuser, created_at, updated_at;
"""
class UsersRepository(BaseRepository):
def __init__(self, db: Database) -> None:
super().__init__(db)
self.auth_service = auth_service
async def get_user_by_email(self, *, email: EmailStr) -> UserInDB:
user_record = await self.db.fetch_one(
query=GET_USER_BY_EMAIL_QUERY,
values={"email": email})
if not user_record:
return None
return UserInDB(**user_record)
async def get_user_by_username(self, *, username: str) -> UserInDB:
user_record = await self.db.fetch_one(
query=GET_USER_BY_USERNAME_QUERY,
values={"username": username})
if not user_record:
return None
return UserInDB(**user_record)
async def register_new_user(self, *, new_user: UserCreate) -> UserInDB:
if await self.get_user_by_email(email=new_user.email):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="このメールアドレスはすでに登録されています")
if await self.get_user_by_username(username=new_user.username):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="このユーザ名はすでに登録されています"
)
# 以下更新箇所
user_password_update = self.auth_service.create_salt_and_hashed_password(
plaintext_password=new_user.password)
new_user_params = new_user.copy(update=user_password_update.dict())
created_user = await self.db.fetch_one(
query=REGISTER_NEW_USER_QUERY,
values=new_user_params.dict())
# ここまで
return UserInDB(**created_user)
auth_service を UsersRepository にアタッチし、create_salt_and_hashed_passwordメソッドをユーザの平文パスワードに適用しています。
メソッドから返された UserPasswordUpdate を取得したら、モデルをdictにエクスポートしてREGISTER_NEW_USER_QUERY に渡す前に new_user を更新させています。
次に verify_password メソッドを実装しましょう。
import bcrypt
from app.models.user import UserPasswordUpdate
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class AuthException(BaseException):
pass
class AuthService:
def create_salt_and_hashed_password(
self, *, plaintext_password: str) -> UserPasswordUpdate:
salt = self.generate_salt()
hashed_password = self.hash_password(password=plaintext_password, salt=salt)
return UserPasswordUpdate(salt=salt, password=hashed_password)
def generate_salt(self) -> str:
return bcrypt.gensalt().decode()
def hash_password(self, *, password: str, salt: str) -> str:
return pwd_context.hash(password + salt)
# 追加
def verify_password(self, *, password: str, salt: str, hashed_pw: str) -> bool:
return pwd_context.verify(password + salt, hashed_pw)
verify_password メソッドは、以前に定義したパスワードのコンテキストを使用してデータベースに保存されているハッシュ化されたパスワードとパスワード/ソルトを検証します。
これでテストを実行するとパスできるはずです、試してみてください。
これで認証システムの基礎ができました。
次はJWTを使用しユーザのログインを維持させる仕組みを実装しましょう。
Json Web Token
以前Djangoの投稿でもJWTについて簡単な説明をしていますが改めてポイントだけ記載するとJWTはピリオドで区切られた3つの部分を含むエンコードされた文字列です。
これらの3つの部分はそれぞれヘッダー、ペイロード、署名と呼ばれています。
ユーザのユーザ名(またはメールアドレス)はペイロードでエンコードされ、APIを介したすべてのリクエストでユーザを識別するために使用されます。
エンコードアルゴリズムはヘッダに格納され、署名は他の2つの部分を組み合わせた文字列で構成され base64 でエンコードされた後、セキュアシークレットを使用して署名されます。
トークンモデルの作成
認証トークンの形状を検証するために使用するPydanticモデルを定義しましょう:
$ touch /backend/app/models/token.py
from datetime import datetime, timedelta
from app.core.config import ACCESS_TOKEN_EXPIRE_MINUTES, JWT_AUDIENCE
from app.models.core import CoreModel
from pydantic import EmailStr
class JWTMeta(CoreModel):
iss: str = "hedgehog-reservation.com"
aud: str = JWT_AUDIENCE
iat: float = datetime.timestamp(datetime.utcnow())
exp: float = datetime.timestamp(
datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
class JWTCreds(CoreModel):
sub: EmailStr
username: str
class JWTPayload(JWTMeta, JWTCreds):
pass
class AccessToken(CoreModel):
access_token: str
token_type: str
JWTにエンコードするペイロードを構成するために複数のモデルを使用しています。
まず JWTMeta ではペイロードに必要な属性のほとんどを持たせています。
- iss – トークンの発行者
- aud – このトークンは誰のためのものか
- iat – このトークンがいつ発行されたか
- exp – このトークンの有効期限
これらすべてにデフォルト値を設定していますが必要に応じて任意の属性をカスタマイズすることもできます。
次に JWTCreds にはユーザーを識別するために使用するフィールド(この場合はEメールとユーザー名)が格納されます。
JWTを構築する際はこの2つのモデルを組み合わせ、結果をエンコードしてトークンのペイロードとして使用します。
このエンコードされた文字列がアクセストークンに添付され、認証に成功するとユーザーに送信されます。
AccessToken の token_type は将来的に認証システムを変更できるように拡張性を持たせるために定義しました。
そしてアプリケーション全体でJWTの扱いを標準化するために config から新しい値をインポートしています。まず最初に必要なのはシークレットキーです。
シークレットキーを生成する方法はたくさんありますが一番簡単なのはターミナルで以下を実行することです:
$ openssl rand -hex 32
570a0fc12129c...省略
570a0fc12129c…. のような文字列が得られますのでこれをシークレットキーに使用しましょう。
backend/app/core/config.pyファイルを開きSECRET_KEYのデフォルトにしたCHANGEMEを削除します。
他にも必要なJWT固有のオプションを追加してみましょう。
from databases import DatabaseURL
from starlette.config import Config
from starlette.datastructures import Secret
config = Config(".env")
PROJECT_NAME = "Hedgehog Reservation"
VERSION = "1.0.0"
API_PREFIX = "/api"
SECRET_KEY = config("SECRET_KEY", cast=Secret)
ACCESS_TOKEN_EXPIRE_MINUTES = config(
"ACCESS_TOKEN_EXPIRE_MINUTES",
cast=int,
default=7 * 24 * 60
)
JWT_ALGORITHM = config("JWT_ALGORITHM", cast=str, default="HS256")
JWT_AUDIENCE = config("JWT_AUDIENCE", cast=str, default="hedgehog-reservation:auth")
JWT_TOKEN_PREFIX = config("JWT_TOKEN_PREFIX", cast=str, default="Bearer")
POSTGRES_USER = config("POSTGRES_USER", cast=str)
POSTGRES_PASSWORD = config("POSTGRES_PASSWORD", cast=Secret)
POSTGRES_SERVER = config("POSTGRES_SERVER", cast=str, default="db")
POSTGRES_PORT = config("POSTGRES_PORT", cast=str, default="5432")
POSTGRES_DB = config("POSTGRES_DB", cast=str)
DATABASE_URL = config(
"DATABASE_URL",
cast=DatabaseURL,
default=f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_SERVER}:{POSTGRES_PORT}/{POSTGRES_DB}"
)
デフォルトのSECRET_KEYを削除しました。
つまり .envファイルにない場合はエラーになりますので先ほど openssl が生成してくれた秘密鍵を.envファイルに追加してください。
トークンのエンコードに使用するjwtアルゴリズムと認証済みリクエストのためにユーザーが送信する認証ヘッダに表示するトークンのプレフィックスを定義しました。
一度コンテナを終了し、再度立ち上げてこれらの変数をコンテナに読み込ませてから次のステップへ進んでください。
ユーザとトークンヘッダのPytest テストフィクスチャ構築
この工程は少し複雑な作業になります。
JWTを取り入れることで起きるユーザー関連のテストに生じた皺寄せを解消するために conftest.pyファイルを新しいフィクスチャで更新しましょう。
...省略
from app.models.hedgehog import HedgehogCreate, HedgehogInDB
from app.models.user import UserCreate, UserInDB
...省略
@pytest.fixture
async def test_user(db: Database) -> UserInDB:
new_user = UserCreate(
email="nmomos@mail.com",
username="nmomoishedgehog",
password="nmomosissocute",
)
user_repo = UsersRepository(db)
existing_user = await user_repo.get_user_by_email(email=new_user.email)
if existing_user:
return existing_user
return await user_repo.register_new_user(new_user=new_user)
test_user フィクスチャは必要なテストデータを指定して UserCreate モデルをインスタンス化し、同じメールアドレスを持つユーザがデータベースに存在するかどうかをチェックしてレコードが見つかった場合はそのユーザを返します。
見つからなければ新しいユーザーを作成して UserInDB モデルを返します。
なぜユーザーの存在をチェックをフィクスチャ内でするのかというと、テスト用のpostgresコンテナはスコープをテストセッションに指定しています。
従って存在確認をせずにこの test_user フィクスチャを複数回実行した場合、pytest はpostgresのユニーク制約に起因するエラーをスローします。
それではトークン周りのテストケースを作っていきます:
...省略
from typing import List, Union, Type, Optional
import pytest
import jwt
from pydantic import ValidationError
from starlette.datastructures import Secret
from app.core.config import SECRET_KEY, JWT_ALGORITHM, JWT_AUDIENCE, JWT_TOKEN_PREFIX, ACCESS_TOKEN_EXPIRE_MINUTES
from app.models.token import JWTMeta, JWTCreds, JWTPayload
...省略
class TestAuthTokens:
async def test_can_create_access_token_successfully(
self, app: FastAPI, client: AsyncClient, test_user: UserInDB
) -> None:
access_token = auth_service.create_access_token_for_user(
user=test_user,
secret_key=str(SECRET_KEY),
audience=JWT_AUDIENCE,
expires_in=ACCESS_TOKEN_EXPIRE_MINUTES,
)
creds = jwt.decode(access_token, str(SECRET_KEY), audience=JWT_AUDIENCE, algorithms=[JWT_ALGORITHM])
assert creds.get("username") is not None
assert creds["username"] == test_user.username
assert creds["aud"] == JWT_AUDIENCE
async def test_token_missing_user_is_invalid(self, app: FastAPI, client: AsyncClient) -> None:
access_token = auth_service.create_access_token_for_user(
user=None,
secret_key=str(SECRET_KEY),
audience=JWT_AUDIENCE,
expires_in=ACCESS_TOKEN_EXPIRE_MINUTES,
)
with pytest.raises(jwt.PyJWTError):
jwt.decode(access_token, str(SECRET_KEY), audience=JWT_AUDIENCE, algorithms=[JWT_ALGORITHM])
@pytest.mark.parametrize(
"secret_key, jwt_audience, exception",
(
("wrong-secret", JWT_AUDIENCE, jwt.InvalidSignatureError),
(None, JWT_AUDIENCE, jwt.InvalidSignatureError),
(SECRET_KEY, "othersite:auth", jwt.InvalidAudienceError),
(SECRET_KEY, None, ValidationError),
)
)
async def test_invalid_token_content_raises_error(
self,
app: FastAPI,
client: AsyncClient,
test_user: UserInDB,
secret_key: Union[str, Secret],
jwt_audience: str,
exception: Type[BaseException],
) -> None:
with pytest.raises(exception):
access_token = auth_service.create_access_token_for_user(
user=test_user,
secret_key=str(secret_key),
audience=jwt_audience,
expires_in=ACCESS_TOKEN_EXPIRE_MINUTES,
)
jwt.decode(access_token, str(SECRET_KEY), audience=JWT_AUDIENCE, algorithms=[JWT_ALGORITHM])
まず、AuthServiceクラスを使ってユーザのアクセストークンを作成できるかどうかをテストしています。pyjwtパッケージを使ってトークンをデコードし、トークンを作成したユーザのユーザ名が含まれていることを確認します。
次に、トークンにユーザがエンコードされていない場合のテストを行い、ペイロードに何も含まれていないことを確認します。
最後に、無効なシークレットとオーディエンスを create_access_token_for_user
メソッドに渡し、 pyjwtパッケージが PyJWTError
を発生させることを確認します。
ここでテストを実行すると create_access_token_for_user
メソッドが見つからないというエラーを返しますので対応します:
...省略
from datetime import datetime, timedelta
import jwt
from app.core.config import (ACCESS_TOKEN_EXPIRE_MINUTES, JWT_ALGORITHM,
JWT_AUDIENCE, JWT_TOKEN_PREFIX, SECRET_KEY)
from app.models.token import JWTCreds, JWTMeta, JWTPayload
from app.models.user import UserInDB, UserPasswordUpdate
...省略
class AuthService:
...省略
def create_access_token_for_user(
self,
*,
user: UserInDB,
secret_key: str = str(SECRET_KEY),
audience: str = JWT_AUDIENCE,
expires_in: int = ACCESS_TOKEN_EXPIRE_MINUTES,
) -> str:
if not user or not isinstance(user, UserInDB):
return None
jwt_meta = JWTMeta(
aud=audience,
iat=datetime.timestamp(datetime.utcnow()),
exp=datetime.timestamp(datetime.utcnow() + timedelta(minutes=expires_in)),
)
jwt_creds = JWTCreds(sub=user.email, username=user.username)
token_payload = JWTPayload(
**jwt_meta.dict(),
**jwt_creds.dict(),
)
access_token = jwt.encode(
token_payload.dict(),
secret_key,
algorithm=JWT_ALGORITHM)
return access_token
ユーザーが渡されていない場合や、ユーザーが UserInDB のインスタンスでない場合は None を返します。
問題がなければ、トークンのmetaとcredsを作成しペイロードを構成し、JWT_ALGORITHM と SECRET_KEY を文字列にキャストしてペイロードをトークンにエンコードした文字列を返します。
これでテストを実行するとパスするようになったはずです。
トークンのロジックができたので UserPublic モデルを更新してオプションのアクセストークンも格納するようにします:
...省略
from app.models.token import AccessToken
...省略
class UserPublic(IDModelMixin, DateTimeModelMixin, UserBase):
access_token: Optional[AccessToken]
これでユーザーが登録するとすぐにトークンと一緒に作成したユーザーを返すことができるようになりましたので register_new_user
ルートを以下のように更新します:
...省略
from app.models.token import AccessToken
from app.services import auth_service
...省略
@router.post("/",
response_model=UserPublic,
name="users:register-new-user",
status_code=HTTP_201_CREATED)
async def register_new_user(
new_user: UserCreate = Body(..., embed=True),
user_repo: UsersRepository = Depends(get_repository(UsersRepository)),
) -> UserPublic:
created_user = await user_repo.register_new_user(new_user=new_user)
access_token = AccessToken(
access_token=auth_service.create_access_token_for_user(
user=created_user), token_type="bearer")
return UserPublic(**created_user.dict(), access_token=access_token)
これで有効なアクセストークンを返すようになりました。
UserPublic モデルに追加の属性を追加したため、テストに合格しなくなったためこれを修正します:
...省略
from app.models.user import UserCreate, UserInDB, UserPublic
...省略
class TestUserRegistration:
async def test_users_can_register_successfully(
self, app: FastAPI, client: AsyncClient, db: Database,
) -> None:
user_repo = UsersRepository(db)
new_user = {
"email": "foo@bar.com",
"username": "foobar",
"password": "bazquxquux"}
user_in_db = await user_repo.get_user_by_email(email=new_user["email"])
assert user_in_db is None
res = await client.post(
app.url_path_for("users:register-new-user"),
json={"new_user": new_user})
assert res.status_code == HTTP_201_CREATED
user_in_db = await user_repo.get_user_by_email(email=new_user["email"])
assert user_in_db is not None
assert user_in_db.email == new_user["email"]
assert user_in_db.username == new_user["username"]
created_user = UserPublic(**res.json()).dict(exclude={"access_token"})
assert created_user == user_in_db.dict(exclude={"password", "salt"})
...省略
これでテストを実行するとすべて合格するようになりました!
まとめ
今回の投稿では認証周りの基礎ロジックを実装しました。
次回の投稿ではこの認証機構を使用したエンドポイントの保護を実装していきます。
作成したコードはGitHubにアップしています。
part7ブランチが対応しています。