Authentication¶
Django Ninja Aio CRUD provides built-in async JWT authentication support with flexible configuration and easy integration with your API endpoints.
Overview¶
Authentication in Django Ninja Aio CRUD: - Fully Async - No blocking operations - JWT-Based - Industry-standard JSON Web Tokens - Type-Safe - Proper type hints and validation - Flexible - Per-endpoint or global authentication - Customizable - Override default behavior - RSA/HMAC Support - Multiple signing algorithms
Quick Start¶
1. Create Authentication Class¶
# auth.py
from ninja_aio.auth import AsyncJwtBearer
from joserfc import jwk
PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA...
-----END PUBLIC KEY-----"""
class JWTAuth(AsyncJwtBearer):
jwt_public = jwk.RSAKey.import_key(PUBLIC_KEY)
claims = {
"iss": {"essential": True, "value": "https://your-issuer.com"},
"aud": {"essential": True, "value": "your-api"}
}
async def auth_handler(self, request):
user_id = self.dcd.claims.get("sub")
user = await User.objects.aget(id=user_id)
return user
2. Apply to ViewSet¶
# views.py
from ninja_aio import NinjaAIO
from ninja_aio.views import APIViewSet
from .models import Article
from .auth import JWTAuth
api = NinjaAIO()
class ArticleViewSet(APIViewSet):
model = Article
api = api
auth = [JWTAuth()] # Apply to all endpoints
ArticleViewSet().add_views_to_route()
3. Make Authenticated Request¶
AsyncJwtBearer¶
Base class for JWT authentication.
Class Definition¶
from ninja_aio.auth import AsyncJwtBearer
from joserfc import jwk
class MyAuth(AsyncJwtBearer):
jwt_public: jwk.RSAKey | jwk.OctKey # Public key for verification
jwt_alg: str = "RS256" # Signing algorithm
claims: dict = {} # Required claims
async def auth_handler(self, request):
# Return user object or custom auth context
pass
Required Attributes¶
jwt_public¶
Public key for JWT verification.
RSA Key (Recommended):
from joserfc import jwk
PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA...
-----END PUBLIC KEY-----"""
class JWTAuth(AsyncJwtBearer):
jwt_public = jwk.RSAKey.import_key(PUBLIC_KEY)
jwt_alg = "RS256"
HMAC Key (Shared Secret):
from joserfc import jwk
SECRET = "your-secret-key"
class JWTAuth(AsyncJwtBearer):
jwt_public = jwk.OctKey.import_key(SECRET)
jwt_alg = "HS256"
From JWK (JSON Web Key):
import json
from joserfc import jwk
jwk_data = {
"kty": "RSA",
"n": "xGOr-H7A-PWgPZ...",
"e": "AQAB",
"alg": "RS256",
"use": "sig"
}
class JWTAuth(AsyncJwtBearer):
jwt_public = jwk.RSAKey.import_key(jwk_data)
jwt_alg¶
JWT signing algorithm (optional, default: "RS256").
Supported Algorithms:
| Algorithm | Type | Description |
|---|---|---|
RS256 |
RSA | RSA Signature with SHA-256 (recommended) |
RS384 |
RSA | RSA Signature with SHA-384 |
RS512 |
RSA | RSA Signature with SHA-512 |
HS256 |
HMAC | HMAC with SHA-256 |
HS384 |
HMAC | HMAC with SHA-384 |
HS512 |
HMAC | HMAC with SHA-512 |
ES256 |
ECDSA | ECDSA with SHA-256 |
ES384 |
ECDSA | ECDSA with SHA-384 |
ES512 |
ECDSA | ECDSA with SHA-512 |
class JWTAuth(AsyncJwtBearer):
jwt_public = jwk.RSAKey.import_key(PUBLIC_KEY)
jwt_alg = "RS512" # Use RS512 instead of default RS256
claims¶
Dictionary of required JWT claims for validation.
Claim Options:
| Key | Type | Description |
|---|---|---|
essential |
bool |
Claim must be present |
value |
Any |
Exact value required |
values |
list |
One of the values required |
Examples:
class JWTAuth(AsyncJwtBearer):
jwt_public = jwk.RSAKey.import_key(PUBLIC_KEY)
claims = {
# Issuer must be exact match
"iss": {
"essential": True,
"value": "https://auth.example.com"
},
# Audience must be one of these
"aud": {
"essential": True,
"values": ["api-prod", "api-staging"]
},
# Subject must be present (any value)
"sub": {
"essential": True
},
# Optional claim with default
"scope": {
"essential": False,
"value": "read"
}
}
Required Methods¶
auth_handler()¶
Process authenticated request and return user/auth context.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
request |
HttpRequest |
Django HTTP request |
Return Value:
Object attached to request.auth (typically User instance).
Access to JWT Data:
self.dcd- Decoded JWT claimsself.dcd.claims- Claims dictionaryself.dcd.header- JWT header
Examples:
Return User Object:
async def auth_handler(self, request):
user_id = self.dcd.claims.get("sub")
user = await User.objects.aget(id=user_id)
return user
# In view
async def my_view(request):
user = request.auth # User instance
print(user.username)
Return Custom Context:
async def auth_handler(self, request):
return {
"user_id": self.dcd.claims.get("sub"),
"email": self.dcd.claims.get("email"),
"roles": self.dcd.claims.get("roles", []),
"scopes": self.dcd.claims.get("scope", "").split()
}
# In view
async def my_view(request):
context = request.auth
print(context["user_id"])
print(context["roles"])
With Additional Validation:
async def auth_handler(self, request):
user_id = self.dcd.claims.get("sub")
# Check if user exists and is active
try:
user = await User.objects.aget(id=user_id, is_active=True)
except User.DoesNotExist:
return False
# Check subscription status
if not await user.has_active_subscription():
return False
return user
With Caching:
from django.core.cache import cache
async def auth_handler(self, request):
user_id = self.dcd.claims.get("sub")
# Try cache first
cache_key = f"user:{user_id}"
user = cache.get(cache_key)
if user is None:
user = await User.objects.aget(id=user_id)
cache.set(cache_key, user, 300) # Cache 5 minutes
return user
Authentication Levels¶
Global Authentication¶
Apply authentication to all endpoints in a ViewSet:
class ArticleViewSet(APIViewSet):
model = Article
api = api
auth = [JWTAuth()] # All endpoints require auth
Generated Endpoints:
| Method | Endpoint | Auth Required |
|---|---|---|
| GET | /article/ |
✓ |
| POST | /article/ |
✓ |
| GET | /article/{id} |
✓ |
| PATCH | /article/{id}/ |
✓ |
| DELETE | /article/{id}/ |
✓ |
Per-Method Authentication¶
Apply authentication to specific HTTP methods:
class ArticleViewSet(APIViewSet):
model = Article
api = api
get_auth = None # GET endpoints public
post_auth = [JWTAuth()] # POST requires auth
patch_auth = [JWTAuth()] # PATCH requires auth
delete_auth = [JWTAuth()] # DELETE requires auth
Generated Endpoints:
| Method | Endpoint | Auth Required |
|---|---|---|
| GET | /article/ |
✗ (public) |
| POST | /article/ |
✓ |
| GET | /article/{id} |
✗ (public) |
| PATCH | /article/{id}/ |
✓ |
| DELETE | /article/{id}/ |
✓ |
Custom View Authentication¶
Apply authentication to custom views:
class ArticleViewSet(APIViewSet):
model = Article
api = api
auth = None # CRUD endpoints public
def views(self):
# Authenticated custom endpoint
@self.router.post("/publish/{pk}/", auth=JWTAuth())
async def publish(request, pk: int):
article = await Article.objects.aget(pk=pk)
article.is_published = True
await article.asave()
return {"message": "Article published"}
# Public custom endpoint
@self.router.get("/stats/")
async def stats(request):
total = await Article.objects.acount()
return {"total": total}
Mixed Authentication¶
Combine different authentication strategies:
class AdminAuth(AsyncJwtBearer):
jwt_public = jwk.RSAKey.import_key(PUBLIC_KEY)
async def auth_handler(self, request):
user_id = self.dcd.claims.get("sub")
user = await User.objects.aget(id=user_id)
if not user.is_staff:
return False
return user
class ArticleViewSet(APIViewSet):
model = Article
api = api
get_auth = None # Public read
post_auth = [JWTAuth()] # Regular user can create
patch_auth = [JWTAuth()] # Regular user can edit own
delete_auth = [AdminAuth()] # Only admin can delete
Advanced Usage¶
Role-Based Access Control (RBAC)¶
class RoleAuth(AsyncJwtBearer):
jwt_public = jwk.RSAKey.import_key(PUBLIC_KEY)
required_roles: list[str] = []
async def auth_handler(self, request):
user_id = self.dcd.claims.get("sub")
user = await User.objects.aget(id=user_id)
# Check roles
user_roles = self.dcd.claims.get("roles", [])
if self.required_roles:
if not any(role in user_roles for role in self.required_roles):
return False
request.user_roles = user_roles
return user
class AdminAuth(RoleAuth):
required_roles = ["admin"]
class EditorAuth(RoleAuth):
required_roles = ["editor", "admin"]
class ArticleViewSet(APIViewSet):
model = Article
api = api
get_auth = None
post_auth = [EditorAuth()] # Editors and admins
delete_auth = [AdminAuth()] # Only admins
Permission-Based Access¶
class PermissionAuth(AsyncJwtBearer):
jwt_public = jwk.RSAKey.import_key(PUBLIC_KEY)
required_permissions: list[str] = []
async def auth_handler(self, request):
user_id = self.dcd.claims.get("sub")
user = await User.objects.select_related('role').aget(id=user_id)
# Get user permissions
permissions = await sync_to_async(list)(
user.role.permissions.values_list('code', flat=True)
)
# Check permissions
if self.required_permissions:
missing = set(self.required_permissions) - set(permissions)
if missing:
return False
request.permissions = permissions
return user
class ArticleCreateAuth(PermissionAuth):
required_permissions = ["article.create"]
class ArticleDeleteAuth(PermissionAuth):
required_permissions = ["article.delete"]
class ArticleViewSet(APIViewSet):
model = Article
api = api
post_auth = [ArticleCreateAuth()]
delete_auth = [ArticleDeleteAuth()]
Tenant/Organization Isolation¶
class TenantAuth(AsyncJwtBearer):
jwt_public = jwk.RSAKey.import_key(PUBLIC_KEY)
async def auth_handler(self, request):
user_id = self.dcd.claims.get("sub")
tenant_id = self.dcd.claims.get("tenant_id")
if not tenant_id:
return False
user = await User.objects.aget(
id=user_id,
tenant_id=tenant_id,
is_active=True
)
request.tenant_id = tenant_id
return user
class Article(ModelSerializer):
title = models.CharField(max_length=200)
tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE)
@classmethod
async def queryset_request(cls, request):
# Automatically filter by tenant
qs = cls.objects.all()
if hasattr(request, 'tenant_id'):
qs = qs.filter(tenant_id=request.tenant_id)
return qs
class ArticleViewSet(APIViewSet):
model = Article
api = api
auth = [TenantAuth()]
Scope-Based Access¶
class ScopeAuth(AsyncJwtBearer):
jwt_public = jwk.RSAKey.import_key(PUBLIC_KEY)
required_scopes: list[str] = []
async def auth_handler(self, request):
# Get scopes from token
scope_str = self.dcd.claims.get("scope", "")
scopes = scope_str.split()
# Check required scopes
if self.required_scopes:
missing = set(self.required_scopes) - set(scopes)
if missing:
return False
user_id = self.dcd.claims.get("sub")
user = await User.objects.aget(id=user_id)
request.scopes = scopes
return user
class ReadAuth(ScopeAuth):
required_scopes = ["read"]
class WriteAuth(ScopeAuth):
required_scopes = ["write"]
class ArticleViewSet(APIViewSet):
model = Article
api = api
get_auth = [ReadAuth()] # read scope
post_auth = [WriteAuth()] # write scope
patch_auth = [WriteAuth()] # write scope
delete_auth = [WriteAuth()] # write scope
API Key Authentication¶
For machine-to-machine communication:
from ninja.security import APIKeyHeader
class APIKeyAuth(APIKeyHeader):
param_name = "X-API-Key"
async def authenticate(self, request, key):
try:
api_key = await APIKey.objects.select_related('user').aget(
key=key,
is_active=True
)
# Check expiration
if api_key.expires_at and api_key.expires_at < timezone.now():
return None
# Update last used
api_key.last_used_at = timezone.now()
await api_key.asave(update_fields=['last_used_at'])
return api_key.user
except APIKey.DoesNotExist:
return None
class ArticleViewSet(APIViewSet):
model = Article
api = api
auth = [APIKeyAuth()]
Usage:
Multiple Authentication Methods¶
Support both JWT and API Key:
class ArticleViewSet(APIViewSet):
model = Article
api = api
auth = [JWTAuth() | APIKeyAuth()] # Either JWT or API Key
Django Ninja will try both methods; if either succeeds, the request is authenticated.
Token Validation¶
Expiration Validation¶
JWT tokens include exp claim (expiration timestamp):
AsyncJwtBearer automatically validates expiration:
class JWTAuth(AsyncJwtBearer):
jwt_public = jwk.RSAKey.import_key(PUBLIC_KEY)
# Expiration checked automatically
Error Response:
Not Before Validation¶
Use nbf claim for tokens that become valid in the future:
# Token payload
{
"sub": "123",
"nbf": 1704063600, # Not valid before this time
"exp": 1704067200
}
Automatically validated by AsyncJwtBearer.
Custom Validation¶
class StrictAuth(AsyncJwtBearer):
jwt_public = jwk.RSAKey.import_key(PUBLIC_KEY)
async def auth_handler(self, request):
# Check token type
token_type = self.dcd.claims.get("typ")
if token_type != "access":
return False
# Check IP whitelist
allowed_ips = self.dcd.claims.get("allowed_ips", [])
client_ip = request.META.get('REMOTE_ADDR')
if allowed_ips and client_ip not in allowed_ips:
return False
# Continue with normal auth
user_id = self.dcd.claims.get("sub")
return await User.objects.aget(id=user_id)
Testing Authentication¶
Unit Tests¶
import pytest
from ninja.testing import TestAsyncClient
from myapp.views import api
from myapp.auth import JWTAuth
import jwt
from datetime import datetime, timedelta
def create_token(user_id: int, **claims) -> str:
payload = {
"sub": str(user_id),
"exp": datetime.utcnow() + timedelta(hours=1),
"iat": datetime.utcnow(),
**claims
}
return jwt.encode(payload, PRIVATE_KEY, algorithm="RS256")
@pytest.mark.asyncio
async def test_authenticated_request():
client = TestAsyncClient(api)
# Create user
user = await User.objects.acreate(username="testuser")
# Create token
token = create_token(user.id)
# Make authenticated request
response = await client.get(
"/article/",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_missing_token():
client = TestAsyncClient(api)
response = await client.get("/article/")
assert response.status_code == 401
assert "detail" in response.json()
@pytest.mark.asyncio
async def test_expired_token():
client = TestAsyncClient(api)
# Create expired token
payload = {
"sub": "123",
"exp": datetime.utcnow() - timedelta(hours=1), # Expired
"iat": datetime.utcnow() - timedelta(hours=2)
}
token = jwt.encode(payload, PRIVATE_KEY, algorithm="RS256")
response = await client.get(
"/article/",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 401
@pytest.mark.asyncio
async def test_invalid_signature():
client = TestAsyncClient(api)
# Create token with wrong key
payload = {"sub": "123", "exp": datetime.utcnow() + timedelta(hours=1)}
token = jwt.encode(payload, "wrong-secret", algorithm="HS256")
response = await client.get(
"/article/",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 401
Mock Authentication¶
For testing without real tokens:
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
@patch('myapp.auth.JWTAuth.auth_handler')
async def test_with_mock_auth(mock_auth):
# Mock auth to return test user
user = await User.objects.acreate(username="testuser")
mock_auth.return_value = user
client = TestAsyncClient(api)
response = await client.get("/article/")
assert response.status_code == 200
Best Practices¶
-
Use RSA (asymmetric) keys for production:
-
Validate essential claims:
-
Keep tokens short-lived:
-
Cache user objects:
-
Log authentication failures:
-
Use different auth for different operations:
-
Implement rate limiting for auth endpoints:
Integration Examples¶
With Auth0¶
import httpx
from joserfc import jwk
class Auth0JWT(AsyncJwtBearer):
jwt_alg = "RS256"
def __init__(self):
super().__init__()
# Fetch JWKS from Auth0
self.domain = "your-domain.auth0.com"
self.audience = "your-api-identifier"
async def get_jwks(self):
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://{self.domain}/.well-known/jwks.json"
)
return response.json()
claims = {
"iss": {"essential": True, "value": "https://your-domain.auth0.com/"},
"aud": {"essential": True, "value": "your-api-identifier"}
}
async def auth_handler(self, request):
user_id = self.dcd.claims.get("sub")
# Extract user info from token or fetch from database
return {"user_id": user_id, "email": self.dcd.claims.get("email")}
With Keycloak¶
class KeycloakJWT(AsyncJwtBearer):
jwt_alg = "RS256"
def __init__(self):
super().__init__()
self.realm_url = "https://keycloak.example.com/realms/your-realm"
async def get_public_key(self):
async with httpx.AsyncClient() as client:
response = await client.get(f"{self.realm_url}")
data = response.json()
return jwk.RSAKey.import_key(data["public_key"])
claims = {
"iss": {"essential": True, "value": "https://keycloak.example.com/realms/your-realm"},
"azp": {"essential": True, "value": "your-client-id"}
}
async def auth_handler(self, request):
user_id = self.dcd.claims.get("sub")
roles = self.dcd.claims.get("realm_access", {}).get("roles", [])
return {
"user_id": user_id,
"roles": roles,
"email": self.dcd.claims.get("email")
}
With Firebase¶
import google.auth.transport.requests
from google.oauth2 import id_token
class FirebaseAuth(HttpBearer):
def __init__(self):
self.project_id = "your-firebase-project"
async def authenticate(self, request, token):
try:
# Verify Firebase ID token
decoded_token = id_token.verify_firebase_token(
token,
google.auth.transport.requests.Request(),
audience=self.project_id
)
user_id = decoded_token["uid"]
user = await User.objects.aget(firebase_uid=user_id)
return user
except Exception as e:
return None
See Also¶
- API ViewSet - Applying auth to ViewSets
- Tutorial: Authentication - Step-by-step guide
- Model Serializer - Filtering by authenticated user