Step 3: Add Authentication¶
In this step, you'll learn how to secure your API with JWT authentication and implement role-based access control.
What You'll Learn¶
- Setting up JWT authentication
 - Protecting endpoints
 - Implementing role-based access
 - Creating login/register endpoints
 - Testing authenticated requests
 
Prerequisites¶
Make sure you've completed: - Step 1: Define Your Model - Step 2: Create CRUD Views
Setting Up JWT Keys¶
Generate RSA Keys (Recommended for Production)¶
# Generate private key
openssl genrsa -out private_key.pem 2048
# Generate public key
openssl rsa -in private_key.pem -pubout -out public_key.pem
Store Keys Securely¶
# settings.py
import os
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent.parent
# JWT Settings
JWT_PRIVATE_KEY_PATH = os.path.join(BASE_DIR, 'private_key.pem')
JWT_PUBLIC_KEY_PATH = os.path.join(BASE_DIR, 'public_key.pem')
# Read keys
with open(JWT_PUBLIC_KEY_PATH, 'r') as f:
    JWT_PUBLIC_KEY = f.read()
with open(JWT_PRIVATE_KEY_PATH, 'r') as f:
    JWT_PRIVATE_KEY = f.read()
# Token expiration (in seconds)
JWT_ACCESS_TOKEN_EXPIRE = 60 * 15  # 15 minutes
JWT_REFRESH_TOKEN_EXPIRE = 60 * 60 * 24 * 7  # 7 days
# JWT Claims
JWT_ISSUER = "https://your-api.com"
JWT_AUDIENCE = "your-api"
Security
Never commit your private key to version control! Add private_key.pem to your .gitignore.
Create User Model¶
Update your User model to work with authentication:
# models.py
from django.contrib.auth.models import AbstractUser
from django.db import models
from ninja_aio.models import ModelSerializer
class User(AbstractUser, ModelSerializer):
    email = models.EmailField(unique=True)
    bio = models.TextField(blank=True)
    avatar = models.URLField(blank=True)
    class ReadSerializer:
        fields = ["id", "username", "email", "first_name", "last_name", "bio", "avatar"]
        excludes = ["password"]
    class CreateSerializer:
        fields = ["username", "email", "password", "first_name", "last_name"]
        optionals = [("bio", str), ("avatar", str)]
    class UpdateSerializer:
        optionals = [
            ("first_name", str),
            ("last_name", str),
            ("bio", str),
            ("avatar", str),
        ]
        excludes = ["username", "email", "password"]
    def __str__(self):
        return self.username
# Update Article model to use custom User
class Article(ModelSerializer):
    # ... existing fields ...
    author = models.ForeignKey(
        "User",  # Use string reference
        on_delete=models.CASCADE,
        related_name="articles"
    )
    # ... rest of model ...
Configure Django to Use Custom User¶
Run Migrations¶
Create Authentication Class¶
# auth.py
from ninja_aio.auth import AsyncJwtBearer
from joserfc import jwk
from django.conf import settings
from .models import User
class JWTAuth(AsyncJwtBearer):
    # Import public key for verification
    jwt_public = jwk.RSAKey.import_key(settings.JWT_PUBLIC_KEY)
    jwt_alg = "RS256"
    # Validate required claims
    claims = {
        "iss": {"essential": True, "value": settings.JWT_ISSUER},
        "aud": {"essential": True, "value": settings.JWT_AUDIENCE},
        "sub": {"essential": True},  # User ID
    }
    async def auth_handler(self, request):
        """
        Called after token validation.
        Returns the user object that will be attached to request.auth
        """
        # Get user ID from token
        user_id = self.dcd.claims.get("sub")
        try:
            # Fetch user from database
            user = await User.objects.aget(id=user_id, is_active=True)
            return user
        except User.DoesNotExist:
            return False
Create Token Generation Helper¶
# utils.py
from datetime import datetime, timedelta
import jwt
from django.conf import settings
def create_access_token(user_id: int, **extra_claims) -> str:
    """Generate JWT access token"""
    now = datetime.utcnow()
    payload = {
        "sub": str(user_id),
        "iss": settings.JWT_ISSUER,
        "aud": settings.JWT_AUDIENCE,
        "iat": now,
        "exp": now + timedelta(seconds=settings.JWT_ACCESS_TOKEN_EXPIRE),
        **extra_claims
    }
    token = jwt.encode(
        payload,
        settings.JWT_PRIVATE_KEY,
        algorithm="RS256"
    )
    return token
def create_refresh_token(user_id: int) -> str:
    """Generate JWT refresh token"""
    now = datetime.utcnow()
    payload = {
        "sub": str(user_id),
        "iss": settings.JWT_ISSUER,
        "aud": settings.JWT_AUDIENCE,
        "iat": now,
        "exp": now + timedelta(seconds=settings.JWT_REFRESH_TOKEN_EXPIRE),
        "type": "refresh"
    }
    token = jwt.encode(
        payload,
        settings.JWT_PRIVATE_KEY,
        algorithm="RS256"
    )
    return token
Create Login/Register Endpoints¶
# views.py
from ninja_aio import NinjaAIO
from ninja import Schema
from ninja_aio.exceptions import SerializeError
from django.contrib.auth.hashers import make_password, check_password
from .models import User
from .utils import create_access_token, create_refresh_token
from .auth import JWTAuth
api = NinjaAIO(title="Blog API", version="1.0.0")
# Schemas for authentication
class RegisterSchema(Schema):
    username: str
    email: str
    password: str
    first_name: str = ""
    last_name: str = ""
class LoginSchema(Schema):
    username: str
    password: str
class TokenResponse(Schema):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_in: int
class UserResponse(Schema):
    id: int
    username: str
    email: str
    first_name: str
    last_name: str
# Register endpoint
@api.post("/auth/register/", response=TokenResponse)
async def register(request, data: RegisterSchema):
    """Register a new user"""
    # Check if username exists
    if await User.objects.filter(username=data.username).aexists():
        raise SerializeError(
            {"username": "Username already taken"},
            status_code=400
        )
    # Check if email exists
    if await User.objects.filter(email=data.email).aexists():
        raise SerializeError(
            {"email": "Email already registered"},
            status_code=400
        )
    # Create user
    user = await User.objects.acreate(
        username=data.username,
        email=data.email,
        password=make_password(data.password),
        first_name=data.first_name,
        last_name=data.last_name,
    )
    # Generate tokens
    access_token = create_access_token(user.id)
    refresh_token = create_refresh_token(user.id)
    from django.conf import settings
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "expires_in": settings.JWT_ACCESS_TOKEN_EXPIRE
    }
# Login endpoint
@api.post("/auth/login/", response=TokenResponse)
async def login(request, data: LoginSchema):
    """Login user"""
    try:
        user = await User.objects.aget(username=data.username)
    except User.DoesNotExist:
        raise SerializeError(
            {"detail": "Invalid credentials"},
            status_code=401
        )
    # Check password
    if not check_password(data.password, user.password):
        raise SerializeError(
            {"detail": "Invalid credentials"},
            status_code=401
        )
    # Check if user is active
    if not user.is_active:
        raise SerializeError(
            {"detail": "Account is disabled"},
            status_code=401
        )
    # Generate tokens
    access_token = create_access_token(user.id)
    refresh_token = create_refresh_token(user.id)
    from django.conf import settings
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "expires_in": settings.JWT_ACCESS_TOKEN_EXPIRE
    }
# Get current user
@api.get("/auth/me/", response=UserResponse, auth=JWTAuth())
async def me(request):
    """Get current authenticated user"""
    user = request.auth
    return {
        "id": user.id,
        "username": user.username,
        "email": user.email,
        "first_name": user.first_name,
        "last_name": user.last_name,
    }
# Refresh token
@api.post("/auth/refresh/", response=TokenResponse)
async def refresh(request, refresh_token: str):
    """Refresh access token"""
    import jwt
    from django.conf import settings
    try:
        # Decode refresh token
        payload = jwt.decode(
            refresh_token,
            settings.JWT_PUBLIC_KEY,
            algorithms=["RS256"],
            audience=settings.JWT_AUDIENCE,
            issuer=settings.JWT_ISSUER
        )
        # Check token type
        if payload.get("type") != "refresh":
            raise SerializeError(
                {"detail": "Invalid token type"},
                status_code=401
            )
        user_id = int(payload.get("sub"))
        # Generate new tokens
        new_access_token = create_access_token(user_id)
        new_refresh_token = create_refresh_token(user_id)
        return {
            "access_token": new_access_token,
            "refresh_token": new_refresh_token,
            "expires_in": settings.JWT_ACCESS_TOKEN_EXPIRE
        }
    except jwt.ExpiredSignatureError:
        raise SerializeError(
            {"detail": "Refresh token expired"},
            status_code=401
        )
    except jwt.InvalidTokenError:
        raise SerializeError(
            {"detail": "Invalid refresh token"},
            status_code=401
        )
Protect Your ViewSets¶
Now let's add authentication to your CRUD endpoints:
# views.py
from ninja_aio.views import APIViewSet
from .models import Article
from .auth import JWTAuth
api = NinjaAIO(title="Blog API", version="1.0.0")
class ArticleViewSet(APIViewSet):
    model = Article
    api = api
    # Public read, authenticated write
    get_auth = None  # List and retrieve are public
    post_auth = [JWTAuth()]  # Create requires auth
    patch_auth = [JWTAuth()]  # Update requires auth
    delete_auth = [JWTAuth()]  # Delete requires auth
ArticleViewSet().add_views_to_route()
Set Author Automatically¶
Modify the Article model to set the author from the authenticated user:
# models.py
class Article(ModelSerializer):
    # ... existing fields ...
    @classmethod
    async def queryset_request(cls, request):
        """Filter articles based on authentication"""
        qs = cls.objects.select_related('author', 'category').prefetch_related('tags')
        # Show all published articles
        # Plus user's own drafts if authenticated
        if request.auth:
            from django.db.models import Q
            return qs.filter(
                Q(is_published=True) | Q(author=request.auth)
            )
        return qs.filter(is_published=True)
    async def custom_actions(self, payload: dict):
        """Set author from request"""
        # This is called during creation
        if hasattr(self, '_request') and self._request.auth:
            self.author = self._request.auth
            await self.asave(update_fields=['author'])
        # Call parent
        await super().custom_actions(payload)
Role-Based Access Control¶
Create different authentication classes for different roles:
# auth.py
from ninja_aio.auth import AsyncJwtBearer
from joserfc import jwk
from django.conf import settings
from .models import User
class JWTAuth(AsyncJwtBearer):
    """Base JWT authentication"""
    jwt_public = jwk.RSAKey.import_key(settings.JWT_PUBLIC_KEY)
    jwt_alg = "RS256"
    claims = {
        "iss": {"essential": True, "value": settings.JWT_ISSUER},
        "aud": {"essential": True, "value": settings.JWT_AUDIENCE},
        "sub": {"essential": True},
    }
    async def auth_handler(self, request):
        user_id = self.dcd.claims.get("sub")
        try:
            user = await User.objects.aget(id=user_id, is_active=True)
            return user
        except User.DoesNotExist:
            return False
class AdminAuth(JWTAuth):
    """Requires admin/staff privileges"""
    async def auth_handler(self, request):
        user = await super().auth_handler(request)
        if not user.is_staff:
            return False
        return user
class SuperuserAuth(JWTAuth):
    """Requires superuser privileges"""
    async def auth_handler(self, request):
        user = await super().auth_handler(request)
        if not user.is_superuser:
            return False
        return user
Apply Role-Based Auth¶
# views.py
from .auth import JWTAuth, AdminAuth
class ArticleViewSet(APIViewSet):
    model = Article
    api = api
    # Public read
    get_auth = None
    # Regular users can create
    post_auth = [JWTAuth()]
    # Regular users can update (own articles)
    patch_auth = [JWTAuth()]
    # Only admins can delete
    delete_auth = [AdminAuth()]
class UserViewSet(APIViewSet):
    model = User
    api = api
    # Only admins can manage users
    auth = [AdminAuth()]
ArticleViewSet().add_views_to_route()
UserViewSet().add_views_to_route()
Ownership Validation¶
Ensure users can only edit their own articles:
# views.py
class ArticleViewSet(APIViewSet):
    model = Article
    api = api
    get_auth = None
    post_auth = [JWTAuth()]
    patch_auth = [JWTAuth()]
    delete_auth = [JWTAuth()]
    def views(self):
        # Override update to check ownership
        @self.router.patch("/{pk}/")
        async def update(request, pk: int, data: Article.generate_update_s()):
            """Update article (owner or admin only)"""
            try:
                article = await Article.objects.aget(pk=pk)
            except Article.DoesNotExist:
                raise SerializeError({"article": "not found"}, status_code=404)
            # Check ownership (unless admin)
            user = request.auth
            if article.author_id != user.id and not user.is_staff:
                raise SerializeError(
                    {"detail": "You can only edit your own articles"},
                    status_code=403
                )
            # Update article
            from ninja_aio.models import ModelUtil
            util = ModelUtil(Article)
            schema = Article.generate_read_s()
            return await util.update_s(request, article, data, schema)
        # Override delete to check ownership
        @self.router.delete("/{pk}/")
        async def delete(request, pk: int):
            """Delete article (owner or admin only)"""
            try:
                article = await Article.objects.aget(pk=pk)
            except Article.DoesNotExist:
                raise SerializeError({"article": "not found"}, status_code=404)
            # Check ownership (unless admin)
            user = request.auth
            if article.author_id != user.id and not user.is_staff:
                raise SerializeError(
                    {"detail": "You can only delete your own articles"},
                    status_code=403
                )
            await article.adelete()
            return {"message": "Article deleted successfully"}
ArticleViewSet().add_views_to_route()
Testing Authentication¶
Register a User¶
curl -X POST http://localhost:8000/api/auth/register/ \
  -H "Content-Type: application/json" \
  -d '{
    "username": "johndoe",
    "email": "john@example.com",
    "password": "secure_password_123",
    "first_name": "John",
    "last_name": "Doe"
  }'
Response:
{
  "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
  "refresh_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "bearer",
  "expires_in": 900
}
Login¶
curl -X POST http://localhost:8000/api/auth/login/ \
  -H "Content-Type: application/json" \
  -d '{
    "username": "johndoe",
    "password": "secure_password_123"
  }'
Get Current User¶
Create Article (Authenticated)¶
curl -X POST http://localhost:8000/api/article/ \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "title": "My Article",
    "content": "Article content...",
    "category": 1
  }'
Refresh Token¶
curl -X POST http://localhost:8000/api/auth/refresh/ \
  -H "Content-Type: application/json" \
  -d '{"refresh_token": "YOUR_REFRESH_TOKEN"}'
Error Responses¶
Missing Token¶
curl http://localhost:8000/api/article/ \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{"title": "Test"}'
Response (401):
Invalid Token¶
Response (401):
Expired Token¶
Response (401):
Insufficient Permissions¶
# Regular user trying to delete
curl -X DELETE http://localhost:8000/api/article/1/ \
  -H "Authorization: Bearer USER_TOKEN"
Response (403):
Using Swagger UI with Auth¶
The Swagger UI at /api/docs has built-in authentication support:
- Click the "Authorize" button at the top
 - Enter your token: 
Bearer YOUR_ACCESS_TOKEN - Click "Authorize"
 - Now all requests will include the token
 
Custom Claims¶
Add custom claims to your tokens:
# utils.py
def create_access_token(user_id: int, **extra_claims) -> str:
    """Generate JWT access token with custom claims"""
    now = datetime.utcnow()
    # Add custom claims
    payload = {
        "sub": str(user_id),
        "iss": settings.JWT_ISSUER,
        "aud": settings.JWT_AUDIENCE,
        "iat": now,
        "exp": now + timedelta(seconds=settings.JWT_ACCESS_TOKEN_EXPIRE),
        **extra_claims
    }
    return jwt.encode(payload, settings.JWT_PRIVATE_KEY, algorithm="RS256")
# In login endpoint
async def login(request, data: LoginSchema):
    # ... authentication logic ...
    # Create token with custom claims
    access_token = create_access_token(
        user.id,
        email=user.email,
        username=user.username,
        is_staff=user.is_staff,
        permissions=["read:articles", "write:articles"]
    )
    # ...
Access custom claims in your auth handler:
class JWTAuth(AsyncJwtBearer):
    # ...
    async def auth_handler(self, request):
        user_id = self.dcd.claims.get("sub")
        user = await User.objects.aget(id=user_id, is_active=True)
        # Attach custom claims to request
        request.user_permissions = self.dcd.claims.get("permissions", [])
        request.user_email = self.dcd.claims.get("email")
        return user
Best Practices¶
- 
Use RSA keys in production:
 - 
Keep access tokens short-lived:
 - 
Use refresh tokens:
 - 
Validate claims:
 - 
Hash passwords properly:
 - 
Check user ownership:
 - 
Use HTTPS in production - Never send tokens over HTTP
 - 
Implement token blacklist for logout functionality
 
Next Steps¶
Now that you have authentication set up, let's customize schemas in Step 4: Filtering & Pagination.
What You've Learned
- ✅ Setting up JWT authentication
 - ✅ Creating login/register endpoints
 - ✅ Protecting API endpoints
 - ✅ Implementing role-based access control
 - ✅ Validating ownership
 - ✅ Testing authenticated requests
 
See Also¶
- Authentication API Reference - Complete authentication documentation
 - APIViewSet Auth Options - ViewSet authentication options