refactor all dict methods

This commit is contained in:
lpm0073 2022-10-06 13:05:02 -05:00
parent 9ef6849a1e
commit 01c697717b
2 changed files with 30 additions and 56 deletions

View File

@ -1,12 +1,13 @@
import json import json
# Opening JSON file # Opening JSON file
f = open('get_user_details_extended.json') f = open("get_user_details_extended.json")
# returns JSON object as # returns JSON object as
# a dictionary # a dictionary
response = json.load(f) response = json.load(f)
def is_valid_user_details(response) -> bool: def is_valid_user_details(response) -> bool:
""" """
validate that the object passed is a dict containing at least the keys validate that the object passed is a dict containing at least the keys
@ -27,6 +28,7 @@ def is_valid_user_details(response) -> bool:
return True return True
return False return False
def is_wp_oauth_refresh_token_response(response) -> bool: def is_wp_oauth_refresh_token_response(response) -> bool:
""" """
validate that the structure of the response contains the keys of validate that the structure of the response contains the keys of
@ -39,10 +41,9 @@ def is_wp_oauth_refresh_token_response(response) -> bool:
return True return True
return False return False
print('is_valid_user_details')
print("is_valid_user_details")
print(is_valid_user_details(response)) print(is_valid_user_details(response))
print('is_wp_oauth_refresh_token_response') print("is_wp_oauth_refresh_token_response")
print(is_wp_oauth_refresh_token_response(response)) print(is_wp_oauth_refresh_token_response(response))

View File

@ -109,19 +109,22 @@ class StepwiseMathWPOAuth2(BaseOAuth2):
""" """
return urlopen(url).read().decode("utf-8") return urlopen(url).read().decode("utf-8")
def is_valid_dict(self, response, qc_keys) -> bool:
if not type(response) == dict:
logger.warning(
"is_valid_dict() was expecting a dict but received an object of type: {type}".format(
type=type(response)
)
)
return False
return all(key in response for key in qc_keys)
def is_valid_user_details(self, response) -> bool: def is_valid_user_details(self, response) -> bool:
""" """
validate that the object passed is a json dict containing at least validate that the object passed is a json dict containing at least
the keys in qc_keys. These are the dict keys created in get_user_details() the keys in qc_keys. These are the dict keys created in get_user_details()
default return object. default return object.
""" """
if not type(response) == dict:
logger.warning(
"is_valid_user_details() was expecting a dict but received an object of type: {type}".format(
type=type(response)
)
)
return False
qc_keys = [ qc_keys = [
"id", "id",
"date_joined", "date_joined",
@ -133,41 +136,21 @@ class StepwiseMathWPOAuth2(BaseOAuth2):
"last_name", "last_name",
"username", "username",
] ]
if all(key in response for key in qc_keys): return self.is_valid_dict(response, qc_keys)
return True
return False
def is_wp_oauth_error(self, response) -> bool: def is_wp_oauth_error(self, response) -> bool:
""" """
validate the structure of the response object conforms to a validate the structure of the response object conforms to a
wp-oauth error json dict. wp-oauth error json dict.
""" """
if not type(response) == dict:
logger.warning(
"is_wp_oauth_error() was expecting a dict but received an object of type: {type}".format(
type=type(response)
)
)
return False
if len(response.keys()) != 2:
return False
qc_keys = ["error" "error_description"] qc_keys = ["error" "error_description"]
if all(key in response for key in qc_keys): return self.is_valid_dict(response, qc_keys) and len(response.keys()) == 2
return True
return False
def is_wp_oauth_response(self, response) -> bool: def is_wp_oauth_response(self, response) -> bool:
""" """
validate the structure of the response object from wp-oauth. it's validate the structure of the response object from wp-oauth. it's
supposed to be a dict with at least the keys included in qc_keys. supposed to be a dict with at least the keys included in qc_keys.
""" """
if not type(response) == dict:
logger.warning(
"is_wp_oauth_response() was expecting a dict but received an object of type: {type}".format(
type=type(response)
)
)
return False
qc_keys = [ qc_keys = [
"ID", "ID",
"capabilities", "capabilities",
@ -178,21 +161,15 @@ class StepwiseMathWPOAuth2(BaseOAuth2):
"user_registered", "user_registered",
"user_status", "user_status",
] ]
if all(key in response for key in qc_keys): return self.is_valid_dict(response, qc_keys)
return True
return False
def is_wp_oauth_refresh_token_response(self, response) -> bool: def is_wp_oauth_refresh_token_response(self, response) -> bool:
""" """
validate that the structure of the response contains the keys of validate that the structure of the response contains the keys of
a refresh token dict. a refresh token dict.
""" """
if not self.is_valid_user_details(response):
return False
qc_keys = ["access_token", "expires_in", "refresh_token", "scope", "token_type"] qc_keys = ["access_token", "expires_in", "refresh_token", "scope", "token_type"]
if all(key in response for key in qc_keys): return self.is_valid_dict(response, qc_keys)
return True
return False
def is_get_user_details_extended_dict(self, response) -> bool: def is_get_user_details_extended_dict(self, response) -> bool:
""" """
@ -200,21 +177,17 @@ class StepwiseMathWPOAuth2(BaseOAuth2):
contains a.) all keys of a get_user_details() return, plus, contains a.) all keys of a get_user_details() return, plus,
b.) all keys of a wp-oauth refresh token response. b.) all keys of a wp-oauth refresh token response.
""" """
if not self.is_valid_user_details(response): return self.is_valid_user_details(
return False response
if self.is_wp_oauth_refresh_token_response(response): ) and self.is_wp_oauth_refresh_token_response(response)
return True
return False
def is_valid_get_user_details_response(self, response) -> bool: def is_valid_get_user_details_response(self, response) -> bool:
""" """
True if the response object can be processed by get_user_details() True if the response object can be processed by get_user_details()
""" """
if self.is_valid_user_details(response): return self.is_valid_user_details(response) or self.is_wp_oauth_response(
return True response
if self.is_wp_oauth_response(response): )
return True
return False
def get_response_type(self, response) -> str: def get_response_type(self, response) -> str:
if type(response) != dict: if type(response) != dict:
@ -295,7 +268,7 @@ class StepwiseMathWPOAuth2(BaseOAuth2):
logger.info( logger.info(
"get_user_details() received {t}: {response}".format( "get_user_details() received {t}: {response}".format(
t=self.get_response_type(response), t=self.get_response_type(response),
response=json.dumps(response, sort_keys=True, indent=4) response=json.dumps(response, sort_keys=True, indent=4),
) )
) )
# a def in the third_party_auth pipeline list calls get_user_details() after its already # a def in the third_party_auth pipeline list calls get_user_details() after its already