edx-oauth2-wordpress-backend/wp_oauth_backend/wp_oauth.py

407 lines
16 KiB
Python
Raw Normal View History

"""
written by: Lawrence McDaniel
https://lawrencemcdaniel.com
date: oct-2022
2022-10-04 17:58:18 +03:00
usage: subclass of BaseOAuth2 Third Party Authentication client to
handle the field mapping and data conversions between
the dict that WP Oauth returns versus the dict that Open edX
actually needs.
"""
2022-10-03 19:46:35 +03:00
import json
from urllib.parse import urlencode
from urllib.request import urlopen
from logging import getLogger
2022-10-03 19:46:35 +03:00
from social_core.backends.oauth import BaseOAuth2
from django.contrib.auth import get_user_model
2022-10-03 19:46:35 +03:00
# Resolve the Django user model once at import time. user_data() uses it
# to look up an existing account by username.
User = get_user_model()
2022-10-03 19:46:35 +03:00
# Module-level logger, named after this module.
logger = getLogger(__name__)
# When True, the backend writes verbose INFO-level diagnostics (endpoint
# URLs, raw responses, parsed user details) from its properties and methods.
VERBOSE_LOGGING = True
2022-10-06 18:13:38 +03:00
2022-10-04 17:21:54 +03:00
class StepwiseMathWPOAuth2(BaseOAuth2):
    """
    WP OAuth authentication backend customized for Open edX.

    see https://python-social-auth.readthedocs.io/en/latest/backends/implementation.html

    Notes:
    - Python Social Auth social_core and/or Open edX's third party
      authentication core are finicky about how the "properties" are
      implemented. Anything that is actually declared as a Python class
      variable needs to remain a Python class variable. DO NOT refactor
      these into formal Python properties as something upstream will
      break your code.
    - for some reason adding an __init__() def to this class also causes
      something upstream to break. If you try this then you'll get an
      error about a missing positional argument, 'strategy'.
    """

    # Class-level default for the cached user details dict. The
    # user_details setter stores a validated dict under this name on the
    # instance; until then the getter returns None.
    _user_details = None

    # This defines the backend name and identifies it during the auth process.
    # The name is used in the URLs /login/<backend name> and /complete/<backend name>.
    #
    # This is the string value that will appear in the LMS Django Admin
    # Third Party Authentication / Provider Configuration (OAuth)
    # setup page drop-down box titled, "Backend name:", just above
    # the "Client ID:" and "Client Secret:" fields.
    name = "stepwisemath-oauth"

    # note: no slash at the end of the base url. Python Social Auth
    # might clean this up for you, but i'm not 100% certain of that.
    # The endpoint properties of this class build their URLs as
    # f"{BASE_URL}/oauth/...".
    BASE_URL = "https://stepwisemath.ai"

    # The default key name where the user identification field is defined. It's
    # used in the auth process when some basic user data is returned. This id
    # is stored in the UserSocialAuth.uid field and this, together with the
    # UserSocialAuth.provider field, is used to uniquely identify a user association.
    ID_KEY = "id"

    # Flags the backend to enforce email validation during the pipeline
    # (if the corresponding pipeline social_core.pipeline.mail.mail_validation was enabled).
    REQUIRES_EMAIL_VALIDATION = False

    # Some providers give nothing about the user but some basic data like the
    # user id or an email address. The default scope attribute is used to
    # specify a default value for the scope argument to request those extra bits.
    #
    # wp-oauth supports 4 scopes: basic, email, profile, openid.
    # we want the first three of these.
    # see https://wp-oauth.com/docs/how-to/adding-supported-scopes/
    DEFAULT_SCOPE = ["basic", "profile", "email"]

    # Specifying the method type required to retrieve your access token if it's
    # not the default GET request.
    ACCESS_TOKEN_METHOD = "POST"

    # require redirect domain to match the original initiating domain.
    # NOTE(review): this looks like a Python Social Auth *setting* name;
    # confirm that declaring it as a class attribute has any effect.
    SOCIAL_AUTH_SANITIZE_REDIRECTS = True

    # During the auth process some basic user data is returned by the provider
    # or retrieved by the user_data() method which usually is used to call
    # some API on the provider to retrieve it. This data will be stored in the
    # UserSocialAuth.extra_data attribute, but to make it accessible under some
    # common names on different providers, this attribute defines a list of
    # tuples in the form (name, alias) where name is the key in the user data
    # (which should be a dict instance) and alias is the name to store it on extra_data.
    EXTRA_DATA = [
        ("id", "id"),
        ("is_superuser", "is_superuser"),
        ("is_staff", "is_staff"),
        ("date_joined", "date_joined"),
    ]

    # the value of the scope separator is user-defined. Check the
    # scopes field value for your oauth client in your wordpress host.
    # the wp-oauth default value for scopes is 'basic' but can be
    # changed to a list. example 'basic, email, profile'. This
    # list can be delimited with commas, spaces, whatever.
    SCOPE_SEPARATOR = " "
2022-10-03 19:46:35 +03:00
2022-10-04 17:58:18 +03:00
# private utility function. not part of psa.
def _urlopen(self, url):
    """
    Fetch `url` and return the response body decoded as utf-8 text.

    Private utility function; not part of Python Social Auth.

    Args:
        url: the URL to open (anything accepted by urllib.request.urlopen).

    Returns:
        str: the complete response body, decoded as utf-8.

    Raises:
        urllib.error.URLError: if the URL cannot be opened.
        UnicodeDecodeError: if the body is not valid utf-8.
    """
    # open the response as a context manager so that the underlying
    # connection is always released, even if read() or decode() raises.
    with urlopen(url) as response:
        return response.read().decode("utf-8")
2022-10-03 19:46:35 +03:00
2022-10-06 04:51:52 +03:00
def is_valid_user_details(self, response) -> bool:
    """
    Validate that `response` is a dict containing at least the keys of a
    user details dict, as built by get_user_details().

    Args:
        response: the object to inspect.

    Returns:
        bool: True if `response` is a dict (or dict subclass) holding every
        key in qc_keys; False otherwise. A warning is logged when
        `response` is not a dict.
    """
    # isinstance() rather than an exact type comparison, so that dict
    # subclasses (OrderedDict, etc.) are accepted as well.
    if not isinstance(response, dict):
        logger.warning(
            "is_valid_user_details() was expecting a dict but received an object of type: {type}".format(
                type=type(response)
            )
        )
        return False

    qc_keys = (
        "id",
        "date_joined",
        "email",
        "first_name",
        "fullname",
        "is_staff",
        "is_superuser",
        "last_name",
        "username",
    )
    return all(key in response for key in qc_keys)
def is_wp_oauth_response(self, response) -> bool:
    """
    Validate the structure of the response object from wp-oauth. It is
    supposed to be a dict with at least the keys included in qc_keys.

    Args:
        response: the object to inspect.

    Returns:
        bool: True if `response` is a dict (or dict subclass) holding every
        key in qc_keys; False otherwise. A warning is logged when
        `response` is not a dict.
    """
    if not isinstance(response, dict):
        logger.warning(
            "is_wp_oauth_response() was expecting a dict but received an object of type: {type}".format(
                type=type(response)
            )
        )
        return False

    # each key is a separate element. Adjacent string literals without a
    # comma are silently concatenated by Python ("ID" "display_name"
    # becomes "IDdisplay_name"), which would make this check always fail.
    qc_keys = ("ID", "display_name", "user_email", "user_login", "user_roles")
    return all(key in response for key in qc_keys)
2022-10-06 18:13:38 +03:00
def is_wp_oauth_refresh_token_response(self, response) -> bool:
    """
    Validate that the structure of the response contains the keys of
    a refresh token dict.

    Args:
        response: the object to inspect.

    Returns:
        bool: True if `response` passes is_valid_user_details() and also
        holds every token-related key in qc_keys; False otherwise.
    """
    # NOTE(review): requiring the user details keys here means a bare
    # token response is never accepted. The only caller in this class,
    # is_get_user_details_extended_dict(), wants both, so this is kept;
    # confirm it is intended.
    if not self.is_valid_user_details(response):
        return False

    # each key is a separate element. Without the comma, Python fuses
    # "access_token" "expires_in" into the single key
    # "access_tokenexpires_in" and the check can never pass.
    qc_keys = ("access_token", "expires_in", "refresh_token", "scope", "token_type")
    return all(key in response for key in qc_keys)
def is_get_user_details_extended_dict(self, response) -> bool:
    """
    Tell whether `response` is an "extended" user details dict, i.e. a
    dict that contains
      a.) all keys of a get_user_details() return value, plus
      b.) all keys of a wp-oauth refresh token response.

    Args:
        response: the object to inspect.

    Returns:
        bool: True only when both validations succeed.
    """
    has_user_details = self.is_valid_user_details(response)
    # the token check is only consulted once the user details check passed.
    if has_user_details and self.is_wp_oauth_refresh_token_response(response):
        return True
    return False
2022-10-06 15:57:58 +03:00
2022-10-04 18:16:48 +03:00
# override Python Social Auth default end points.
2022-10-04 17:21:54 +03:00
# see https://wp-oauth.com/docs/general/endpoints/
2022-10-04 18:16:48 +03:00
#
# Note that we're only implementing Python properties
# so that we can include logging for diagnostic purposes.
2022-10-03 19:46:35 +03:00
@property
def AUTHORIZATION_URL(self) -> str:
    """
    URL of the wp-oauth authorization end point, built from BASE_URL.

    Implemented as a property only so that the value can be logged
    when VERBOSE_LOGGING is enabled.
    """
    url = f"{self.BASE_URL}/oauth/authorize"
    if not VERBOSE_LOGGING:
        return url
    logger.info("AUTHORIZATION_URL: {url}".format(url=url))
    return url
2022-10-03 19:46:35 +03:00
@property
def ACCESS_TOKEN_URL(self) -> str:
    """
    URL of the wp-oauth token end point, built from BASE_URL.

    Implemented as a property only so that the value can be logged
    when VERBOSE_LOGGING is enabled.
    """
    url = f"{self.BASE_URL}/oauth/token"
    if not VERBOSE_LOGGING:
        return url
    logger.info("ACCESS_TOKEN_URL: {url}".format(url=url))
    return url
2022-10-03 19:46:35 +03:00
2022-10-04 17:21:54 +03:00
@property
def USER_QUERY(self) -> str:
    """
    URL of the wp-oauth end point that returns the authenticated user's
    data, built from BASE_URL.

    Implemented as a property only so that the value can be logged
    when VERBOSE_LOGGING is enabled.
    """
    url = f"{self.BASE_URL}/oauth/me"
    if not VERBOSE_LOGGING:
        return url
    logger.info("USER_QUERY: {url}".format(url=url))
    return url
2022-10-04 17:21:54 +03:00
2022-10-06 03:35:44 +03:00
@property
def user_details(self) -> dict:
    """
    The cached user details dict, or None if no valid dict has been
    stored yet.
    """
    return self._user_details

@user_details.setter
def user_details(self, value: dict):
    """
    Cache `value` as the user details dict, but only if it passes
    is_valid_user_details(). An invalid value is logged as an error and
    the previously cached value is left untouched.
    """
    if not self.is_valid_user_details(value):
        logger.error(
            "user_details.setter: tried to pass an invalid object {value}".format(
                value=json.dumps(value, sort_keys=True, indent=4)
            )
        )
        return

    if VERBOSE_LOGGING:
        logger.info(
            "user_details.setter: new value set {value}".format(
                value=json.dumps(value, sort_keys=True, indent=4)
            )
        )
    self._user_details = value
2022-10-06 04:36:27 +03:00
2022-10-04 17:21:54 +03:00
# see https://python-social-auth.readthedocs.io/en/latest/backends/implementation.html
2022-10-04 18:16:48 +03:00
# Return user details from the Wordpress user account
def get_user_details(self, response) -> dict:
    """
    Return user details from the Wordpress user account.

    Converts the dict returned by wp-oauth into the dict that Open edX
    expects, and caches the result in self.user_details.

    Args:
        response: either a wp-oauth user dict (see is_wp_oauth_response())
            or a dict previously returned by this method, possibly
            extended with token-related keys.

    Returns:
        dict: one of
          - the newly built (and cached) user details dict, when
            `response` is a wp-oauth dict;
          - `response` itself, when it is an extended user details dict
            (see is_get_user_details_extended_dict());
          - the cached self.user_details (which may be None) when
            `response` is not recognized.

    Raises:
        ValueError, TypeError: if the wp-oauth "ID" value cannot be
            converted to an int.
    """

    def _dump(obj) -> str:
        # default=str keeps the diagnostics from raising TypeError when
        # an unrecognized response holds non-JSON-serializable objects.
        return json.dumps(obj, sort_keys=True, indent=4, default=str)

    if not (
        self.is_valid_user_details(response) or self.is_wp_oauth_response(response)
    ):
        logger.error(
            "get_user_details() - received an unrecognized response object. Cannot continue: {response}".format(
                response=_dump(response)
            )
        )
        # if we have cached results then we might be able to recover.
        return self.user_details

    if VERBOSE_LOGGING:
        logger.info(
            "get_user_details() begin with response: {response}".format(
                response=_dump(response)
            )
        )

    # a def in the third_party_auth pipeline list calls get_user_details()
    # after it has already been called once. It passes the original
    # get_user_details() dict enhanced with additional token-related keys.
    # If we receive this modified dict then we should pass it along to the
    # next defs in the pipeline.
    if self.is_get_user_details_extended_dict(response):
        # -------------------------------------------------------------
        # expected use case #2: an enhanced derivation of an original
        # user_details dict. This is created when get_user_details()
        # is called from user_data().
        # -------------------------------------------------------------
        if VERBOSE_LOGGING:
            logger.info(
                "get_user_details() - detected an enhanced get_user_details() dict in the response: {response}".format(
                    response=_dump(response)
                )
            )
        return response

    # at this point we've ruled out the possibility of the response object
    # being a derivation of a user_details dict. So, it should therefore
    # conform to the structure of a wp-oauth dict.
    if not self.is_wp_oauth_response(response):
        logger.warning(
            "get_user_details() - response object is not a valid wp-oauth object. Cannot continue. {response}".format(
                response=_dump(response)
            )
        )
        return self.user_details

    # -------------------------------------------------------------
    # expected use case #1: response object is a dict with all required keys.
    # -------------------------------------------------------------
    if VERBOSE_LOGGING:
        logger.info(
            "get_user_details() - start. response: {response}".format(
                response=_dump(response)
            )
        )

    # try to parse out the first and last names. `or ""` guards against
    # an explicit null display_name.
    display_name = response.get("display_name") or ""
    split_name = display_name.split()
    first_name = split_name[0] if split_name else ""
    # any name of two or more words has a last name (its final word),
    # not only names of exactly two words.
    last_name = split_name[-1] if len(split_name) > 1 else ""

    # check for superuser / staff status: Wordpress administrators get both.
    user_roles = response.get("user_roles") or []
    is_administrator = "administrator" in user_roles

    self.user_details = {
        # the default belongs to .get(). A second argument to int() is a
        # numeric base, which rejects integer IDs and zero-padded strings.
        "id": int(response.get("ID", 0)),
        "username": response.get("user_login", ""),
        "email": response.get("user_email", ""),
        "first_name": first_name,
        "last_name": last_name,
        "fullname": response.get("display_name", ""),
        "is_superuser": is_administrator,
        "is_staff": is_administrator,
        "refresh_token": response.get("refresh_token", ""),
        "scope": response.get("scope", ""),
        "token_type": response.get("token_type", ""),
        "date_joined": response.get("user_registered", ""),
        "user_status": response.get("user_status", ""),
    }

    if VERBOSE_LOGGING:
        logger.info(
            "get_user_details() - finish. user_details: {user_details}".format(
                user_details=_dump(self.user_details)
            )
        )
    return self.user_details
2022-10-03 19:46:35 +03:00
2022-10-06 18:13:38 +03:00
# Load user data from service url end point. Note that in the case of
2022-10-04 18:16:48 +03:00
# wp oauth, the response object returned by self.USER_QUERY
2022-10-04 17:21:54 +03:00
# is the same as the response object passed to get_user_details().
#
# see https://python-social-auth.readthedocs.io/en/latest/backends/implementation.html
def user_data(self, access_token, *args, **kwargs) -> dict:
    """
    Load user data from the wp-oauth USER_QUERY end point and synchronize
    selected fields onto an existing Django user.

    Args:
        access_token: the OAuth access token, sent as the `access_token`
            query parameter.
        *args, **kwargs: accepted for interface compatibility; unused.

    Returns:
        dict: the user details dict produced by get_user_details(). If
        that result is not a valid user details dict, the cached
        self.user_details (which may be None) is returned instead.
        None is returned when a ValueError is raised while decoding or
        converting the response.
    """
    # read the property once: every access logs when VERBOSE_LOGGING is on.
    endpoint = self.USER_QUERY
    url = f"{endpoint}?" + urlencode({"access_token": access_token})

    if VERBOSE_LOGGING:
        # never write the bearer token itself to the log.
        logger.info(
            "user_data() url: {url}".format(
                url=f"{endpoint}?access_token=<redacted>"
            )
        )

    try:
        response = json.loads(self._urlopen(url))
        if VERBOSE_LOGGING:
            logger.info(
                "user_data() response: {response}".format(
                    response=json.dumps(response, sort_keys=True, indent=4)
                )
            )

        user_details = self.get_user_details(response)
        if VERBOSE_LOGGING:
            logger.info(
                "user_data() local variable user_details: {user_details}".format(
                    user_details=json.dumps(user_details, sort_keys=True, indent=4)
                )
            )
            logger.info(
                "user_data() class property value of self.user_details: {user_details}".format(
                    user_details=json.dumps(
                        self.user_details, sort_keys=True, indent=4
                    )
                )
            )
    except ValueError as e:
        logger.error("user_data() {err}".format(err=e))
        return None

    if not self.is_valid_user_details(user_details):
        logger.error(
            "user_data() user_details object is invalid: {user_details}".format(
                user_details=json.dumps(user_details, sort_keys=True, indent=4)
            )
        )
        return self.user_details

    # add synchronization of any data fields that get missed by the built-in
    # open edx third party authentication sync functionality.
    #
    # from here on, work with the dict that was just validated rather than
    # the cached property, which is not guaranteed to be the same object.
    try:
        # this gets called just prior to account creation for
        # new users, hence, we need to catch DoesNotExist
        # exceptions.
        user = User.objects.get(username=user_details["username"])
    except User.DoesNotExist:
        return user_details

    # NOTE(review): this grants Django superuser/staff status based on the
    # flags computed by get_user_details(); confirm that is intended.
    updates = []
    if (user.is_superuser != user_details["is_superuser"]) or (
        user.is_staff != user_details["is_staff"]
    ):
        user.is_superuser = user_details["is_superuser"]
        user.is_staff = user_details["is_staff"]
        updates.append("Updated the is_superuser/is_staff flags for user {username}")

    if (user.first_name != user_details["first_name"]) or (
        user.last_name != user_details["last_name"]
    ):
        user.first_name = user_details["first_name"]
        user.last_name = user_details["last_name"]
        updates.append("Updated first_name/last_name for user {username}")

    if updates:
        # a single save covers both groups of changes; log only after the
        # write has succeeded.
        user.save()
        for message in updates:
            logger.info(message.format(username=user.username))

    return user_details