This section covers the generic implementation of RFC 7009 (OAuth 2.0 Token Revocation).
The revocation endpoint can be registered with the Flask OAuth 2.0 server, but you must first implement its two missing methods, query_token and revoke_token:

    from authlib.oauth2.rfc7009 import RevocationEndpoint

    class MyRevocationEndpoint(RevocationEndpoint):
        def query_token(self, token, token_type_hint, client):
            q = Token.query.filter_by(client_id=client.client_id)
            if token_type_hint == 'access_token':
                return q.filter_by(access_token=token).first()
            elif token_type_hint == 'refresh_token':
                return q.filter_by(refresh_token=token).first()
            # without token_type_hint
            item = q.filter_by(access_token=token).first()
            if item:
                return item
            return q.filter_by(refresh_token=token).first()

        def revoke_token(self, token):
            token.revoked = True
            db.session.add(token)
            db.session.commit()

    # register it to authorization server
    server.register_endpoint(MyRevocationEndpoint)

There is also a shortcut function for creating a revocation endpoint:

    from authlib.flask.oauth2.sqla import create_revocation_endpoint

    RevocationEndpoint = create_revocation_endpoint(db.session, Token)

    # register it to authorization server
    server.register_endpoint(RevocationEndpoint)

After registration, you can create the endpoint response in a view function:

    @app.route('/oauth/revoke', methods=['POST'])
    def revoke_token():
        return server.create_endpoint_response(MyRevocationEndpoint.ENDPOINT_NAME)

class authlib.oauth2.rfc7009.RevocationEndpoint(request, server)
Implementation of the revocation endpoint described in RFC 7009.

ENDPOINT_NAME = 'revocation'
Endpoint name to be registered.

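validate_endpoint_request()
The client constructs the request by including the following parameters, using the "application/x-www-form-urlencoded" format, in the HTTP request entity-body:
token: REQUIRED. The token that the client wants to get revoked.
token_type_hint: OPTIONAL. A hint about the type of the token submitted for revocation.
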
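The parameter check can be illustrated with a minimal sketch. RFC 7009 defines `token` as required and `token_type_hint` as optional. The function name, the two exception classes, and the `SUPPORTED_TOKEN_TYPES` tuple below are hypothetical names chosen for illustration, not Authlib's own API. Rejecting an unrecognized hint is only one possible policy; a server may also choose to ignore the hint.

```python
# Assumed set of token types for this sketch.
SUPPORTED_TOKEN_TYPES = ("access_token", "refresh_token")


class InvalidRequest(ValueError):
    """Hypothetical error: a required parameter is missing."""


class UnsupportedTokenType(ValueError):
    """Hypothetical error: the hint names an unknown token type."""


def validate_revocation_params(form):
    """Return (token, token_type_hint) from a parsed form body (a dict)."""
    token = form.get("token")
    if not token:
        raise InvalidRequest("missing 'token' parameter")
    hint = form.get("token_type_hint")
    if hint is not None and hint not in SUPPORTED_TOKEN_TYPES:
        raise UnsupportedTokenType(hint)
    return token, hint
```

Calling `validate_revocation_params({"token": "abc"})` returns the token with a hint of `None`, while an empty form raises `InvalidRequest`.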
create_endpoint_response()
Validate the revocation request and create the response for revocation. For example, a client may request the revocation of a refresh token with the following request:

    POST /revoke HTTP/1.1
    Host: server.example.com
    Content-Type: application/x-www-form-urlencoded
    Authorization: Basic czZCaGRSa3F0MzpnWDFmQmF0M2JW

    token=45ghiukldjahdnhzdauz&token_type_hint=refresh_token

Returns: (status_code, body, headers)

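To show how a client might assemble the example request, here is a minimal sketch using only the standard library. The helper name `build_revocation_request` is hypothetical. The client ID and secret are the values obtained by Base64-decoding the `Authorization` header in the example request; they are sample credentials, not real ones.

```python
import base64
from urllib.parse import urlencode


def build_revocation_request(client_id, client_secret, token, token_type_hint=None):
    """Return (headers, body) for a revocation request using HTTP Basic auth."""
    credentials = f"{client_id}:{client_secret}".encode("ascii")
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
    }
    params = {"token": token}
    if token_type_hint is not None:
        # The hint is optional, so it is only sent when provided.
        params["token_type_hint"] = token_type_hint
    return headers, urlencode(params)


headers, body = build_revocation_request(
    "s6BhdRkqt3", "gX1fBat3bV", "45ghiukldjahdnhzdauz", "refresh_token"
)
```

The resulting `headers` and `body` can be sent with any HTTP client as a POST to the revocation URL.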
query_token(token, token_type_hint, client)
Get the token from database/storage by the given token string. Developers should implement this method:

    def query_token(self, token, token_type_hint, client):
        if token_type_hint == 'access_token':
            return Token.query_by_access_token(token, client.client_id)
        if token_type_hint == 'refresh_token':
            return Token.query_by_refresh_token(token, client.client_id)
        return (
            Token.query_by_access_token(token, client.client_id)
            or Token.query_by_refresh_token(token, client.client_id)
        )

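The lookup logic can also be tried without a database. The sketch below uses a hypothetical `StoredToken` dataclass and a plain list as stand-ins for the `Token` model. Unlike the example above, it falls back to the other token type when the hint does not match, which is the behavior RFC 7009 (section 2.1) asks of servers that cannot locate the token using the given hint.

```python
from dataclasses import dataclass


@dataclass
class StoredToken:  # hypothetical stand-in for the Token model
    client_id: str
    access_token: str
    refresh_token: str
    revoked: bool = False


def find_token(tokens, token, token_type_hint, client_id):
    """Look up a token string for one client, trying the hinted type first."""
    fields = ["access_token", "refresh_token"]
    if token_type_hint in fields:
        # Stable sort: the hinted field moves to the front, the other follows.
        fields.sort(key=lambda name: name != token_type_hint)
    for field in fields:
        for item in tokens:
            if item.client_id == client_id and getattr(item, field) == token:
                return item
    return None
```

Restricting every comparison to the requesting client's ID keeps one client from revoking tokens issued to another.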
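revoke_token(token)
Mark the token as revoked. Since a token MUST be unique, it would be dangerous to delete it: once the record is gone, the same token string could be issued again to a different user, and the previous holder could then use it to access that user's resources.
It is safer to mark the token as revoked:

    def revoke_token(self, token):
        token.revoked = True
        session.add(token)
        session.commit()
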
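The reason for keeping revoked tokens on record can be shown with a small in-memory sketch. The `TokenStore` class and its method names are hypothetical and exist only for illustration. Because the revoked entry stays in the store, a later attempt to issue the same token string is rejected instead of silently handing a known value to someone else.

```python
class TokenStore:
    """Hypothetical in-memory store that keeps revoked tokens on record."""

    def __init__(self):
        self._revoked = {}  # token string -> revoked flag

    def issue(self, token_string):
        # Revoked tokens are still present, so reuse is detected here.
        if token_string in self._revoked:
            raise ValueError("token string already used")
        self._revoked[token_string] = False

    def revoke(self, token_string):
        # Mark as revoked rather than deleting the entry.
        self._revoked[token_string] = True

    def is_active(self, token_string):
        # Unknown tokens return None from .get(), so they are not active.
        return self._revoked.get(token_string) is False
```

Had `revoke` deleted the entry, the uniqueness check in `issue` would have had nothing to compare against.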
authenticate_endpoint_client()
Authenticate the client for this endpoint using the methods listed in CLIENT_AUTH_METHODS.