edx-oauth2-wordpress-backend/wp_oauth_backend/wp_oauth.py

164 lines
5.8 KiB
Python
Raw Normal View History

"""
written by: Lawrence McDaniel
https://lawrencemcdaniel.com
date: oct-2022
usage: Abstract class implementation of BaseOAuth2 to handle the field
mapping and data conversions between the dict that WP OAuth
returns and the dict that Open edX actually needs.
"""
from abc import abstractmethod
2022-10-03 19:46:35 +03:00
import json
from urllib.parse import urlencode
from urllib.request import urlopen
from social_core.backends.oauth import BaseOAuth2
from logging import getLogger
# Module-level logger, named after this module.
logger = getLogger(__name__)
# When True, get_user_details() and user_data() emit extra info-level log
# entries (raw response, mapped user details, request URL).
VERBOSE_LOGGING = True
class WPOpenedxOAuth2AbstractClass(BaseOAuth2):
    """
    WP OAuth authentication backend customized for Open edX.

    Builds the WP OAuth endpoint URLs from a site base URL and maps the
    user payload returned by the ``/oauth/me`` endpoint onto the user
    details dict produced by ``get_user_details()``.

    Subclasses must provide ``name``, ``BASE_URL`` and ``SCOPE_SEPARATOR``.
    """

    # https://python-social-auth.readthedocs.io/en/latest/configuration/settings.html
    SOCIAL_AUTH_SANITIZE_REDIRECTS = True  # requires redirect domain to match the original initiating domain.
    ACCESS_TOKEN_METHOD = 'POST'

    # NOTE(review): @abstractmethod is only enforced when the class is built
    # with abc.ABCMeta; nothing visible here sets that metaclass, so these
    # declarations mainly document what a subclass has to supply.
    @property
    @abstractmethod
    def name(self):
        """Backend name. Must be supplied by the subclass."""
        raise NotImplementedError("Subclasses should implement this property.")

    @property
    @abstractmethod
    def BASE_URL(self):
        """Root URL of the WordPress site. Must be supplied by the subclass."""
        raise NotImplementedError("Subclasses should implement this property.")

    @property
    @abstractmethod
    def SCOPE_SEPARATOR(self):
        """Delimiter between OAuth scopes. Must be supplied by the subclass."""
        raise NotImplementedError("Subclasses should implement this property.")

    @property
    def base_url(self) -> str:
        """Return the site root that every OAuth endpoint URL is built from.

        ``BASE_URL`` may be a plain attribute / property or a zero-argument
        method on the subclass; a method is called so that the URL
        properties below never interpolate a bound-method repr.
        """
        value = self.BASE_URL
        return value() if callable(value) else value

    @property
    def AUTHORIZATION_URL(self) -> str:
        """URL of the WP OAuth authorization endpoint."""
        return f"{self.base_url}/oauth/authorize"

    @property
    def ACCESS_TOKEN_URL(self) -> str:
        """URL of the WP OAuth token endpoint."""
        return f"{self.base_url}/oauth/token"

    @property
    def EXTRA_DATA(self) -> list:
        """List of (source key, destination key) pairs, identical on both sides."""
        return [
            ('id', 'id'),
            ('username', 'username'),
            ('email', 'email'),
            ('first_name', 'first_name'),
            ('last_name', 'last_name'),
            ('fullname', 'fullname'),
            ('is_superuser', 'is_superuser'),
            ('is_staff', 'is_staff'),
            ('date_joined', 'date_joined'),
        ]

    @property
    def USER_QUERY(self) -> str:
        """URL of the WP OAuth endpoint that describes the current user."""
        return f"{self.base_url}/oauth/me"

    def get_user_details(self, response) -> dict:
        """Return user details from the WP account.

        Args:
            response: dict decoded from the WP OAuth user endpoint. It must
                contain at least the ``ID`` and ``user_email`` keys.

        Returns:
            The mapped user details, or ``{}`` when ``response`` is not a
            dict, lacks the required keys, or has an ``ID`` that cannot be
            converted to an integer.
        """
        # These guards run regardless of VERBOSE_LOGGING so that a missing or
        # malformed response never reaches the .get() calls below.
        if not isinstance(response, dict):
            logger.info('get_user_details() - response is missing. exiting.')
            return {}

        if 'ID' not in response or 'user_email' not in response:
            logger.info('get_user_details() - response object lacks required keys. exiting.')
            return {}

        if VERBOSE_LOGGING:
            # NOTE(review): the payload is logged verbatim, including any
            # token fields it carries.
            logger.info('get_user_details() - start. response: {response}'.format(
                response=json.dumps(response, sort_keys=True, indent=4)
            ))

        # Plain base-10 conversion. Works for both "7" and 7.
        try:
            user_id = int(response['ID'])
        except (TypeError, ValueError):
            logger.info('get_user_details() - ID is not an integer. exiting.')
            return {}

        # try to parse out the first and last names
        split_name = (response.get('display_name') or '').split()
        first_name = split_name[0] if split_name else ''
        # a last name is only taken when the display name has exactly two words
        last_name = split_name[-1] if len(split_name) == 2 else ''

        # check for superuser / staff status: both follow the administrator role
        user_roles = response.get('user_roles') or []
        is_admin = 'administrator' in user_roles

        user_details = {
            'id': user_id,
            'username': response.get('user_email', ''),
            'wp_username': response.get('user_login', ''),
            'email': response.get('user_email', ''),
            'first_name': first_name,
            'last_name': last_name,
            'fullname': response.get('display_name', ''),
            'is_superuser': is_admin,
            'is_staff': is_admin,
            'refresh_token': response.get('refresh_token', ''),
            'scope': response.get('scope', ''),
            'token_type': response.get('token_type', ''),
            'date_joined': response.get('user_registered', ''),
            'user_status': response.get('user_status', ''),
        }

        if VERBOSE_LOGGING:
            logger.info('get_user_details() - complete. user_details: {user_details}'.format(
                user_details=json.dumps(user_details, sort_keys=True, indent=4)
            ))

        return user_details

    def user_data(self, access_token, *args, **kwargs) -> dict:
        """Loads user data from service.

        Args:
            access_token: token sent to the user endpoint as the
                ``access_token`` query parameter.

        Returns:
            The result of ``get_user_details()`` for the decoded payload, or
            ``None`` when the body cannot be decoded (invalid UTF-8 or JSON).
            Network errors raised while opening the URL are not caught here.
        """
        url = f'{self.USER_QUERY}?' + urlencode({
            'access_token': access_token
        })

        if VERBOSE_LOGGING:
            # log the endpoint only: the full URL carries the access token
            logger.info("user_data() url: {url}".format(url=self.USER_QUERY))

        try:
            response = json.loads(self.urlopen(url))
        except ValueError as e:
            logger.error('user_data() did not work: {err}'.format(err=e))
            return None

        # NOTE(review): this returns the already-mapped details rather than
        # the raw payload - confirm that is what downstream consumers expect.
        return self.get_user_details(response)

    # utility function. not part of psa.
    def urlopen(self, url):
        """Fetch ``url`` and return the response body decoded as UTF-8."""
        # the context manager closes the HTTP response once it has been read
        with urlopen(url) as http_response:
            return http_response.read().decode("utf-8")
class StepwiseMathOAuth2(WPOpenedxOAuth2AbstractClass):
    """WP OAuth backend bound to the stepwisemath.ai WordPress site.

    The three settings are plain class attributes rather than methods: the
    parent class interpolates ``BASE_URL`` straight into the endpoint URLs,
    and python-social-auth reads ``name`` and ``SCOPE_SEPARATOR`` as values,
    so a method here would yield a bound-method object instead of a string.
    """

    # This is the string value that will appear in the LMS Django Admin
    # Third Party Authentication / Provider Configuration (OAuth)
    # setup page drop-down box titled, "Backend name:", just above
    # the "Client ID:" and "Client Secret:" fields.
    name = 'wp-oauth'

    # note: no slash at the end of the base url. Python Social Auth
    # might clean this up for you, but i'm not 100% certain of that.
    BASE_URL = "https://stepwisemath.ai"

    # the value of the scope separator is user-defined. Check the
    # scopes field value for your oauth client in your wordpress host.
    # the wp-oauth default value for scopes is 'basic' but can be
    # changed to a list. example 'basic, email, profile'. This
    # list can be delimited with commas, spaces, whatever.
    SCOPE_SEPARATOR = ","