You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

118 lines
4.0 KiB

  1. import requests
  2. from django.conf import settings
  3. from social_core.backends.azuread_tenant import AzureADTenantOAuth2
  4. from social_core.backends.github import GithubOAuth2
  5. from social_core.backends.okta import OktaOAuth2
  6. from social_core.backends.okta_openidconnect import OktaOpenIdConnect
  7. # noinspection PyUnusedLocal
  8. def fetch_github_permissions(strategy, details, user=None, is_new=False, *args, **kwargs):
  9. org_name = getattr(settings, 'GITHUB_ADMIN_ORG_NAME', '')
  10. team_name = getattr(settings, 'GITHUB_ADMIN_TEAM_NAME', '')
  11. if not user or not isinstance(kwargs['backend'], GithubOAuth2) or not org_name or not team_name:
  12. return
  13. response = requests.post(
  14. url='https://api.github.com/graphql',
  15. headers={
  16. 'Authorization': 'Bearer {}'.format(kwargs['response']['access_token']),
  17. },
  18. json={
  19. 'query': '''
  20. query($userName: String!, $orgName: String!, $teamName: String!) {
  21. organization(login: $orgName) {
  22. teams(query: $teamName, userLogins: [$userName], first: 1) {
  23. nodes {
  24. name
  25. }
  26. }
  27. }
  28. }
  29. ''',
  30. 'variables': {
  31. 'userName': details['username'],
  32. 'orgName': org_name,
  33. 'teamName': team_name,
  34. }
  35. }
  36. )
  37. response.raise_for_status()
  38. response = response.json()
  39. is_superuser = {'name': team_name} in response['data']['organization']['teams']['nodes']
  40. if user.is_superuser != is_superuser:
  41. user.is_superuser = is_superuser
  42. user.save()
  43. # noinspection PyUnusedLocal
  44. def fetch_azuread_permissions(strategy, details, user=None, is_new=False, *args, **kwargs):
  45. group_id = getattr(settings, 'AZUREAD_ADMIN_GROUP_ID', '')
  46. if not user or not isinstance(kwargs['backend'], AzureADTenantOAuth2) or not group_id:
  47. return
  48. response = requests.post(
  49. url='https://graph.microsoft.com/v1.0/me/checkMemberGroups',
  50. headers={
  51. 'Authorization': 'Bearer {}'.format(kwargs['response']['access_token']),
  52. },
  53. json={
  54. 'groupIds': [group_id]
  55. }
  56. )
  57. response.raise_for_status()
  58. response = response.json()
  59. is_superuser = group_id in response['value']
  60. if user.is_superuser != is_superuser:
  61. user.is_superuser = is_superuser
  62. user.save()
  63. # noinspection PyUnusedLocal
  64. def fetch_okta_oauth2_permissions(strategy, details, user=None, is_new=False, *args, **kwargs):
  65. org_url = getattr(settings, 'SOCIAL_AUTH_OKTA_OAUTH2_API_URL', '')
  66. group_name = getattr(settings, "OKTA_OAUTH2_ADMIN_GROUP_NAME", "")
  67. if not user or not isinstance(kwargs['backend'], OktaOAuth2):
  68. return
  69. response = requests.post(
  70. url=f"{org_url}/v1/userinfo",
  71. headers={
  72. 'Authorization': 'Bearer {}'.format(kwargs['response']['access_token']),
  73. },
  74. )
  75. response.raise_for_status()
  76. response = response.json()
  77. is_superuser = group_name in response.get("groups", [])
  78. if user.is_superuser != is_superuser:
  79. user.is_superuser = is_superuser
  80. user.save()
  81. # noinspection PyUnusedLocal
  82. def fetch_okta_openidconnect_permissions(strategy, details, user=None, is_new=False, *args, **kwargs):
  83. org_url = getattr(settings, 'SOCIAL_AUTH_OKTA_OPENIDCONNECT_API_URL', '')
  84. group_name = getattr(settings, "OKTA_OPENIDCONNECT_ADMIN_GROUP_NAME", "")
  85. if not user or not isinstance(kwargs['backend'], OktaOpenIdConnect):
  86. return
  87. response = requests.post(
  88. url=f"{org_url}/v1/userinfo",
  89. headers={
  90. 'Authorization': 'Bearer {}'.format(kwargs['response']['access_token']),
  91. },
  92. )
  93. response.raise_for_status()
  94. response = response.json()
  95. is_superuser = group_name in response.get("groups", [])
  96. if user.is_superuser != is_superuser:
  97. user.is_superuser = is_superuser
  98. user.save()