RFC7662: OAuth 2.0 Token Introspection

This section contains the generic implementation of RFC7662. OAuth 2.0 Token Introspection lets resource servers query the authorization server for the state and metadata of a token.

Register Introspection Endpoint

Changed in version v1.0.

Authlib is designed to be extensible: with the .register_endpoint method on AuthorizationServer, it is easy to add the introspection endpoint to the authorization server. This works with both the Flask OAuth 2.0 Server and the Django OAuth 2.0 Server. First, we need to implement the missing methods:

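from authlib.oauth2.rfc7662 import IntrospectionEndpoint

# Token is the application's token model (Flask-SQLAlchemy style queries here);
# get_token_username and get_token_user_sub are helpers you define yourself.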

class MyIntrospectionEndpoint(IntrospectionEndpoint):
    def query_token(self, token, token_type_hint):
        if token_type_hint == 'access_token':
            tok = Token.query.filter_by(access_token=token).first()
        elif token_type_hint == 'refresh_token':
            tok = Token.query.filter_by(refresh_token=token).first()
        else:
            # without token_type_hint
            tok = Token.query.filter_by(access_token=token).first()
            if not tok:
                tok = Token.query.filter_by(refresh_token=token).first()
        return tok

    def introspect_token(self, token):
        return {
            'active': True,
            'client_id': token.client_id,
            'token_type': token.token_type,
            'username': get_token_username(token),
            'scope': token.get_scope(),
            'sub': get_token_user_sub(token),
            'aud': token.client_id,
            'iss': 'https://server.example.com/',
            'exp': token.expires_at,
            'iat': token.issued_at,
        }

    def check_permission(self, token, client, request):
        # for example, we only allow internal client to access introspection endpoint
        return client.client_type == 'internal'

# register it to authorization server
server.register_endpoint(MyIntrospectionEndpoint)
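
RFC 7662 describes token_type_hint as a hint only: if the server cannot find the token using the hint, it is expected to extend the search to its other supported token types. The query_token above stops at the hinted type, so the following is a minimal sketch of the fallback behaviour. It uses two in-memory dictionaries in place of a real token table; ACCESS_TOKENS, REFRESH_TOKENS, and lookup_token are hypothetical names, not part of Authlib.

```python
# Hypothetical in-memory stores standing in for a token table.
ACCESS_TOKENS = {"at-123": {"client_id": "c1", "kind": "access_token"}}
REFRESH_TOKENS = {"rt-456": {"client_id": "c1", "kind": "refresh_token"}}


def lookup_token(token_string, token_type_hint=None):
    """Look up a token, trying the hinted type first and then the other."""
    stores = [ACCESS_TOKENS, REFRESH_TOKENS]
    if token_type_hint == "refresh_token":
        # Start with the hinted store, but keep the other one as a fallback.
        stores.reverse()
    for store in stores:
        tok = store.get(token_string)
        if tok is not None:
            return tok
    return None
```

The same shape carries over to a database-backed query_token: run the hinted query first, then the other query if nothing was found.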

After the registration, we can create a response with:

@app.route('/oauth/introspect', methods=['POST'])
def introspect_token():
    return server.create_endpoint_response(MyIntrospectionEndpoint.ENDPOINT_NAME)
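
For context on what this endpoint returns: RFC 7662 requires the JSON response to carry a boolean "active" field, and a token that is unknown, expired, or revoked is reported simply as {"active": false}. The sketch below illustrates that rule with a hypothetical SimpleToken dataclass and build_payload helper; it is an illustration under those assumptions, not Authlib's internal code.

```python
import time
from dataclasses import dataclass


@dataclass
class SimpleToken:
    # Hypothetical stand-in for an application's token model.
    client_id: str
    scope: str
    issued_at: int   # seconds since the epoch
    expires_in: int  # lifetime in seconds
    revoked: bool = False


def build_payload(token, now=None):
    """Return introspection metadata, or only {'active': False} when unusable."""
    now = int(time.time()) if now is None else now
    if token is None or token.revoked:
        return {"active": False}
    expires_at = token.issued_at + token.expires_in
    if expires_at <= now:
        return {"active": False}
    return {
        "active": True,
        "client_id": token.client_id,
        "scope": token.scope,
        "iat": token.issued_at,
        "exp": expires_at,
    }
```

Returning nothing but the "active" flag for unusable tokens avoids leaking details about tokens the caller should not learn about.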

Use Introspection in Resource Server

New in version v1.0.

When a resource server has no access to the token database, it can use the introspection endpoint to validate a given token. Here is how:

import requests
from authlib.oauth2.rfc7662 import IntrospectTokenValidator
from your_project import secrets

class MyIntrospectTokenValidator(IntrospectTokenValidator):
    def introspect_token(self, token_string):
        url = 'https://example.com/oauth/introspect'
        data = {'token': token_string, 'token_type_hint': 'access_token'}
        auth = (secrets.internal_client_id, secrets.internal_client_secret)
        resp = requests.post(url, data=data, auth=auth)
        resp.raise_for_status()
        return resp.json()

We can then register this token validator with the resource protector:

from authlib.integrations.flask_oauth2 import ResourceProtector  # Flask shown; Django has its own

require_oauth = ResourceProtector()
require_oauth.register_token_validator(MyIntrospectTokenValidator())

Please note, when using IntrospectTokenValidator, the current_token will be a dict.
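
Because the token is the parsed introspection response, its fields are read with dictionary access rather than attributes. The sketch below shows one way to pull common RFC 7662 fields out of such a dict; summarize_token is a hypothetical helper, and which fields are present depends on what the authorization server chooses to return.

```python
from datetime import datetime, timezone


def summarize_token(token):
    """Extract a few commonly used fields from an introspection response dict."""
    # 'scope' is a space-separated string in RFC 7662; it may be absent.
    scopes = token.get("scope", "").split()
    # 'exp' is an integer number of seconds since the epoch; it may be absent.
    exp = token.get("exp")
    expires = datetime.fromtimestamp(exp, tz=timezone.utc) if exp is not None else None
    return {
        "active": bool(token.get("active", False)),
        "client_id": token.get("client_id"),
        "scopes": scopes,
        "expires": expires,
    }
```

Using .get() with defaults keeps the helper tolerant of responses that contain only the required "active" field.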

API Reference

class authlib.oauth2.rfc7662.IntrospectionEndpoint(server)

Implementation of the introspection endpoint described in RFC7662.

ENDPOINT_NAME = 'introspection'

Endpoint name to be registered

authenticate_token(request, client)

The protected resource calls the introspection endpoint using an HTTP POST request with parameters sent as “application/x-www-form-urlencoded” data. The protected resource sends a parameter representing the token along with optional parameters representing additional context that is known by the protected resource to aid the authorization server in its response.

token

REQUIRED The string value of the token. For access tokens, this is the access_token value returned from the token endpoint defined in OAuth 2.0. For refresh tokens, this is the refresh_token value returned from the token endpoint as defined in OAuth 2.0.

token_type_hint

OPTIONAL A hint about the type of the token submitted for introspection.
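
To make the request format concrete, here is a minimal sketch of how a protected resource might encode these parameters using only the standard library. build_introspection_body is a hypothetical helper, not an Authlib function; in practice an HTTP client such as requests performs this encoding when given a data dict.

```python
from urllib.parse import urlencode


def build_introspection_body(token_string, token_type_hint=None):
    """Encode introspection parameters as application/x-www-form-urlencoded."""
    params = {"token": token_string}
    if token_type_hint is not None:
        # The hint is optional, so it is only sent when the caller knows it.
        params["token_type_hint"] = token_type_hint
    return urlencode(params)
```

The resulting string is what goes in the POST body, with the Content-Type header set to application/x-www-form-urlencoded.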

create_endpoint_response(request)

Validate introspection request and create the response.

Returns

(status_code, body, headers)

check_permission(token, client, request)

Check if the request has permission to introspect the token. Developers MUST implement this method:

def check_permission(self, token, client, request):
    # only allow a special client to introspect the token
    return client.client_id == 'introspection_client'

Returns

bool

query_token(token_string, token_type_hint)

Get the token from database/storage by the given token string. Developers should implement this method:

def query_token(self, token_string, token_type_hint):
    if token_type_hint == 'access_token':
        tok = Token.query_by_access_token(token_string)
    elif token_type_hint == 'refresh_token':
        tok = Token.query_by_refresh_token(token_string)
    else:
        tok = Token.query_by_access_token(token_string)
        if not tok:
            tok = Token.query_by_refresh_token(token_string)
    return tok

introspect_token(token)

Read the given token and return its introspection metadata as a dictionary, following Section 2.2 of RFC7662:

def introspect_token(self, token):
    return {
        'active': True,
        'client_id': token.client_id,
        'token_type': token.token_type,
        'username': get_token_username(token),
        'scope': token.get_scope(),
        'sub': get_token_user_sub(token),
        'aud': token.client_id,
        'iss': 'https://server.example.com/',
        'exp': token.expires_at,
        'iat': token.issued_at,
    }

authenticate_endpoint_client(request)

Authenticate the client for this endpoint using CLIENT_AUTH_METHODS.

class authlib.oauth2.rfc7662.IntrospectTokenValidator(realm=None, **extra_attributes)

authenticate_token(token_string)

Query the token from the database with the given token string. Developers MUST re-implement this method. For instance:

def authenticate_token(self, token_string):
    return get_token_from_database(token_string)

Parameters

token_string – A string to represent the access_token.

Returns

token

introspect_token(token_string)

Send a request to the token introspection endpoint with the given token string; the authorization server returns the token information in JSON format. Developers MUST implement this method before using it:

def introspect_token(self, token_string):
    # for example, introspection token endpoint has limited
    # internal IPs to access, so there is no need to add
    # authentication.
    url = 'https://example.com/oauth/introspect'
    resp = requests.post(url, data={'token': token_string})
    resp.raise_for_status()
    return resp.json()

validate_token(token, scopes, request)

Validate that the authorized token is valid and that it has permission for the given scopes. Developers MUST re-implement this method, for example to check whether the token is expired or revoked:

def validate_token(self, token, scopes, request):
    if not token:
        raise InvalidTokenError()
    if token.is_expired() or token.is_revoked():
        raise InvalidTokenError()
    if not match_token_scopes(token, scopes):
        raise InsufficientScopeError()
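
The example above assumes a token object with methods. With IntrospectTokenValidator the token is a dict, so the checks would look at its keys instead. The following is a sketch of that logic only: the two exception classes are local stand-ins for the library's errors, check_introspected_token is a hypothetical function, and requiring every listed scope is a simplifying assumption.

```python
class InvalidTokenError(Exception):
    """Local stand-in for the library's invalid-token error."""


class InsufficientScopeError(Exception):
    """Local stand-in for the library's insufficient-scope error."""


def check_introspected_token(token, required_scopes):
    """Raise if the introspection response is inactive or lacks a required scope."""
    if not token or not token.get("active"):
        raise InvalidTokenError()
    granted = set(token.get("scope", "").split())
    # Simplifying assumption: every required scope must be present.
    if not set(required_scopes) <= granted:
        raise InsufficientScopeError()
```

A call such as check_introspected_token({"active": True, "scope": "profile"}, ["profile"]) returns without raising, while an inactive or empty response raises the invalid-token stand-in.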