diff --git a/data/wp-oauth-me.json b/data/wp-oauth-me.json new file mode 100644 index 0000000..4831262 --- /dev/null +++ b/data/wp-oauth-me.json @@ -0,0 +1,7 @@ +{ + "access_token": "x39tb1rws4gcpsyvahkofrj7xwnrefxuk3jonrjn", + "expires_in": 3600, + "refresh_token": "bo85pscgut9nqy56snnzgw6ixljzspac3f74eemy", + "scope": "basic profile email", + "token_type": "Bearer" +} \ No newline at end of file diff --git a/setup.py b/setup.py index 5be3919..ba48d8b 100644 --- a/setup.py +++ b/setup.py @@ -11,10 +11,12 @@ os.chdir(os.path.normpath(os.path.join(os.path.abspath(__file__), os.pardir))) HERE = os.path.abspath(os.path.dirname(__file__)) + def load_readme(): with io.open(os.path.join(HERE, "README.rst"), "rt", encoding="utf8") as f: return f.read() + def load_about(): about = {} with io.open( @@ -25,6 +27,7 @@ def load_about(): exec(f.read(), about) # pylint: disable=exec-used return about + def load_requirements(*requirements_paths): """ Load all requirements from the specified requirements files. @@ -34,7 +37,9 @@ def load_requirements(*requirements_paths): requirements = set() for path in requirements_paths: requirements.update( - line.split("#")[0].strip() for line in open(path).readlines() if is_requirement(line.strip()) + line.split("#")[0].strip() + for line in open(path).readlines() + if is_requirement(line.strip()) ) return list(requirements) @@ -54,19 +59,22 @@ def is_requirement(line): or line.startswith("git+") ) + README = load_readme() ABOUT = load_about() VERSION = ABOUT["__version__"] setup( - name='wp-oauth-backend', + name="wp-oauth-backend", version=VERSION, - description=('An OAuth backend for the WP OAuth Wordpress Plugin, ' - 'that is customized for use in Open edX installations.'), + description=( + "An OAuth backend for the WP OAuth Wordpress Plugin, " + "that is customized for use in Open edX installations." + ), long_description=README, - author='Lawrence McDaniel, lpm0073@gmail.com', - author_email='lpm0073@gmail.com', - url='https://github.com/StepwiseMath/wp-oauth-backend', + author="Lawrence McDaniel, lpm0073@gmail.com", + author_email="lpm0073@gmail.com", + url="https://github.com/StepwiseMath/wp-oauth-backend", project_urls={ "Code": "https://github.com/StepwiseMath/wp-oauth-backend", "Issue tracker": "https://github.com/StepwiseMath/wp-oauth-backend/issues", @@ -76,11 +84,11 @@ setup( include_package_data=True, package_data={"": ["*.html"]}, # include any templates found in this repo. zip_safe=False, - keywords='WP OAuth', + keywords="WP OAuth", python_requires=">=3.7", install_requires=load_requirements("requirements/stable-psa.txt"), classifiers=[ - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", ], ) diff --git a/wp_oauth_backend/__about__.py b/wp_oauth_backend/__about__.py index b794fd4..3dc1f76 100644 --- a/wp_oauth_backend/__about__.py +++ b/wp_oauth_backend/__about__.py @@ -1 +1 @@ -__version__ = '0.1.0' +__version__ = "0.1.0" diff --git a/wp_oauth_backend/wp_oauth.py b/wp_oauth_backend/wp_oauth.py index 78ebf6a..83a7372 100644 --- a/wp_oauth_backend/wp_oauth.py +++ b/wp_oauth_backend/wp_oauth.py @@ -21,6 +21,8 @@ User = get_user_model() logger = getLogger(__name__) VERBOSE_LOGGING = True + + class StepwiseMathWPOAuth2(BaseOAuth2): """ WP OAuth authentication backend customized for Open edX. @@ -29,73 +31,74 @@ class StepwiseMathWPOAuth2(BaseOAuth2): Notes: - Python Social Auth social_core and/or Open edX's third party authentication core are finicky about how the "properties" are implemented. Anything that actually - declared as a Python class variable needs to remain a Python class variable. + declared as a Python class variable needs to remain a Python class variable. DO NOT refactor these into formal Python properties as something upstream will break your code. - for some reason adding an __init__() def to this class also causes something - upstream to break. If you try this then you'll get an error about a missing + upstream to break. If you try this then you'll get an error about a missing positional argument, 'strategy'. """ + _user_details = None - # This defines the backend name and identifies it during the auth process. + # This defines the backend name and identifies it during the auth process. # The name is used in the URLs /login/ and /complete/. # # This is the string value that will appear in the LMS Django Admin # Third Party Authentication / Provider Configuration (OAuth) # setup page drop-down box titled, "Backend name:", just above # the "Client ID:" and "Client Secret:" fields. - name = 'stepwisemath-oauth' + name = "stepwisemath-oauth" # note: no slash at the end of the base url. Python Social Auth # might clean this up for you, but i'm not 100% certain of that. BASE_URL = "https://stepwisemath.ai" - # The default key name where the user identification field is defined, it’s - # used in the auth process when some basic user data is returned. This Id - # is stored in the UserSocialAuth.uid field and this, together with the + # The default key name where the user identification field is defined, it’s + # used in the auth process when some basic user data is returned. This Id + # is stored in the UserSocialAuth.uid field and this, together with the # UserSocialAuth.provider field, is used to uniquely identify a user association. - ID_KEY = 'id' + ID_KEY = "id" - # Flags the backend to enforce email validation during the pipeline + # Flags the backend to enforce email validation during the pipeline # (if the corresponding pipeline social_core.pipeline.mail.mail_validation was enabled). REQUIRES_EMAIL_VALIDATION = False - # Some providers give nothing about the user but some basic data like the - # user Id or an email address. The default scope attribute is used to + # Some providers give nothing about the user but some basic data like the + # user Id or an email address. The default scope attribute is used to # specify a default value for the scope argument to request those extra bits. # - # wp-oauth supports 4 scopes: basic, email, profile, openeid. + # wp-oauth supports 4 scopes: basic, email, profile, openeid. # we want the first three of these. # see https://wp-oauth.com/docs/how-to/adding-supported-scopes/ - DEFAULT_SCOPE = ['basic', 'profile', 'email'] + DEFAULT_SCOPE = ["basic", "profile", "email"] - # Specifying the method type required to retrieve your access token if it’s + # Specifying the method type required to retrieve your access token if it’s # not the default GET request. - ACCESS_TOKEN_METHOD = 'POST' + ACCESS_TOKEN_METHOD = "POST" # require redirect domain to match the original initiating domain. SOCIAL_AUTH_SANITIZE_REDIRECTS = True - # During the auth process some basic user data is returned by the provider - # or retrieved by the user_data() method which usually is used to call - # some API on the provider to retrieve it. This data will be stored in the - # UserSocialAuth.extra_data attribute, but to make it accessible under some - # common names on different providers, this attribute defines a list of - # tuples in the form (name, alias) where name is the key in the user data + # During the auth process some basic user data is returned by the provider + # or retrieved by the user_data() method which usually is used to call + # some API on the provider to retrieve it. This data will be stored in the + # UserSocialAuth.extra_data attribute, but to make it accessible under some + # common names on different providers, this attribute defines a list of + # tuples in the form (name, alias) where name is the key in the user data # (which should be a dict instance) and alias is the name to store it on extra_data. EXTRA_DATA = [ - ('id', 'id'), - ('is_superuser', 'is_superuser'), - ('is_staff', 'is_staff'), - ('date_joined', 'date_joined'), - ] + ("id", "id"), + ("is_superuser", "is_superuser"), + ("is_staff", "is_staff"), + ("date_joined", "date_joined"), + ] - # the value of the scope separator is user-defined. Check the + # the value of the scope separator is user-defined. Check the # scopes field value for your oauth client in your wordpress host. # the wp-oauth default value for scopes is 'basic' but can be - # changed to a list. example 'basic, email, profile'. This + # changed to a list. example 'basic, email, profile'. This # list can be delimited with commas, spaces, whatever. SCOPE_SEPARATOR = " " @@ -108,40 +111,70 @@ class StepwiseMathWPOAuth2(BaseOAuth2): def is_valid_user_details(self, response) -> bool: """ - validate that the object passed is a dict containing at least the keys + validate that the object passed is a dict containing at least the keys in qc_keys. """ - if not type(response) == dict: - logger.warning('is_valid_user_details() was expecting a dict but received an object of type: {type}'.format( - type=type(response) - )) + if not type(response) == dict: + logger.warning( + "is_valid_user_details() was expecting a dict but received an object of type: {type}".format( + type=type(response) + ) + ) return False - qc_keys = ['id', 'date_joined', 'email', 'first_name', 'fullname', 'is_staff', 'is_superuser', 'last_name', 'username'] - if all(key in response for key in qc_keys): return True + qc_keys = [ + "id", + "date_joined", + "email", + "first_name", + "fullname", + "is_staff", + "is_superuser", + "last_name", + "username", + ] + if all(key in response for key in qc_keys): + return True return False def is_wp_oauth_response(self, response) -> bool: """ - validate the structure of the response object from wp-oauth. it's + validate the structure of the response object from wp-oauth. it's supposed to be a dict with at least the keys included in qc_keys. """ - if not type(response) == dict: - logger.warning('is_valid_user_details() was expecting a dict but received an object of type: {type}'.format( - type=type(response) - )) + if not type(response) == dict: + logger.warning( + "is_valid_user_details() was expecting a dict but received an object of type: {type}".format( + type=type(response) + ) + ) return False - qc_keys = ['ID' 'display_name', 'user_email', 'user_login', 'user_roles'] - if all(key in response for key in qc_keys): return True + qc_keys = ["ID" "display_name", "user_email", "user_login", "user_roles"] + if all(key in response for key in qc_keys): + return True return False - def is_wp_oauth_extended_response(self, response) -> bool: + def is_wp_oauth_refresh_token_response(self, response) -> bool: """ - validate the structure of the extended response object from wp-oauth. it's - supposed to be a dict with at least the keys included in qc_keys. + validate that the structure of the response contains the keys of + a refresh token dict. """ - if not self.is_valid_user_details(response): return False - qc_keys = ['access_token' 'expires_in', 'refresh_token', 'scope', 'token_type'] - if all(key in response for key in qc_keys): return True + if not self.is_valid_user_details(response): + return False + qc_keys = ["access_token" "expires_in", "refresh_token", "scope", "token_type"] + if all(key in response for key in qc_keys): + return True + return False + + def is_get_user_details_extended_dict(self, response) -> bool: + """ + validate whether the structure the response is a dict that + contains a.) all keys of a get_user_details() return, plus, + b.) all keys of a wp-oauth refresh token response. + """ + if not self.is_valid_user_details(response): + return False + if self.is_wp_oauth_refresh_token_response(response): + return True return False # override Python Social Auth default end points. @@ -153,21 +186,21 @@ class StepwiseMathWPOAuth2(BaseOAuth2): def AUTHORIZATION_URL(self) -> str: retval = f"{self.BASE_URL}/oauth/authorize" if VERBOSE_LOGGING: - logger.info('AUTHORIZATION_URL: {url}'.format(url=retval)) + logger.info("AUTHORIZATION_URL: {url}".format(url=retval)) return retval @property def ACCESS_TOKEN_URL(self) -> str: retval = f"{self.BASE_URL}/oauth/token" if VERBOSE_LOGGING: - logger.info('ACCESS_TOKEN_URL: {url}'.format(url=retval)) + logger.info("ACCESS_TOKEN_URL: {url}".format(url=retval)) return retval @property def USER_QUERY(self) -> str: retval = f"{self.BASE_URL}/oauth/me" if VERBOSE_LOGGING: - logger.info('USER_QUERY: {url}'.format(url=retval)) + logger.info("USER_QUERY: {url}".format(url=retval)) return retval @property @@ -178,124 +211,163 @@ class StepwiseMathWPOAuth2(BaseOAuth2): def user_details(self, value: dict): if self.is_valid_user_details(value): if VERBOSE_LOGGING: - logger.info('user_details.setter: new value set {value}'.format( - value=json.dumps(value, sort_keys=True, indent=4) - )) + logger.info( + "user_details.setter: new value set {value}".format( + value=json.dumps(value, sort_keys=True, indent=4) + ) + ) self._user_details = value else: - logger.error('user_details.setter: tried to pass an invalid object {value}'.format( - value=json.dumps(value, sort_keys=True, indent=4) - )) + logger.error( + "user_details.setter: tried to pass an invalid object {value}".format( + value=json.dumps(value, sort_keys=True, indent=4) + ) + ) # see https://python-social-auth.readthedocs.io/en/latest/backends/implementation.html # Return user details from the Wordpress user account def get_user_details(self, response) -> dict: - if not (self.is_valid_user_details(response) or self.is_wp_oauth_response(response)): - logger.error('get_user_details() - received an unrecognized response object. Cannot continue: {response}'.format( - response=json.dumps(response, sort_keys=True, indent=4) - )) + if not ( + self.is_valid_user_details(response) or self.is_wp_oauth_response(response) + ): + logger.error( + "get_user_details() - received an unrecognized response object. Cannot continue: {response}".format( + response=json.dumps(response, sort_keys=True, indent=4) + ) + ) # if we have cached results then we might be able to recover. return self.user_details - if VERBOSE_LOGGING: logger.info('get_user_details() begin with response: {response}'.format( - response=json.dumps(response, sort_keys=True, indent=4) - )) + if VERBOSE_LOGGING: + logger.info( + "get_user_details() begin with response: {response}".format( + response=json.dumps(response, sort_keys=True, indent=4) + ) + ) # a def in the third_party_auth pipeline list calls get_user_details() after its already # been called once. i don't know why. but, it passes the original get_user_details() dict - # enhanced with additional token-related keys. if we receive this modified dict then we + # enhanced with additional token-related keys. if we receive this modified dict then we # should pass it along to the next defs in the pipeline. # # If most of the original keys (see dict definition below) exist in the response object # then we can assume that this is our case. - if self.is_wp_oauth_extended_response(response): + if self.is_get_user_details_extended_dict(response): # ------------------------------------------------------------- - # expected use case #2: an enhanced derivation of an original + # expected use case #2: an enhanced derivation of an original # user_details dict. This is created when get_user_details() # is called from user_data(). # ------------------------------------------------------------- if VERBOSE_LOGGING: - logger.info('get_user_details() - detected an enhanced get_user_details() dict in the response: {response}'.format( - response=json.dumps(response, sort_keys=True, indent=4) - )) + logger.info( + "get_user_details() - detected an enhanced get_user_details() dict in the response: {response}".format( + response=json.dumps(response, sort_keys=True, indent=4) + ) + ) return response - # at this point we've ruled out the possibility of the response object + # at this point we've ruled out the possibility of the response object # being a derivation of a user_details dict. So, it should therefore - # conform to the structure of a wp-oauth dict. + # conform to the structure of a wp-oauth dict. if not self.is_wp_oauth_response(response): - logger.warning('get_user_details() - response object is not a valid wp-oauth object. Cannot continue. {response}'.format( - response=json.dumps(response, sort_keys=True, indent=4) - )) + logger.warning( + "get_user_details() - response object is not a valid wp-oauth object. Cannot continue. {response}".format( + response=json.dumps(response, sort_keys=True, indent=4) + ) + ) return self.user_details # ------------------------------------------------------------- # expected use case #1: response object is a dict with all required keys. # ------------------------------------------------------------- if VERBOSE_LOGGING: - logger.info('get_user_details() - start. response: {response}'.format( - response=json.dumps(response, sort_keys=True, indent=4) - )) + logger.info( + "get_user_details() - start. response: {response}".format( + response=json.dumps(response, sort_keys=True, indent=4) + ) + ) # try to parse out the first and last names - split_name = response.get('display_name', '').split() - first_name = split_name[0] if len(split_name) > 0 else '' - last_name = split_name[-1] if len(split_name) == 2 else '' + split_name = response.get("display_name", "").split() + first_name = split_name[0] if len(split_name) > 0 else "" + last_name = split_name[-1] if len(split_name) == 2 else "" # check for superuser / staff status - user_roles = response.get('user_roles', []) - super_user = 'administrator' in user_roles - is_staff = 'administrator' in user_roles + user_roles = response.get("user_roles", []) + super_user = "administrator" in user_roles + is_staff = "administrator" in user_roles self.user_details = { - 'id': int(response.get('ID'), 0), - 'username': response.get('user_login', ''), - 'email': response.get('user_email', ''), - 'first_name': first_name, - 'last_name': last_name, - 'fullname': response.get('display_name', ''), - 'is_superuser': super_user, - 'is_staff': is_staff, - 'refresh_token': response.get('refresh_token', ''), - 'scope': response.get('scope', ''), - 'token_type': response.get('token_type', ''), - 'date_joined': response.get('user_registered', ''), - 'user_status': response.get('user_status', ''), + "id": int(response.get("ID"), 0), + "username": response.get("user_login", ""), + "email": response.get("user_email", ""), + "first_name": first_name, + "last_name": last_name, + "fullname": response.get("display_name", ""), + "is_superuser": super_user, + "is_staff": is_staff, + "refresh_token": response.get("refresh_token", ""), + "scope": response.get("scope", ""), + "token_type": response.get("token_type", ""), + "date_joined": response.get("user_registered", ""), + "user_status": response.get("user_status", ""), } if VERBOSE_LOGGING: - logger.info('get_user_details() - finish. user_details: {user_details}'.format( - user_details=json.dumps(self.user_details, sort_keys=True, indent=4) - )) + logger.info( + "get_user_details() - finish. user_details: {user_details}".format( + user_details=json.dumps(self.user_details, sort_keys=True, indent=4) + ) + ) return self.user_details - # Load user data from service url end point. Note that in the case of + # Load user data from service url end point. Note that in the case of # wp oauth, the response object returned by self.USER_QUERY # is the same as the response object passed to get_user_details(). # # see https://python-social-auth.readthedocs.io/en/latest/backends/implementation.html def user_data(self, access_token, *args, **kwargs) -> dict: - - url = f'{self.USER_QUERY}?' + urlencode({ - 'access_token': access_token - }) + response = None + user_details = None + url = f"{self.USER_QUERY}?" + urlencode({"access_token": access_token}) if VERBOSE_LOGGING: logger.info("user_data() url: {url}".format(url=url)) try: response = json.loads(self._urlopen(url)) + if VERBOSE_LOGGING: + logger.info( + "user_data() response: {response}".format( + response=json.dumps(self.user_details, sort_keys=True, indent=4) + ) + ) + + user_details = self.get_user_details(response) + + if VERBOSE_LOGGING: + logger.info( + "user_data() local variable user_details: {user_details}".format( + user_details=json.dumps(user_details, sort_keys=True, indent=4) + ) + ) + if VERBOSE_LOGGING: + logger.info( + "user_data() class property value of self.user_details: {user_details}".format( + user_details=json.dumps( + self.user_details, sort_keys=True, indent=4 + ) + ) + ) except ValueError as e: - logger.error('user_data() {err}'.format(err=e)) + logger.error("user_data() {err}".format(err=e)) return None - if not self.is_valid_user_details(response): - logger.error('user_data() response object is invalid: {response}'.format( - response=json.dumps(self.user_details, sort_keys=True, indent=4) - )) + if not self.is_valid_user_details(user_details): + logger.error( + "user_data() user_details object is invalid: {user_details}".format( + user_details=json.dumps(user_details, sort_keys=True, indent=4) + ) + ) return self.user_details - - # refresh our internal user_details property after having validated - # response from USER_QUERY - self.get_user_details(response) # add syncronization of any data fields that get missed by the built-in # open edx third party authentication sync functionality. @@ -303,20 +375,32 @@ class StepwiseMathWPOAuth2(BaseOAuth2): # this gets called just prior to account creation for # new users, hence, we need to catch DoesNotExist # exceptions. - user=User.objects.get(username=self.user_details['username']) + user = User.objects.get(username=self.user_details["username"]) except User.DoesNotExist: return self.user_details - if (user.is_superuser != self.user_details['is_superuser']) or (user.is_staff != self.user_details['is_staff']): - user.is_superuser = self.user_details['is_superuser'] - user.is_staff = self.user_details['is_staff'] + if (user.is_superuser != self.user_details["is_superuser"]) or ( + user.is_staff != self.user_details["is_staff"] + ): + user.is_superuser = self.user_details["is_superuser"] + user.is_staff = self.user_details["is_staff"] user.save() - logger.info('Updated the is_superuser/is_staff flags for user {username}'.format(username=user.username)) + logger.info( + "Updated the is_superuser/is_staff flags for user {username}".format( + username=user.username + ) + ) - if (user.first_name != self.user_details['first_name']) or (user.last_name != self.user_details['last_name']): - user.first_name = self.user_details['first_name'] - user.last_name = self.user_details['last_name'] + if (user.first_name != self.user_details["first_name"]) or ( + user.last_name != self.user_details["last_name"] + ): + user.first_name = self.user_details["first_name"] + user.last_name = self.user_details["last_name"] user.save() - logger.info('Updated first_name/last_name for user {username}'.format(username=user.username)) + logger.info( + "Updated first_name/last_name for user {username}".format( + username=user.username + ) + ) return self.user_details