|
|
@ -2,6 +2,8 @@ import requests |
|
|
|
from django.conf import settings |
|
|
|
from social_core.backends.azuread_tenant import AzureADTenantOAuth2 |
|
|
|
from social_core.backends.github import GithubOAuth2 |
|
|
|
from social_core.backends.okta import OktaOAuth2 |
|
|
|
from social_core.backends.okta_openidconnect import OktaOpenIdConnect |
|
|
|
|
|
|
|
|
|
|
|
# noinspection PyUnusedLocal |
|
|
@ -68,3 +70,49 @@ def fetch_azuread_permissions(strategy, details, user=None, is_new=False, *args, |
|
|
|
if user.is_superuser != is_superuser: |
|
|
|
user.is_superuser = is_superuser |
|
|
|
user.save() |
|
|
|
|
|
|
|
|
|
|
|
# noinspection PyUnusedLocal |
|
|
|
def fetch_okta_oauth2_permissions(strategy, details, user=None, is_new=False, *args, **kwargs): |
|
|
|
org_url = getattr(settings, 'SOCIAL_AUTH_OKTA_OAUTH2_API_URL', '') |
|
|
|
group_name = getattr(settings, "OKTA_OAUTH2_ADMIN_GROUP_NAME", "") |
|
|
|
if not user or not isinstance(kwargs['backend'], OktaOAuth2): |
|
|
|
return |
|
|
|
|
|
|
|
response = requests.post( |
|
|
|
url=f"{org_url}/v1/userinfo", |
|
|
|
headers={ |
|
|
|
'Authorization': 'Bearer {}'.format(kwargs['response']['access_token']), |
|
|
|
}, |
|
|
|
) |
|
|
|
response.raise_for_status() |
|
|
|
response = response.json() |
|
|
|
|
|
|
|
is_superuser = group_name in response.get("groups", []) |
|
|
|
|
|
|
|
if user.is_superuser != is_superuser: |
|
|
|
user.is_superuser = is_superuser |
|
|
|
user.save() |
|
|
|
|
|
|
|
|
|
|
|
# noinspection PyUnusedLocal |
|
|
|
def fetch_okta_openidconnect_permissions(strategy, details, user=None, is_new=False, *args, **kwargs): |
|
|
|
org_url = getattr(settings, 'SOCIAL_AUTH_OKTA_OPENIDCONNECT_API_URL', '') |
|
|
|
group_name = getattr(settings, "OKTA_OPENIDCONNECT_ADMIN_GROUP_NAME", "") |
|
|
|
if not user or not isinstance(kwargs['backend'], OktaOpenIdConnect): |
|
|
|
return |
|
|
|
|
|
|
|
response = requests.post( |
|
|
|
url=f"{org_url}/v1/userinfo", |
|
|
|
headers={ |
|
|
|
'Authorization': 'Bearer {}'.format(kwargs['response']['access_token']), |
|
|
|
}, |
|
|
|
) |
|
|
|
response.raise_for_status() |
|
|
|
response = response.json() |
|
|
|
|
|
|
|
is_superuser = group_name in response.get("groups", []) |
|
|
|
|
|
|
|
if user.is_superuser != is_superuser: |
|
|
|
user.is_superuser = is_superuser |
|
|
|
user.save() |