1
0
mirror of https://gitlab.crans.org/bde/nk20 synced 2025-11-08 15:59:50 +01:00

Client Credential Flow implementation

This commit is contained in:
quark
2025-11-07 15:49:01 +01:00
parent 68341a2a7e
commit bfd50e3cd5
4 changed files with 56 additions and 31 deletions

View File

@@ -21,7 +21,7 @@ class PermissionBackend(ModelBackend):
Manage permissions of users Manage permissions of users
""" """
supports_object_permissions = True supports_object_permissions = True
supports_anonymous_user = False supports_anonymous_user = True
supports_inactive_user = False supports_inactive_user = False
@staticmethod @staticmethod
@@ -33,24 +33,18 @@ class PermissionBackend(ModelBackend):
:param t: The type of the permissions: view, change, add or delete :param t: The type of the permissions: view, change, add or delete
:return: The queryset of the permissions of the user (memoized) grouped by clubs :return: The queryset of the permissions of the user (memoized) grouped by clubs
""" """
if hasattr(request, 'auth') and request.auth is not None and hasattr(request.auth, 'scope'): if hasattr(request, 'oauth2') and request.oauth2 is not None and 'scope' in request.oauth2:
# OAuth2 Authentication # OAuth2 Authentication
user = request.auth.user user = request.oauth2['user']
def permission_filter(membership_obj): def permission_filter(membership_obj):
query = Q(pk=-1) query = Q(pk=-1)
if 'mask' in request.GET: for scope in request.oauth2['scope']:
try:
rank = int(request.GET['mask'])
except ValueError:
rank = 42
query &= Q(mask__rank__lte=rank)
for scope in request.auth.scope.split(' '):
if scope == "openid": if scope == "openid":
continue continue
permission_id, club_id = scope.split('_') permission_id, club_id = scope.split('_')
if int(club_id) == membership_obj.club_id: if int(club_id) == membership_obj.club_id:
query |= Q(pk=permission_id) query |= Q(pk=permission_id, mask__rank__lte=request.oauth2['mask'])
return query return query
else: else:
user = request.user user = request.user

View File

@@ -25,7 +25,9 @@ class PermissionScopes(BaseScopes):
if 'scopes' in kwargs: if 'scopes' in kwargs:
for scope in kwargs['scopes']: for scope in kwargs['scopes']:
if scope == 'openid': if scope == 'openid':
scopes['openid'] = "OpenID Connect" scopes['openid'] = _("OpenID Connect (username and email)")
elif scope == '0_0':
scopes['0_0'] = _("Useless scope which do nothing")
else: else:
p = Permission.objects.get(id=scope.split('_')[0]) p = Permission.objects.get(id=scope.split('_')[0])
club = Club.objects.get(id=scope.split('_')[1]) club = Club.objects.get(id=scope.split('_')[1])
@@ -35,6 +37,7 @@ class PermissionScopes(BaseScopes):
scopes = {f"{p.id}_{club.id}": f"{p.description} (club {club.name})" scopes = {f"{p.id}_{club.id}": f"{p.description} (club {club.name})"
for p in Permission.objects.all() for club in Club.objects.all()} for p in Permission.objects.all() for club in Club.objects.all()}
scopes['openid'] = _("OpenID Connect (username and email)") scopes['openid'] = _("OpenID Connect (username and email)")
scopes['0_0'] = _("Useless scope which do nothing")
return scopes return scopes
def get_available_scopes(self, application=None, request=None, *args, **kwargs): def get_available_scopes(self, application=None, request=None, *args, **kwargs):
@@ -43,7 +46,7 @@ class PermissionScopes(BaseScopes):
scopes = [f"{p.id}_{p.membership.club.id}" scopes = [f"{p.id}_{p.membership.club.id}"
for t in Permission.PERMISSION_TYPES for t in Permission.PERMISSION_TYPES
for p in PermissionBackend.get_raw_permissions(get_current_request(), t[0])] for p in PermissionBackend.get_raw_permissions(get_current_request(), t[0])]
scopes.append('openid') scopes.append('0_0') # always available
return scopes return scopes
def get_default_scopes(self, application=None, request=None, *args, **kwargs): def get_default_scopes(self, application=None, request=None, *args, **kwargs):
@@ -51,7 +54,7 @@ class PermissionScopes(BaseScopes):
return [] return []
scopes = [f"{p.id}_{p.membership.club.id}" scopes = [f"{p.id}_{p.membership.club.id}"
for p in PermissionBackend.get_raw_permissions(get_current_request(), 'view')] for p in PermissionBackend.get_raw_permissions(get_current_request(), 'view')]
scopes.append('openid') scopes.append('0_0')
return scopes return scopes
@@ -73,6 +76,37 @@ class PermissionOAuth2Validator(OAuth2Validator):
claims = super().get_discovery_claims(self) claims = super().get_discovery_claims(self)
return claims + ["name", "normalized_name", "email"] return claims + ["name", "normalized_name", "email"]
def validate_client_credentials_scopes(self, client_id, scopes, client, request, *args, **kwargs):
"""
For client credentials valid scopes are scope of the app owner
"""
valid_scopes = set()
request.oauth2 = {}
request.oauth2['user'] = client.user
request.oauth2['user'].is_anomymous = False
request.oauth2['scope'] = scopes
# mask implementation
if hasattr(request.decoded_body, 'mask'):
try:
request.oauth2['mask'] = int(request.decoded_body['mask'])
except ValueError:
request.oauth2['mask'] = 42
else:
request.oauth2['mask'] = 42
for t in Permission.PERMISSION_TYPES:
for p in PermissionBackend.get_raw_permissions(request, t[0]):
scope = f"{p.id}_{p.membership.club.id}"
if scope in scopes:
valid_scopes.add(scope)
# Always give one scope to generate token
if not valid_scopes:
valid_scopes.add('0_0')
request.scopes = valid_scopes
return valid_scopes
def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs): def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs):
""" """
User can request as many scope as he wants, including invalid scopes, User can request as many scope as he wants, including invalid scopes,
@@ -84,6 +118,9 @@ class PermissionOAuth2Validator(OAuth2Validator):
valid_scopes = set() valid_scopes = set()
if hasattr(request, 'grant_type') and request.grant_type == 'client_credentials':
return self.validate_client_credentials_scopes(client_id, scopes, client, request, args, kwargs)
# simple patch for have functionnal ROPB flow # simple patch for have functionnal ROPB flow
# TODO rewrite # TODO rewrite
r = get_current_request() r = get_current_request()
@@ -94,8 +131,8 @@ class PermissionOAuth2Validator(OAuth2Validator):
if scope in scopes: if scope in scopes:
valid_scopes.add(scope) valid_scopes.add(scope)
if 'openid' in scopes: if '0_0' in scopes:
valid_scopes.add('openid') valid_scopes.add('0_0')
request.scopes = valid_scopes request.scopes = valid_scopes
return valid_scopes return valid_scopes

View File

@@ -126,6 +126,7 @@ class OAuth2TestCase(TestCase):
**{'Authorization': f'Bearer {token.token}'}) **{'Authorization': f'Bearer {token.token}'})
# Token is not granted to see other api # Token is not granted to see other api
resp = self.client.get(f'/api/user/{self.user.pk}/', resp = self.client.get(f'/api/members/profile/{self.user.profile.pk}/',
**{'Authorization': f'Bearer {token.token}'}) **{'Authorization': f'Bearer {token.token}'})
self.assertEqual(resp.status_code, 404) self.assertEqual(resp.status_code, 404)

View File

@@ -7,7 +7,7 @@ from django.contrib.auth.models import User
from django.test import TestCase from django.test import TestCase
from member.models import Membership, Club from member.models import Membership, Club
from note.models import NoteUser from note.models import NoteUser
from oauth2_provider.models import Application from oauth2_provider.models import Application, AccessToken
from ..models import Role, Permission from ..models import Role, Permission
@@ -81,14 +81,10 @@ class OAuth2TestCase(TestCase):
self.assertEqual(resp.status_code, 200) self.assertEqual(resp.status_code, 200)
token = resp.json()['access_token'] token = AccessToken.objects.get(token=resp.json()['access_token'])
# Token is valid but has no right # Token do nothing, it should be have the useless scope
resp = self.client.get('/api/user/{self.user.pk}', self.assertEqual(token.scope, '0_0')
**{'Authorization': f'Bearer {token}'}
)
self.assertEqual(resp.status_code, 403)
# RFC6749 4.4.2 allows use of scope in client credential flow # RFC6749 4.4.2 allows use of scope in client credential flow
resp = self.client.post('/o/token/', resp = self.client.post('/o/token/',
@@ -100,13 +96,10 @@ class OAuth2TestCase(TestCase):
self.assertEqual(resp.status_code, 200) self.assertEqual(resp.status_code, 200)
token = resp.json()['access_token'] token = AccessToken.objects.get(token=resp.json()['access_token'])
# Now app can see his creator # Token can have access, it shouldn't have the useless scope
resp = self.client.post(f'/api/user/{self.user.pk}/', self.assertEqual(token.scope, self.base_scope)
**{'Authorization': f'Bearer {token}'})
self.assertEqual(resp.status_code, 200)
def test_oidc_flow(self): def test_oidc_flow(self):
""" """