mirror of
https://gitlab.crans.org/mediatek/med.git
synced 2025-02-06 08:13:03 +00:00
170 lines
4.7 KiB
Python
170 lines
4.7 KiB
Python
# -*- mode: python; coding: utf-8 -*-
|
|
# Copyright (C) 2017-2019 by BDE ENS Paris-Saclay
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
from datetime import datetime
|
|
|
|
import requests
|
|
from authlib.integrations.django_client import OAuth
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import AbstractUser
|
|
from django.db import models
|
|
from django.db.models import Q
|
|
from django.utils import timezone
|
|
from django.utils.translation import gettext_lazy as _
|
|
from med.settings import MAX_EMPRUNT
|
|
|
|
|
|
class User(AbstractUser):
    """
    User account of the application.

    Extends Django's ``AbstractUser`` with contact details, a per-user
    borrowing limit and a free-form comment. ``date_joined`` is redefined
    so that it may be null.
    """

    telephone = models.CharField(
        verbose_name=_('phone number'),
        max_length=15,
        blank=True,
    )
    address = models.CharField(
        verbose_name=_('address'),
        max_length=255,
        blank=True,
    )
    maxemprunt = models.IntegerField(
        verbose_name=_('maximum borrowed'),
        help_text=_('Maximal amount of simultaneous borrowed item '
                    'authorized.'),
        default=MAX_EMPRUNT,
    )
    comment = models.CharField(
        verbose_name=_('comment'),
        help_text=_('Promotion...'),
        max_length=255,
        blank=True,
    )
    date_joined = models.DateTimeField(
        _('date joined'),
        default=timezone.now,
        null=True,
    )

    REQUIRED_FIELDS = ['first_name', 'last_name', 'email']

    @property
    def is_member(self):
        """
        Tell whether the user is a member.

        Currently a placeholder that always returns ``True``.
        """
        # FIXME Use NK20
        return True

    def update_data(self, data: dict):
        """
        Update user data from given dictionary.
        Useful when we want to update user data from Note Kfet.

        Only the in-memory instance is modified; the caller is responsible
        for calling ``save()``.

        Parameters
        ----------
        data : dict
            Dictionary with user data to update. It must provide the keys
            ``email``, ``first_name``, ``last_name`` and a nested
            ``profile`` mapping with the keys ``phone_number``,
            ``address`` and ``section``.

        Raises
        ------
        KeyError
            If one of the expected keys is missing from ``data``.
        """
        self.email = data['email']
        self.first_name = data['first_name']
        self.last_name = data['last_name']

        profile = data['profile']
        # The target columns are declared without ``null=True``: a null
        # value in the payload is stored as an empty string so that a
        # later ``save()`` does not fail on a NOT NULL constraint.
        self.telephone = profile['phone_number'] or ''
        self.address = profile['address'] or ''
        self.comment = profile['section'] or ''
|
|
|
|
|
|
class AccessToken(models.Model):
    """
    OAuth2 access token for the Note Kfet API, optionally bound to a local
    user (``owner``).
    """

    # Timeout, in seconds, applied to HTTP calls made with ``requests`` so
    # that an unresponsive server cannot block the caller forever.
    REQUEST_TIMEOUT = 10

    owner = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        null=True,
        default=None,
        verbose_name=_('owner'),
    )

    access_token = models.CharField(
        max_length=32,
        verbose_name=_('access token'),
    )

    expires_in = models.PositiveSmallIntegerField(
        verbose_name=_('expires in'),
    )

    scopes = models.CharField(
        max_length=255,
        verbose_name=_('scopes'),
    )

    refresh_token = models.CharField(
        max_length=32,
        verbose_name=_('refresh token'),
    )

    expires_at = models.DateTimeField(
        verbose_name=_('expires at'),
    )

    def refresh(self):
        """
        Refresh the access token.

        Asks the ``notekfet`` OAuth client for a new token using the stored
        refresh token, copies the returned values onto this instance and
        saves it.
        """
        # Imported locally: the module-level name ``timezone`` refers to
        # ``django.utils.timezone``.
        from datetime import timezone as dt_timezone

        oauth = OAuth()
        oauth.register('notekfet')
        # Get the OAuth client
        oauth_client = oauth.notekfet._get_oauth_client()
        # Actually refresh the token
        token = oauth_client.refresh_token(oauth.notekfet.access_token_url,
                                           refresh_token=self.refresh_token)
        self.access_token = token['access_token']
        self.expires_in = token['expires_in']
        self.scopes = token['scope']
        self.refresh_token = token['refresh_token']
        # ``expires_at`` is handled as a POSIX timestamp. Passing ``tz``
        # converts it directly to an aware UTC datetime; without it,
        # ``fromtimestamp`` returns naive *local* time, which must not be
        # relabelled as UTC.
        self.expires_at = datetime.fromtimestamp(token['expires_at'],
                                                 tz=dt_timezone.utc)

        self.save()

    def refresh_if_expired(self):
        """
        Refresh the current token if it is invalid.

        The token is considered invalid when ``expires_at`` is in the past.
        """
        if self.expires_at < timezone.now():
            self.refresh()

    def auth_header(self):
        """
        Return HTTP header that contains the bearer access token.
        Refresh the token if needed.
        """
        self.refresh_if_expired()
        return {'Authorization': f'Bearer {self.access_token}'}

    def fetch_user(self, create_if_not_exist: bool = False):
        """
        Return the local user matching the Note Kfet account behind this
        access token.

        If the token already has an owner, it is returned without any
        network call. Otherwise the Note Kfet API is queried and a local
        user is looked up by username or email; when found (or created),
        its data is updated from the API response and saved.

        This method does not assign ``owner``; the caller has to do it.

        Parameters
        ----------
        create_if_not_exist : bool
            Create the local user when none matches.

        Returns
        -------
        User or None
            The matching user, or ``None`` when no user matches and
            ``create_if_not_exist`` is false.
        """
        if self.owner:
            return self.owner

        data = requests.get('https://note-dev.crans.org/api/me/',
                            headers=self.auth_header(),
                            timeout=self.REQUEST_TIMEOUT).json()
        username = data['username']
        email = data['email']
        qs = User.objects.filter(Q(username=username) | Q(email=email))
        if not qs.exists():
            if create_if_not_exist:
                user = User.objects.create(username=username, email=email)
            else:
                return None
        else:
            # NOTE(review): ``get()`` raises ``MultipleObjectsReturned`` if
            # the username and the email match two different users.
            user = qs.get()

        # Update user data from Note Kfet
        user.update_data(data)
        user.save()

        return user

    class Meta:
        verbose_name = _('access token')
        verbose_name_plural = _('access tokens')
|