mirror of
https://gitlab.crans.org/bde/nk20
synced 2025-11-12 17:49:27 +01:00
445 lines
19 KiB
Python
445 lines
19 KiB
Python
# Copyright (C) 2018-2025 by BDE ENS Paris-Saclay
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import base64
import hashlib
from urllib.parse import parse_qs, urlsplit

from django.contrib.auth.hashers import PBKDF2PasswordHasher
from django.contrib.auth.models import User
from django.utils.crypto import get_random_string
from django.test import TestCase
from member.models import Membership, Club
from note.models import NoteUser
from oauth2_provider.models import Application, AccessToken, Grant

from ..models import Role, Permission
|
|
|
|
|
|
class OAuth2FlowTestCase(TestCase):
    """
    End-to-end tests of the OAuth2 grant types served under ``/o/``.

    Each test creates a confidential ``Application`` for one grant type and
    requests a token twice: once with only the parameters RFC 6749 requires,
    and once with the optional ones (scope, state, PKCE, ...). The scope
    stored on the resulting grant / access token is then checked: ``0_0``
    when no scope was requested, ``self.base_scope`` otherwise.
    """
    fixtures = ('initial', )

    def setUp(self):
        """
        Create a user holding a BDE membership and compute the scope used by the tests.
        """
        self.user_password = "toto1234"
        hasher = PBKDF2PasswordHasher()

        self.user = User.objects.create(
            username="toto",
            password=hasher.encode(self.user_password, hasher.salt()),
        )

        NoteUser.objects.create(user=self.user)
        membership = Membership.objects.create(user=self.user, club_id=1)
        membership.roles.add(Role.objects.get(name="Adhérent⋅e BDE"))
        membership.save()

        bde = Club.objects.get(name="BDE")
        view_user_perm = Permission.objects.get(pk=1)  # View own user detail

        # A scope is written "<permission pk>_<club pk>"
        self.base_scope = f'{view_user_perm.pk}_{bde.pk}'

    @staticmethod
    def _basic_credential(app):
        """
        Return the base64 encoded ``client_id:client_secret`` pair of ``app``,
        ready to be placed after ``Basic `` in an Authorization header.
        """
        return base64.b64encode(f'{app.client_id}:{app.client_secret}'.encode('utf-8')).decode()

    @staticmethod
    def _scrape_csrf_token(html):
        """
        Return the value assigned to ``CSRF_TOKEN`` in ``html``.

        An empty string is returned when the page holds no such assignment
        (for instance the empty body of a redirect).
        """
        _, marker, tail = html.partition('CSRF_TOKEN = "')
        if not marker:
            return ''
        return tail.partition('"')[0]

    def _run_authorization_dialog(self, authorize_params, consent_params):
        """
        Play the interactive part of a flow and return the last response.

        :param authorize_params: query parameters of the initial authorization request
        :param consent_params: fields posted with the consent form, in addition
                               to ``allow`` and ``csrfmiddlewaretoken``
        :return: response of the consent POST, redirects followed
        """
        # NOTE(review): extra keyword arguments of the test client end up in the
        # WSGI environ; confirm whether a "Content-Type" key has any effect there.
        resp = self.client.get('/o/authorize/',
                               data=authorize_params,
                               **{"Content-Type": 'application/x-www-form-urlencoded'})

        # The client is anonymous: the response is expected to be a redirect,
        # and its target receives the credentials.
        login_url = resp.url
        resp = self.client.post(login_url,
                                data={"username": self.user.username,
                                      "password": self.user_password,
                                      "permission_mask": 1,
                                      "csrfmiddlewaretoken": self._scrape_csrf_token(resp.text)})

        # Once logged in, load the page we are sent back to, then accept the request
        consent_url = resp.url
        resp = self.client.get(consent_url)

        return self.client.post(consent_url,
                                follow=True,
                                data={"allow": "Authorize",
                                      "csrfmiddlewaretoken": self._scrape_csrf_token(resp.text),
                                      **consent_params})

    def _post_token_request(self, data, credential=None):
        """
        POST ``data`` to the token endpoint and return the response.

        :param data: form fields of the token request
        :param credential: value returned by ``_basic_credential``; when given,
                           the client authenticates with HTTP Basic
        """
        environ = {"Content-Type": 'application/x-www-form-urlencoded'}
        if credential is not None:
            # Canonical WSGI spelling of the Authorization request header
            environ["HTTP_AUTHORIZATION"] = f'Basic {credential}'
        return self.client.post('/o/token/', data=data, **environ)

    def test_oauth2_authorization_code_flow(self):
        """
        Ensure the OAuth2 Authorization Code Flow works
        """
        app = Application.objects.create(
            name="Test Authorization Code",
            client_type=Application.CLIENT_CONFIDENTIAL,
            authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
            user=self.user,
            hash_client_secret=False,
            redirect_uris='http://127.0.0.1:8000/noexist/callback',
            algorithm=Application.NO_ALGORITHM,
        )

        credential = self._basic_credential(app)

        ############################
        # Minimal RFC6749 requests #
        ############################

        # Get user authorization
        resp = self._run_authorization_dialog(
            {"response_type": "code",  # REQUIRED
             "client_id": app.client_id},  # REQUIRED
            {"scope": "0_0",
             "response_type": "code",
             "client_id": app.client_id,
             "redirect_uri": app.redirect_uris},
        )

        # The authorization code comes back in the query string of the callback
        callback_params = parse_qs(resp.request['QUERY_STRING'])
        self.assertIn('code', callback_params)
        code = callback_params['code'][0]

        grant = Grant.objects.get(code=code)
        self.assertEqual(grant.scope, '0_0')

        # Now we can ask for an access token
        resp = self._post_token_request(
            {"grant_type": 'authorization_code',  # REQUIRED
             "code": code},  # REQUIRED
            credential,
        )

        # We should have a refresh token
        self.assertIn('refresh_token', resp.json())

        token = AccessToken.objects.get(token=resp.json()['access_token'])

        # No scope was requested: the token should only carry the useless scope
        self.assertEqual(token.scope, '0_0')

        # Logout user
        self.client.logout()

        #############################################
        # Maximal RFC6749 + RFC7636 (PKCE) requests #
        #############################################

        state = get_random_string(32)

        # PKCE
        code_verifier = get_random_string(100)  # 43-128 characters [A-Z,a-z,0-9,"-",".","_","~"]
        code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
        # The challenge is the URL-safe base64 of the digest, without padding
        code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8').replace('=', '')
        cc_method = "S256"

        # Get user authorization
        resp = self._run_authorization_dialog(
            {"response_type": "code",  # REQUIRED
             "code_challenge": code_challenge,  # PKCE REQUIRED
             "code_challenge_method": cc_method,  # PKCE REQUIRED
             "client_id": app.client_id,  # REQUIRED
             "redirect_uri": app.redirect_uris,  # OPTIONAL
             "scope": self.base_scope,  # OPTIONAL
             "state": state},  # RECOMMENDED
            {"scope": self.base_scope,
             "response_type": "code",
             "code_challenge": code_challenge,
             "code_challenge_method": cc_method,
             "client_id": app.client_id,
             "state": state,
             "redirect_uri": app.redirect_uris},
        )

        callback_params = parse_qs(resp.request['QUERY_STRING'])
        self.assertIn('code', callback_params)
        code = callback_params['code'][0]

        grant = Grant.objects.get(code=code)
        self.assertEqual(grant.scope, self.base_scope)
        # The state must be sent back unchanged
        self.assertEqual(callback_params.get('state'), [state])

        # Now we can ask for an access token
        resp = self._post_token_request(
            {"grant_type": 'authorization_code',  # REQUIRED
             "code": code,  # REQUIRED
             "code_verifier": code_verifier,  # PKCE REQUIRED
             "redirect_uri": app.redirect_uris},  # REQUIRED
            credential,
        )

        # We should have a refresh token
        self.assertIn('refresh_token', resp.json())

        token = AccessToken.objects.get(token=resp.json()['access_token'])

        # A scope was requested: the token should carry it instead of the useless scope
        self.assertEqual(token.scope, self.base_scope)

    def test_oauth2_implicit_flow(self):
        """
        Ensure the OAuth2 Implicit Flow works
        """
        app = Application.objects.create(
            name="Test Implicit Flow",
            client_type=Application.CLIENT_CONFIDENTIAL,
            authorization_grant_type=Application.GRANT_IMPLICIT,
            user=self.user,
            hash_client_secret=False,
            algorithm=Application.NO_ALGORITHM,
            redirect_uris='http://127.0.0.1:8000/noexist/callback/',
        )

        ############################
        # Minimal RFC6749 requests #
        ############################

        # Get user authorization
        resp = self._run_authorization_dialog(
            {'response_type': 'token',  # REQUIRED
             'client_id': app.client_id},  # REQUIRED
            {"scope": '0_0',
             "response_type": "token",
             "client_id": app.client_id,
             "redirect_uri": app.redirect_uris},
        )

        # The token is delivered in the fragment of the first redirect
        fragment_params = parse_qs(urlsplit(resp.redirect_chain[0][0]).fragment)

        # This flow must not hand out a refresh token
        self.assertNotIn('refresh_token', fragment_params)

        self.assertIn('access_token', fragment_params)
        access_token = AccessToken.objects.get(token=fragment_params['access_token'][0])

        # No scope was requested: the token should only carry the useless scope
        self.assertEqual(access_token.scope, '0_0')

        # Logout user
        self.client.logout()

        ############################
        # Maximal RFC6749 requests #
        ############################

        state = get_random_string(32)

        # Get user authorization
        resp = self._run_authorization_dialog(
            {'response_type': 'token',  # REQUIRED
             'client_id': app.client_id,  # REQUIRED
             'redirect_uri': app.redirect_uris,  # OPTIONAL
             'scope': self.base_scope,  # OPTIONAL
             'state': state},  # RECOMMENDED
            {"scope": self.base_scope,
             "state": state,
             "response_type": "token",
             "client_id": app.client_id,
             "redirect_uri": app.redirect_uris},
        )

        fragment_params = parse_qs(urlsplit(resp.redirect_chain[0][0]).fragment)

        self.assertNotIn('refresh_token', fragment_params)

        self.assertIn('access_token', fragment_params)
        access_token = AccessToken.objects.get(token=fragment_params['access_token'][0])

        # A scope was requested: the token should carry it instead of the useless scope
        self.assertEqual(access_token.scope, self.base_scope)

        # The state must be sent back unchanged
        self.assertEqual(fragment_params.get('state'), [state])

    def test_oauth2_resource_owner_password_credentials_flow(self):
        """
        Ensure the OAuth2 Resource Owner Password Credentials Flow works
        """
        app = Application.objects.create(
            name="Test ROPB",
            client_type=Application.CLIENT_CONFIDENTIAL,
            authorization_grant_type=Application.GRANT_PASSWORD,
            user=self.user,
            hash_client_secret=False,
            algorithm=Application.NO_ALGORITHM,
        )

        credential = self._basic_credential(app)

        # No token without the real password
        resp = self._post_token_request(
            {"grant_type": "password",  # REQUIRED
             "username": self.user.username,  # REQUIRED
             "password": "password"},  # REQUIRED
            credential,
        )

        self.assertEqual(resp.status_code, 400)

        resp = self._post_token_request(
            {"grant_type": "password",  # REQUIRED
             "username": self.user.username,  # REQUIRED
             "password": self.user_password},  # REQUIRED
            credential,
        )

        self.assertEqual(resp.status_code, 200)

        access_token = AccessToken.objects.get(token=resp.json()['access_token'])
        self.assertIn('refresh_token', resp.json())

        # No scope was requested: the token should only carry the useless scope
        self.assertEqual(access_token.scope, '0_0')

        # RFC6749 4.3.2 allows the use of scope in the ROPB access token request
        resp = self._post_token_request(
            {"grant_type": "password",  # REQUIRED
             "username": self.user.username,  # REQUIRED
             "password": self.user_password,  # REQUIRED
             "scope": self.base_scope},  # OPTIONAL
            credential,
        )

        token = AccessToken.objects.get(token=resp.json()['access_token'])

        # The token carries exactly the requested scope
        self.assertEqual(token.scope, self.base_scope)

    def test_oauth2_client_credentials(self):
        """
        Ensure the OAuth2 Client Credentials flow works
        """
        app = Application.objects.create(
            name="Test client_credentials",
            client_type=Application.CLIENT_CONFIDENTIAL,
            authorization_grant_type=Application.GRANT_CLIENT_CREDENTIALS,
            user=self.user,
            hash_client_secret=False,
            algorithm=Application.NO_ALGORITHM,
        )

        # No token without credential
        resp = self._post_token_request(
            {"grant_type": "client_credentials"},  # REQUIRED
        )

        self.assertEqual(resp.status_code, 401)

        # Access with credential
        credential = self._basic_credential(app)

        resp = self._post_token_request(
            {"grant_type": "client_credentials"},  # REQUIRED
            credential,
        )

        self.assertEqual(resp.status_code, 200)

        token = AccessToken.objects.get(token=resp.json()['access_token'])

        # No scope was requested: the token should only carry the useless scope
        self.assertEqual(token.scope, '0_0')

        # RFC6749 4.4.2 allows the use of scope in the client credentials flow
        resp = self._post_token_request(
            {"grant_type": "client_credentials",  # REQUIRED
             "scope": self.base_scope},  # OPTIONAL
            credential,
        )

        self.assertEqual(resp.status_code, 200)

        token = AccessToken.objects.get(token=resp.json()['access_token'])

        # A scope was requested: the token should carry it instead of the useless scope
        self.assertEqual(token.scope, self.base_scope)