mirror of https://github.com/doccano/doccano.git
pythonannotation-tooldatasetsactive-learningtext-annotationdatasetnatural-language-processingdata-labelingmachine-learning
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
143 lines
4.7 KiB
143 lines
4.7 KiB
import requests
|
|
from django.conf import settings
|
|
from social_core.backends.azuread_tenant import AzureADTenantOAuth2
|
|
from social_core.backends.github import GithubOAuth2
|
|
from social_core.backends.okta import OktaOAuth2
|
|
from social_core.backends.okta_openidconnect import OktaOpenIdConnect
|
|
|
|
|
|
# noinspection PyUnusedLocal
|
|
def fetch_github_permissions(strategy, details, user=None, is_new=False, *args, **kwargs):
|
|
org_name = getattr(settings, 'GITHUB_ADMIN_ORG_NAME', '')
|
|
team_name = getattr(settings, 'GITHUB_ADMIN_TEAM_NAME', '')
|
|
if not user or not isinstance(kwargs['backend'], GithubOAuth2) or not org_name or not team_name:
|
|
return
|
|
|
|
response = requests.post(
|
|
url='https://api.github.com/graphql',
|
|
headers={
|
|
'Authorization': 'Bearer {}'.format(kwargs['response']['access_token']),
|
|
},
|
|
json={
|
|
'query': '''
|
|
query($userName: String!, $orgName: String!, $teamName: String!) {
|
|
organization(login: $orgName) {
|
|
teams(query: $teamName, userLogins: [$userName], first: 1) {
|
|
nodes {
|
|
name
|
|
}
|
|
}
|
|
}
|
|
}
|
|
''',
|
|
'variables': {
|
|
'userName': details['username'],
|
|
'orgName': org_name,
|
|
'teamName': team_name,
|
|
}
|
|
}
|
|
)
|
|
response.raise_for_status()
|
|
response = response.json()
|
|
|
|
is_superuser = {'name': team_name} in response['data']['organization']['teams']['nodes']
|
|
|
|
if user.is_superuser != is_superuser:
|
|
user.is_superuser = is_superuser
|
|
user.save()
|
|
|
|
|
|
# noinspection PyUnusedLocal
|
|
def fetch_azuread_permissions(strategy, details, user=None, is_new=False, *args, **kwargs):
|
|
group_id = getattr(settings, 'AZUREAD_ADMIN_GROUP_ID', '')
|
|
if not user or not isinstance(kwargs['backend'], AzureADTenantOAuth2) or not group_id:
|
|
return
|
|
|
|
response = requests.post(
|
|
url='https://graph.microsoft.com/v1.0/me/checkMemberGroups',
|
|
headers={
|
|
'Authorization': 'Bearer {}'.format(kwargs['response']['access_token']),
|
|
},
|
|
json={
|
|
'groupIds': [group_id]
|
|
}
|
|
)
|
|
response.raise_for_status()
|
|
response = response.json()
|
|
|
|
is_superuser = group_id in response['value']
|
|
|
|
if user.is_superuser != is_superuser:
|
|
user.is_superuser = is_superuser
|
|
user.save()
|
|
|
|
|
|
# noinspection PyUnusedLocal
|
|
def fetch_okta_oauth2_permissions(strategy, details, user=None, is_new=False, *args, **kwargs):
|
|
org_url = getattr(settings, 'SOCIAL_AUTH_OKTA_OAUTH2_API_URL', '')
|
|
admin_group_name = getattr(settings, "OKTA_OAUTH2_ADMIN_GROUP_NAME", "")
|
|
if not user or not isinstance(kwargs['backend'], OktaOAuth2):
|
|
return
|
|
|
|
# OktaOpenIdConnect inherits `OktaOAuth2`, so we have to explicitly skip OAuth2 trying
|
|
# to fetch permissions when using OIDC backend.
|
|
if isinstance(kwargs['backend'], OktaOpenIdConnect):
|
|
return
|
|
|
|
response = requests.post(
|
|
url=f"{org_url}/v1/userinfo",
|
|
headers={
|
|
'Authorization': 'Bearer {}'.format(kwargs['response']['access_token']),
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
response = response.json()
|
|
|
|
is_superuser = admin_group_name in response.get("groups", [])
|
|
is_staff = admin_group_name in response.get("groups", [])
|
|
|
|
user_changed = False
|
|
|
|
if user.is_superuser != is_superuser:
|
|
user.is_superuser = is_superuser
|
|
user_changed = user_changed or True
|
|
|
|
if user.is_staff != is_staff:
|
|
user.is_staff = is_staff
|
|
user_changed = user_changed or True
|
|
|
|
if user_changed:
|
|
user.save()
|
|
|
|
|
|
# noinspection PyUnusedLocal
|
|
def fetch_okta_openidconnect_permissions(strategy, details, user=None, is_new=False, *args, **kwargs):
|
|
org_url = getattr(settings, 'SOCIAL_AUTH_OKTA_OPENIDCONNECT_API_URL', '')
|
|
admin_group_name = getattr(settings, "OKTA_OPENIDCONNECT_ADMIN_GROUP_NAME", "")
|
|
if not user or not isinstance(kwargs['backend'], OktaOpenIdConnect):
|
|
return
|
|
|
|
response = requests.post(
|
|
url=f"{org_url}/v1/userinfo",
|
|
headers={
|
|
'Authorization': 'Bearer {}'.format(kwargs['response']['access_token']),
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
response = response.json()
|
|
|
|
is_superuser = admin_group_name in response.get("groups", [])
|
|
is_staff = admin_group_name in response.get("groups", [])
|
|
|
|
user_changed = False
|
|
|
|
if user.is_superuser != is_superuser:
|
|
user.is_superuser = is_superuser
|
|
user_changed = user_changed or True
|
|
|
|
if user.is_staff != is_staff:
|
|
user.is_staff = is_staff
|
|
user_changed = user_changed or True
|
|
|
|
if user_changed:
|
|
user.save()
|