I'm trying to add a feature that lets users create permanent API tokens for my application, alongside the normal access/refresh tokens they get via a browser or the CLI. The use case is an API token that makes it easy to script access to the application. When a user creates one of these API tokens, I save the token and all of the user's information in a database. That should let me look up the user from the token later, and revoke the token by simply removing it from the database. However, I'm having trouble getting this to work.
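For reference, the create/look up/revoke idea described above can be sketched as follows. This is a minimal, self-contained illustration using an in-memory SQLite table, not my actual schema: the table name, column names, and helper functions are all hypothetical.

```python
import secrets
import sqlite3


def make_store():
    """Create an in-memory table mapping API tokens to user ids (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE apitokens (token TEXT PRIMARY KEY, user_id INTEGER NOT NULL)"
    )
    return conn


def create_token(conn, user_id):
    """Generate a random opaque token and store it for the given user."""
    token = secrets.token_urlsafe(32)
    conn.execute(
        "INSERT INTO apitokens (token, user_id) VALUES (?, ?)", (token, user_id)
    )
    return token


def lookup_user_id(conn, token):
    """Return the user id that owns the token, or None if it is unknown or revoked."""
    row = conn.execute(
        "SELECT user_id FROM apitokens WHERE token = ?", (token,)
    ).fetchone()
    return row[0] if row else None


def revoke_token(conn, token):
    """Delete the token; return True if a row was actually removed."""
    cur = conn.execute("DELETE FROM apitokens WHERE token = ?", (token,))
    return cur.rowcount > 0
```

Because the token is an opaque random string rather than a signed JWT, revocation is just a row delete, but every request needs a database lookup.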
I added the following, which successfully returns either the normal header token or my new API token:
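    from sanic_jwt import Configuration

    class MyConfig(Configuration):
        # Read the token from the "apitoken" header when it is present,
        # otherwise fall back to the usual "authorization" header.
        def get_authorization_header(self, request):
            if "apitoken" in request.headers:
                return "apitoken"
            return "authorization"

        # API tokens are sent bare; normal tokens use the "Bearer" prefix.
        def get_authorization_header_prefix(self, request):
            if "apitoken" in request.headers:
                return ""
            return "Bearer"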
The above seems to be working, but I can't seem to add my own custom verification/authentication check anywhere. For an individual endpoint that I create in the my_views section, I can successfully do the following to get all of the user's API tokens (given either a normal access token or a new API token):
    async def get(self, request, *args, **kwargs):
        try:
            # First try to treat the presented token as a normal JWT.
            payload = self.instance.auth.extract_payload(request, verify=True)
            try:
                user = await utils.call(
                    self.instance.auth.retrieve_user, request, payload=payload
                )
                user_id = await self.instance.auth._get_user_id(user)
            except Exception:
                return json({"status": "error", "error": "failed to get user from token"})
        except Exception:
            # Not a valid JWT, so fall back to looking the token up in the database.
            print("can't parse token as JWT, gonna try database lookup")
            if 'apitoken' in request.headers:
                try:
                    query = await db_model.apitokens_query()
                    db_apitoken = await db_objects.get(query, token=request.headers.get('apitoken'))
                    user_id = db_apitoken.operator.id
                except Exception:
                    print("Failed to find api token in db")
                    return json({"status": "error", "error": "Failed to find api token in db"})
            else:
                return json({"status": "error", "error": "failed to find token to authenticate with"})
        query = await db_model.operator_query()
        operator = await db_objects.get(query, id=user_id)
        query = await db_model.apitokens_query()
        tokens = await db_objects.execute(query.where(db_model.APITokens.operator == operator))
        return json({"status": "success", "tokens": [t.to_json() for t in tokens]})
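The fallback order in that handler (JWT first, then database lookup) is what I would like to apply to every endpoint. As a framework-independent sketch of that logic only: `decode_jwt` and `lookup_api_token` are hypothetical async callables standing in for the JWT verification and the database query, `AuthError` is a made-up exception, and `headers` is a plain dict with lowercase keys rather than a real request object.

```python
import asyncio


class AuthError(Exception):
    """Raised when neither a JWT nor an API token identifies a user (hypothetical)."""


async def resolve_user_id(headers, decode_jwt, lookup_api_token):
    """Return a user id from a Bearer JWT or, failing that, from an API token.

    decode_jwt(token) is assumed to return a user id or raise ValueError.
    lookup_api_token(token) is assumed to return a user id or None.
    """
    auth = headers.get("authorization", "")
    if auth.startswith("Bearer "):
        try:
            return await decode_jwt(auth[len("Bearer "):])
        except ValueError:
            pass  # not a usable JWT; fall through to the API token check

    api_token = headers.get("apitoken")
    if api_token is not None:
        user_id = await lookup_api_token(api_token)
        if user_id is not None:
            return user_id
        raise AuthError("Failed to find api token in db")

    raise AuthError("failed to find token to authenticate with")
```

What I haven't found is where to hook a check like this in so that it runs for protected endpoints in general rather than inside one view.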
I can't get this to work for arbitrary endpoints, though. With debug=True I get:
    {"reasons":["Not enough segments."],"exception":"Unauthorized"}
and with debug=False I get:
    {"reasons":["Auth required."],"exception":"Unauthorized"}
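The "Not enough segments." text matches the error PyJWT raises when the string it is asked to decode is not in compact JWT form, which suggests my opaque API token is being handed straight to the JWT decoder. A signed compact JWT is three dot-separated segments (header, payload, signature), and my API tokens have no dots at all. A minimal shape check, with a hypothetical helper name, that only looks at the structure and does not verify anything:

```python
def looks_like_jwt(token):
    """Return True if token has three non-empty dot-separated segments.

    This checks only the shape of a signed compact JWT; it does not
    decode the segments or verify the signature.
    """
    parts = token.split(".")
    return len(parts) == 3 and all(parts)
```

A check like this could decide whether to attempt JWT verification or go directly to the database lookup, but it still has to run somewhere before the library rejects the request.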
I've tried extending the responses class, but it never seems to be called:
    from sanic_jwt import Responses

    class MyResponses(Responses):
        @staticmethod
        async def extend_retrieve_user(request, user=None, payload=None):
            print("called extend retrieve user")
            return await retrieve_user(request, payload)

        @staticmethod
        async def extend_verify(request, user=None, payload=None):
            print("called extend verify")
            return {"Verify": True}
I've even tried overriding authenticate:
    from sanic_jwt import Authentication

    class MyAuthentication(Authentication):
        async def authenticate(self, request, *args, **kwargs):
            print("called authenticate")
            return {"user_id": 1}
But I still don't see my print statements showing up, so these aren't getting called. I pass all of the following to my Initialize call as well:
        authenticate=app.routes.authentication.authenticate,
        retrieve_user=app.routes.authentication.retrieve_user,
        cookie_set=True,
        cookie_strict=False,
        cookie_access_token_name='access_token',
        cookie_refresh_token_name='refresh_token',
        cookie_httponly=True,
        scopes_enabled=True,
        add_scopes_to_payload=app.routes.authentication.add_scopes_to_payload,
        scopes_name='scope',
        secret='apfell_secret jwt for signing here',
        url_prefix='/',
        class_views=my_views,
        path_to_authenticate='/auth',
        path_to_retrieve_user='/me',
        path_to_verify='/verify',
        path_to_refresh='/refresh',
        refresh_token_enabled=True,
        expiration_delta=14400,  # initial token expiration time
        store_refresh_token=app.routes.authentication.store_refresh_token,
        retrieve_refresh_token=app.routes.authentication.retrieve_refresh_token,
        configuration_class=MyConfig,
        responses_class=MyResponses,
        authentication_class=MyAuthentication,
        debug=True)
What am I missing to enable this kind of override check?