add get_response_type(). add keys and fix bug in is_wp_oauth_response(). fix bug in user_data() logging

This commit is contained in:
lpm0073 2022-10-06 11:19:05 -05:00
parent 50d81932d2
commit ef804ecbf4

View File

@ -10,6 +10,7 @@ usage: subclass of BaseOAuth2 Third Party Authtencation client to
actually needs.
"""
import json
import re
from urllib.parse import urlencode
from urllib.request import urlopen
from logging import getLogger
@ -136,6 +137,25 @@ class StepwiseMathWPOAuth2(BaseOAuth2):
return True
return False
def is_wp_oauth_error(self, response) -> bool:
"""
validate the structure of the response object conforms to a
wp-oauth error json dict.
"""
if not type(response) == dict:
logger.warning(
"is_wp_oauth_error() was expecting a dict but received an object of type: {type}".format(
type=type(response)
)
)
return False
if len(response.keys()) != 2:
return False
qc_keys = ["error" "error_description"]
if all(key in response for key in qc_keys):
return True
return False
def is_wp_oauth_response(self, response) -> bool:
"""
validate the structure of the response object from wp-oauth. it's
@ -143,12 +163,21 @@ class StepwiseMathWPOAuth2(BaseOAuth2):
"""
if not type(response) == dict:
logger.warning(
"is_valid_user_details() was expecting a dict but received an object of type: {type}".format(
"is_wp_oauth_response() was expecting a dict but received an object of type: {type}".format(
type=type(response)
)
)
return False
qc_keys = ["ID" "display_name", "user_email", "user_login", "user_roles"]
qc_keys = [
"ID",
"capabilities",
"display_name",
"user_email",
"user_login",
"user_roles",
"user_registered",
"user_status",
]
if all(key in response for key in qc_keys):
return True
return False
@ -177,6 +206,21 @@ class StepwiseMathWPOAuth2(BaseOAuth2):
return True
return False
def get_response_type(self, response):
if type(response) != dict:
return "unknown response of type {t}".format(t=type(response))
if self.is_wp_oauth_error(response):
return "error response json dict"
if self.is_get_user_details_extended_dict(response):
return "extended get_user_details() return dict"
if self.is_wp_oauth_refresh_token_response(response):
return "wp-oauth refresh token json dict"
if self.is_wp_oauth_response(response):
return "wp-oauth user data response json dict"
if self.is_valid_user_details(response):
return "get_user_details() return dict"
return "unrecognized response dict"
# override Python Social Auth default end points.
# see https://wp-oauth.com/docs/general/endpoints/
#
@ -231,8 +275,9 @@ class StepwiseMathWPOAuth2(BaseOAuth2):
self.is_valid_user_details(response) or self.is_wp_oauth_response(response)
):
logger.error(
"get_user_details() - received an unrecognized response object. Cannot continue: {response}".format(
response=json.dumps(response, sort_keys=True, indent=4)
"get_user_details() - received an unrecognized response of {t}. Cannot continue: {response}".format(
t=self.get_response_type(response),
response=json.dumps(response, sort_keys=True, indent=4),
)
)
# if we have cached results then we might be able to recover.
@ -270,8 +315,9 @@ class StepwiseMathWPOAuth2(BaseOAuth2):
# conform to the structure of a wp-oauth dict.
if not self.is_wp_oauth_response(response):
logger.warning(
"get_user_details() - response object is not a valid wp-oauth object. Cannot continue. {response}".format(
response=json.dumps(response, sort_keys=True, indent=4)
"get_user_details() - response object of {t} is not a valid wp-oauth object. Cannot continue. {response}".format(
t=self.get_response_type(response),
response=json.dumps(response, sort_keys=True, indent=4),
)
)
return self.user_details
@ -337,7 +383,7 @@ class StepwiseMathWPOAuth2(BaseOAuth2):
if VERBOSE_LOGGING:
logger.info(
"user_data() response: {response}".format(
response=json.dumps(self.user_details, sort_keys=True, indent=4)
response=json.dumps(response, sort_keys=True, indent=4)
)
)
@ -363,8 +409,9 @@ class StepwiseMathWPOAuth2(BaseOAuth2):
if not self.is_valid_user_details(user_details):
logger.error(
"user_data() user_details object is invalid: {user_details}".format(
user_details=json.dumps(user_details, sort_keys=True, indent=4)
"user_data() user_details return object of {t} is invalid: {user_details}".format(
t=self.get_response_type(response),
user_details=json.dumps(user_details, sort_keys=True, indent=4),
)
)
return self.user_details