Ich weiß, ich weiß, du denkst wahrscheinlich "was nochmal ?!"
Ja, auf Habré haben sie bereits viele Male über das FastAPI- Framework geschrieben . Ich schlage jedoch vor, dieses Tool etwas detaillierter zu betrachten und die API Ihres eigenen Mini-Habr ohne Karma und Bewertungen zu schreiben, jedoch
Datenbankschema und Migrationen
Zunächst beschreiben wir mit SQLAlchemy Expression Language das Datenbankschema. Erstellen wir eine Datei models / users.py :
import sqlalchemy
from sqlalchemy.dialects.postgresql import UUID
metadata = sqlalchemy.MetaData()
users_table = sqlalchemy.Table(
"users",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("email", sqlalchemy.String(40), unique=True, index=True),
sqlalchemy.Column("name", sqlalchemy.String(100)),
sqlalchemy.Column("hashed_password", sqlalchemy.String()),
sqlalchemy.Column(
"is_active",
sqlalchemy.Boolean(),
server_default=sqlalchemy.sql.expression.true(),
nullable=False,
),
)
tokens_table = sqlalchemy.Table(
"tokens",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column(
"token",
UUID(as_uuid=False),
server_default=sqlalchemy.text("uuid_generate_v4()"),
unique=True,
nullable=False,
index=True,
),
sqlalchemy.Column("expires", sqlalchemy.DateTime()),
sqlalchemy.Column("user_id", sqlalchemy.ForeignKey("users.id")),
)
Und die Datei models / posts.py :
import sqlalchemy
from .users import users_table
metadata = sqlalchemy.MetaData()
posts_table = sqlalchemy.Table(
"posts",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("user_id", sqlalchemy.ForeignKey(users_table.c.id)),
sqlalchemy.Column("created_at", sqlalchemy.DateTime()),
sqlalchemy.Column("title", sqlalchemy.String(100)),
sqlalchemy.Column("content", sqlalchemy.Text()),
)
Installieren Sie alembic , um Datenbankmigrationen zu automatisieren :
$ pip install alembic
Führen Sie Folgendes aus, um Alembic zu initialisieren:
$ alembic init migrations
Dieser Befehl wird in der aktuellen Verzeichnisdatei alembic.ini und einem Migrationsverzeichnis erstellt, das Folgendes enthält:
- Versionen Verzeichnis wo Migrationsdateien gespeichert werden
- env.py- Skript , das ausgeführt wird, wenn alembic aufgerufen wird
- Eine script.py.mako- Datei, die die Vorlage für neue Migrationen enthält.
Wir geben die URL unserer Datenbank an. Fügen Sie dazu in der Datei alembic.ini die folgende Zeile hinzu:
sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASS)s@%(DB_HOST)s:5432/%(DB_NAME)s
Das Format % (Variablenname) ermöglicht es uns, je nach Umgebung unterschiedliche Werte für Variablen festzulegen und diese in der Datei env.py wie folgt zu überschreiben :
from os import environ
from alembic import context
from app.models import posts, users
# Alembic Config
# alembic.ini
config = context.config
section = config.config_ini_section
config.set_section_option(section, "DB_USER", environ.get("DB_USER"))
config.set_section_option(section, "DB_PASS", environ.get("DB_PASS"))
config.set_section_option(section, "DB_NAME", environ.get("DB_NAME"))
config.set_section_option(section, "DB_HOST", environ.get("DB_HOST"))
fileConfig(config.config_file_name)
target_metadata = [users.metadata, posts.metadata]
Hier greifen wir die Werte von DB_USER, DB_PASS, DB_NAME und DB_HOST aus Umgebungsvariablen ab. Darüber hinaus gibt die Datei env.py die Metadaten unserer Datenbank im Attribut target_metadata an , ohne die Alembic nicht bestimmen kann, welche Änderungen in der Datenbank vorgenommen werden müssen.
Alles ist fertig und wir können Migrationen generieren und die Datenbank aktualisieren:
$ alembic revision --autogenerate -m "Added required tables"
$ alembic upgrade head
Wir starten die Anwendung und verbinden die Datenbank
Erstellen wir eine main.py- Datei :
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def read_root():
return {"Hello": "World"}
Starten Sie die Anwendung, indem Sie den folgenden Befehl ausführen:
$ uvicorn main:app --reload
Stellen wir sicher, dass alles so funktioniert, wie es sollte. Öffnen Sie http://127.0.0.1:8000/ im Browser und sehen Sie
{"Hello": "World"}
Um eine Verbindung zur Datenbank herzustellen, verwenden wir das Datenbankmodul , mit dem wir Abfragen asynchron ausführen können.
Lassen Sie uns die Start- und Shutdhown- Ereignisse unseres Dienstes konfigurieren , bei denen die Verbindung und Trennung von der Datenbank erfolgt. Bearbeiten wir die Datei main.py :
from os import environ
import databases
#
DB_USER = environ.get("DB_USER", "user")
DB_PASSWORD = environ.get("DB_PASSWORD", "password")
DB_HOST = environ.get("DB_HOST", "localhost")
DB_NAME = "async-blogs"
SQLALCHEMY_DATABASE_URL = (
f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
)
# database,
database = databases.Database(SQLALCHEMY_DATABASE_URL)
app = FastAPI()
@app.on_event("startup")
async def startup():
#
await database.connect()
@app.on_event("shutdown")
async def shutdown():
#
await database.disconnect()
@app.get("/")
async def read_root():
# ,
query = (
select(
[
posts_table.c.id,
posts_table.c.created_at,
posts_table.c.title,
posts_table.c.content,
posts_table.c.user_id,
users_table.c.name.label("user_name"),
]
)
.select_from(posts_table.join(users_table))
.order_by(desc(posts_table.c.created_at))
)
return await database.fetch_all(query)
Wir öffnen http://127.0.0.1:8000/ und wenn wir in der Antwort eine leere Liste [] sehen , ist alles gut gelaufen und wir können weitermachen.
Anforderungs- und Antwortvalidierung
Wir werden die Möglichkeit der Benutzerregistrierung implementieren. Dazu müssen wir HTTP-Anforderungen und -Antworten validieren. Um dieses Problem zu lösen, verwenden wir die pydantic Bibliothek :
pip install pydantic
Erstellen Sie eine Datei schemas / users.py und fügen Sie ein Modell hinzu, das für die Validierung des Anforderungshauptteils verantwortlich ist:
from pydantic import BaseModel, EmailStr
class UserCreate(BaseModel):
""" sign-up """
email: EmailStr
name: str
password: str
Beachten Sie, dass Feldtypen mithilfe von Typanmerkungen definiert werden. Zusätzlich zu den integrierten Datentypen wie int und str bietet pydantic eine große Anzahl von Typen, die eine zusätzliche Validierung ermöglichen. Beispielsweise überprüft der EmailStr- Typ , ob der empfangene Wert eine gültige E-Mail ist. Um den EmailStr- Typ zu verwenden, müssen Sie das E-Mail- Validierungsmodul installieren :
pip install email-validator
Der Hauptteil der Antwort sollte seine eigenen spezifischen Felder enthalten, z. B. id und access_token. Fügen wir also Modelle hinzu , die für die Generierung der Antwort in der Datei schemas / users.py verantwortlich sind :
from typing import Optional
from pydantic import UUID4, BaseModel, EmailStr, Field, validator
class UserCreate(BaseModel):
""" sign-up """
email: EmailStr
name: str
password: str
class UserBase(BaseModel):
""" """
id: int
email: EmailStr
name: str
class TokenBase(BaseModel):
token: UUID4 = Field(..., alias="access_token")
expires: datetime
token_type: Optional[str] = "bearer"
class Config:
allow_population_by_field_name = True
@validator("token")
def hexlify_token(cls, value):
""" UUID hex """
return value.hex
class User(UserBase):
""" """
token: TokenBase = {}
Für jedes Feld im Modell können Sie einen benutzerdefinierten Validator schreiben . Beispielsweise konvertiert hexlify_token den UUID-Wert in eine Hex-Zeichenfolge. Beachten Sie, dass Sie die Field- Klasse verwenden können, wenn Sie das Standardverhalten eines Modellfelds überschreiben müssen. Zum Beispiel Token: UUID4 = Field (..., alias = "access_token") setzt die access_token alias für das Token - Feld . Um anzuzeigen, dass das Feld erforderlich ist, wird als erster Parameter ein spezieller Wert - ... ( Auslassungspunkte ) übergeben .
Fügen Sie die Datei utils / users.py hinzu , in der die Methoden erstellt werden, die zum Schreiben eines Benutzers in die Datenbank erforderlich sind:
import hashlib
import random
import string
from datetime import datetime, timedelta
from sqlalchemy import and_
from app.models.database import database
from app.models.users import tokens_table, users_table
from app.schemas import users as user_schema
def get_random_string(length=12):
""" , """
return "".join(random.choice(string.ascii_letters) for _ in range(length))
def hash_password(password: str, salt: str = None):
""" """
if salt is None:
salt = get_random_string()
enc = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)
return enc.hex()
def validate_password(password: str, hashed_password: str):
""" , """
salt, hashed = hashed_password.split("$")
return hash_password(password, salt) == hashed
async def get_user_by_email(email: str):
""" """
query = users_table.select().where(users_table.c.email == email)
return await database.fetch_one(query)
async def get_user_by_token(token: str):
""" """
query = tokens_table.join(users_table).select().where(
and_(
tokens_table.c.token == token,
tokens_table.c.expires > datetime.now()
)
)
return await database.fetch_one(query)
async def create_user_token(user_id: int):
""" user_id """
query = (
tokens_table.insert()
.values(expires=datetime.now() + timedelta(weeks=2), user_id=user_id)
.returning(tokens_table.c.token, tokens_table.c.expires)
)
return await database.fetch_one(query)
async def create_user(user: user_schema.UserCreate):
""" """
salt = get_random_string()
hashed_password = hash_password(user.password, salt)
query = users_table.insert().values(
email=user.email, name=user.name, hashed_password=f"{salt}${hashed_password}"
)
user_id = await database.execute(query)
token = await create_user_token(user_id)
token_dict = {"token": token["token"], "expires": token["expires"]}
return {**user.dict(), "id": user_id, "is_active": True, "token": token_dict}
Lassen Sie sich eine Datei Router / users.py und fügen Sie eine Anmelde- Route, was darauf hinweist , dass es ein erwartet AngelegtVon Modell in der Anforderung und gibt ein Benutzermodell :
from fastapi import APIRouter
from app.schemas import users
from app.utils import users as users_utils
router = APIRouter()
@router.post("/sign-up", response_model=users.User)
async def create_user(user: users.UserCreate):
db_user = await users_utils.get_user_by_email(email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return await users_utils.create_user(user=user)
Es bleibt nur die Verbindung der Routen aus der Datei routers / users.py . Fügen Sie dazu die folgenden Zeilen zu main.py hinzu :
from app.routers import users
app.include_router(users.router)
Authentifizierung und Zugriffskontrolle
Nachdem wir Benutzer in unserer Datenbank haben, können wir die Anwendungsauthentifizierung einrichten. Fügen wir einen Endpunkt hinzu, der einen Benutzernamen und ein Kennwort verwendet und ein Token zurückgibt. Aktualisieren Sie die Datei routers / users.py, um Folgendes hinzuzufügen :
from fastapi import Depends
from fastapi.security import OAuth2PasswordRequestForm
@router.post("/auth", response_model=users.TokenBase)
async def auth(form_data: OAuth2PasswordRequestForm = Depends()):
user = await users_utils.get_user_by_email(email=form_data.username)
if not user:
raise HTTPException(status_code=400, detail="Incorrect email or password")
if not users_utils.validate_password(
password=form_data.password, hashed_password=user["hashed_password"]
):
raise HTTPException(status_code=400, detail="Incorrect email or password")
return await users_utils.create_user_token(user_id=user["id"])
Zur gleichen Zeit haben wir nicht die Anforderung Modell selbst beschreiben müssen, bietet Fastapi eine besondere Abhängigkeitsklasse OAuth2PasswordRequestForm , die die Strecke erwarten zwei Felder Benutzername und Passwort macht.
Um den Zugriff auf bestimmte Routen für nicht authentifizierte Benutzer einzuschränken, schreiben wir eine Abhängigkeitsmethode. Es wird überprüft, ob das bereitgestellte Token dem aktiven Benutzer gehört, und die Benutzerdetails zurückgegeben. Auf diese Weise können wir Benutzerinformationen auf allen Routen verwenden, für die eine Authentifizierung erforderlich ist. Erstellen wir eine Datei utils / dependecies.py :
from app.utils import users as users_utils
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth")
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = await users_utils.get_user_by_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if not user["is_active"]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
)
return user
Bitte beachten Sie, dass eine Abhängigkeit wiederum von einer anderen Abhängigkeit abhängen kann. Beispielsweise ist OAuth2PasswordBearer eine Abhängigkeit, die FastAPI klar macht, dass für die aktuelle Route eine Authentifizierung erforderlich ist.
Um zu überprüfen, ob alles wie erwartet funktioniert, fügen Sie die Route / users / me hinzu , die die Details des aktuellen Benutzers zurückgibt. Fügen Sie die Zeilen zu routers / users.py hinzu :
from app.utils.dependencies import get_current_user
@router.get("/users/me", response_model=users.UserBase)
async def read_users_me(current_user: users.User = Depends(get_current_user)):
return current_user
Jetzt haben wir die Route / users / me , auf die nur authentifizierte Benutzer Zugriff haben.
Alles ist bereit, um den Benutzern endlich die Möglichkeit zu geben, Veröffentlichungen zu erstellen und zu bearbeiten:
utils / posts.py
from datetime import datetime
from app.models.database import database
from app.models.posts import posts_table
from app.models.users import users_table
from app.schemas import posts as post_schema
from sqlalchemy import desc, func, select
async def create_post(post: post_schema.PostModel, user):
query = (
posts_table.insert()
.values(
title=post.title,
content=post.content,
created_at=datetime.now(),
user_id=user["id"],
)
.returning(
posts_table.c.id,
posts_table.c.title,
posts_table.c.content,
posts_table.c.created_at,
)
)
post = await database.fetch_one(query)
# Convert to dict and add user_name key to it
post = dict(zip(post, post.values()))
post["user_name"] = user["name"]
return post
async def get_post(post_id: int):
query = (
select(
[
posts_table.c.id,
posts_table.c.created_at,
posts_table.c.title,
posts_table.c.content,
posts_table.c.user_id,
users_table.c.name.label("user_name"),
]
)
.select_from(posts_table.join(users_table))
.where(posts_table.c.id == post_id)
)
return await database.fetch_one(query)
async def get_posts(page: int):
max_per_page = 10
offset1 = (page - 1) * max_per_page
query = (
select(
[
posts_table.c.id,
posts_table.c.created_at,
posts_table.c.title,
posts_table.c.content,
posts_table.c.user_id,
users_table.c.name.label("user_name"),
]
)
.select_from(posts_table.join(users_table))
.order_by(desc(posts_table.c.created_at))
.limit(max_per_page)
.offset(offset1)
)
return await database.fetch_all(query)
async def get_posts_count():
query = select([func.count()]).select_from(posts_table)
return await database.fetch_val(query)
async def update_post(post_id: int, post: post_schema.PostModel):
query = (
posts_table.update()
.where(posts_table.c.id == post_id)
.values(title=post.title, content=post.content)
)
return await database.execute(query)
router / posts.py
from app.schemas.posts import PostDetailsModel, PostModel
from app.schemas.users import User
from app.utils import posts as post_utils
from app.utils.dependencies import get_current_user
from fastapi import APIRouter, Depends, HTTPException, status
router = APIRouter()
@router.post("/posts", response_model=PostDetailsModel, status_code=201)
async def create_post(post: PostModel, current_user: User = Depends(get_current_user)):
post = await post_utils.create_post(post, current_user)
return post
@router.get("/posts")
async def get_posts(page: int = 1):
total_cout = await post_utils.get_posts_count()
posts = await post_utils.get_posts(page)
return {"total_count": total_cout, "results": posts}
@router.get("/posts/{post_id}", response_model=PostDetailsModel)
async def get_post(post_id: int):
return await post_utils.get_post(post_id)
@router.put("/posts/{post_id}", response_model=PostDetailsModel)
async def update_post(
post_id: int, post_data: PostModel, current_user=Depends(get_current_user)
):
post = await post_utils.get_post(post_id)
if post["user_id"] != current_user["id"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to modify this post",
)
await post_utils.update_post(post_id=post_id, post=post_data)
return await post_utils.get_post(post_id)
Verbinden wir neue Routen, indem wir sie zu main.py hinzufügen
from app.routers import posts
app.include_router(posts.router)
Testen
Wir werden Tests in pytest schreiben :
$ pip install pytest
Zum Testen von Endpunkten bietet FastAPI ein spezielles Tool TestClient .
Schreiben wir einen Endpunkttest, für den keine Datenbankverbindung erforderlich ist:
from app.main import app
from fastapi.testclient import TestClient
client = TestClient(app)
def test_health_check():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"Hello": "World"}
Wie Sie sehen können, ist alles ganz einfach. Sie müssen TestClient initialisieren und zum Testen von HTTP-Anforderungen verwenden.
Um die restlichen Endpunkte zu testen, müssen Sie eine Testdatenbank erstellen. Lassen Sie uns die Datei main.py bearbeiten und die Testbasis- Konfiguration hinzufügen:
from os import environ
import databases
DB_USER = environ.get("DB_USER", "user")
DB_PASSWORD = environ.get("DB_PASSWORD", "password")
DB_HOST = environ.get("DB_HOST", "localhost")
TESTING = environ.get("TESTING")
if TESTING:
#
DB_NAME = "async-blogs-temp-for-test"
TEST_SQLALCHEMY_DATABASE_URL = (
f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
)
database = databases.Database(TEST_SQLALCHEMY_DATABASE_URL)
else:
DB_NAME = "async-blogs"
SQLALCHEMY_DATABASE_URL = (
f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"
)
database = databases.Database(SQLALCHEMY_DATABASE_URL)
Wir verwenden weiterhin die Datenbank "Async-Blogs" für unsere Anwendung. Wenn jedoch die Umgebungsvariable TESTING festgelegt ist, wird die Datenbank "Async-Blogs-Temp-for-Test" verwendet . Erstellen Sie ein Fixture in der Datei tests / conftest.py , um die
Datenbank "async-blog-temp-for-test" beim Ausführen von Tests automatisch zu erstellen und nach dem Ausführen zu löschen :
import os
import pytest
# `os.environ`,
os.environ['TESTING'] = 'True'
from alembic import command
from alembic.config import Config
from app.models import database
from sqlalchemy_utils import create_database, drop_database
@pytest.fixture(scope="module")
def temp_db():
create_database(database.TEST_SQLALCHEMY_DATABASE_URL) #
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
alembic_cfg = Config(os.path.join(base_dir, "alembic.ini")) # alembic
command.upgrade(alembic_cfg, "head") #
try:
yield database.TEST_SQLALCHEMY_DATABASE_URL
finally:
drop_database(database.TEST_SQLALCHEMY_DATABASE_URL) #
Zum Erstellen und Löschen der Datenbank verwenden wir die Bibliothek sqlalchemy_utils .
Mit dem temp_db- Fixture in unseren Tests können wir alle Endpunkte unserer Anwendung testen:
def test_sign_up(temp_db):
request_data = {
"email": "vader@deathstar.com",
"name": "Darth Vader",
"password": "rainbow"
}
with TestClient(app) as client:
response = client.post("/sign-up", json=request_data)
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["email"] == "vader@deathstar.com"
assert response.json()["name"] == "Darth"
assert response.json()["token"]["expires"] is not None
assert response.json()["token"]["access_token"] is not None
tests / test_posts.py
import asyncio
from app.main import app
from app.schemas.users import UserCreate
from app.utils.users import create_user, create_user_token
from fastapi.testclient import TestClient
def test_create_post(temp_db):
user = UserCreate(
email="vader@deathstar.com",
name="Darth",
password="rainbow"
)
request_data = {
"title": "42",
"content": "Don't panic!"
}
with TestClient(app) as client:
# Create user and use his token to add new post
loop = asyncio.get_event_loop()
user_db = loop.run_until_complete(create_user(user))
response = client.post(
"/posts",
json=request_data,
headers={"Authorization": f"Bearer {user_db['token']['token']}"}
)
assert response.status_code == 201
assert response.json()["id"] == 1
assert response.json()["title"] == "42"
assert response.json()["content"] == "Don't panic!"
def test_create_post_forbidden_without_token(temp_db):
request_data = {
"title": "42",
"content": "Don't panic!"
}
with TestClient(app) as client:
response = client.post("/posts", json=request_data)
assert response.status_code == 401
def test_posts_list(temp_db):
with TestClient(app) as client:
response = client.get("/posts")
assert response.status_code == 200
assert response.json()["total_count"] == 1
assert response.json()["results"][0]["id"] == 1
assert response.json()["results"][0]["title"] == "42"
assert response.json()["results"][0]["content"] == "Don't panic!"
def test_post_detail(temp_db):
post_id = 1
with TestClient(app) as client:
response = client.get(f"/posts/{post_id}")
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["title"] == "42"
assert response.json()["content"] == "Don't panic!"
def test_update_post(temp_db):
post_id = 1
request_data = {
"title": "42",
"content": "Life? Don't talk to me about life."
}
with TestClient(app) as client:
# Create user token to add new post
loop = asyncio.get_event_loop()
token = loop.run_until_complete(create_user_token(user_id=1))
response = client.put(
f"/posts/{post_id}",
json=request_data,
headers={"Authorization": f"Bearer {token['token']}"}
)
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["title"] == "42"
assert response.json()["content"] == "Life? Don't talk to me about life."
def test_update_post_forbidden_without_token(temp_db):
post_id = 1
request_data = {
"title": "42",
"content": "Life? Don't talk to me about life."
}
with TestClient(app) as client:
response = client.put(f"/posts/{post_id}", json=request_data)
assert response.status_code == 401
tests / test_users.py
import asyncio
import pytest
from app.main import app
from app.schemas.users import UserCreate
from app.utils.users import create_user, create_user_token
from fastapi.testclient import TestClient
def test_sign_up(temp_db):
request_data = {
"email": "vader@deathstar.com",
"name": "Darth",
"password": "rainbow"
}
with TestClient(app) as client:
response = client.post("/sign-up", json=request_data)
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["email"] == "vader@deathstar.com"
assert response.json()["name"] == "Darth"
assert response.json()["token"]["expires"] is not None
assert response.json()["token"]["token"] is not None
def test_login(temp_db):
request_data = {"username": "vader@deathstar.com", "password": "rainbow"}
with TestClient(app) as client:
response = client.post("/auth", data=request_data)
assert response.status_code == 200
assert response.json()["token_type"] == "bearer"
assert response.json()["expires"] is not None
assert response.json()["access_token"] is not None
def test_login_with_invalid_password(temp_db):
request_data = {"username": "vader@deathstar.com", "password": "unicorn"}
with TestClient(app) as client:
response = client.post("/auth", data=request_data)
assert response.status_code == 400
assert response.json()["detail"] == "Incorrect email or password"
def test_user_detail(temp_db):
with TestClient(app) as client:
# Create user token to see user info
loop = asyncio.get_event_loop()
token = loop.run_until_complete(create_user_token(user_id=1))
response = client.get(
"/users/me",
headers={"Authorization": f"Bearer {token['token']}"}
)
assert response.status_code == 200
assert response.json()["id"] == 1
assert response.json()["email"] == "vader@deathstar.com"
assert response.json()["name"] == "Darth"
def test_user_detail_forbidden_without_token(temp_db):
with TestClient(app) as client:
response = client.get("/users/me")
assert response.status_code == 401
@pytest.mark.freeze_time("2015-10-21")
def test_user_detail_forbidden_with_expired_token(temp_db, freezer):
user = UserCreate(
email="sidious@deathstar.com",
name="Palpatine",
password="unicorn"
)
with TestClient(app) as client:
# Create user and use expired token
loop = asyncio.get_event_loop()
user_db = loop.run_until_complete(create_user(user))
freezer.move_to("'2015-11-10'")
response = client.get(
"/users/me",
headers={"Authorization": f"Bearer {user_db['token']['token']}"}
)
assert response.status_code == 401
PS-Quellen
Das ist alles, das Quell-Repository aus dem Beitrag kann auf GitHub angezeigt werden .