Add some tests using tox
This commit is contained in:
@ -27,26 +27,14 @@ class UserCredential(forms.Form):
|
||||
method = forms.CharField(widget=forms.HiddenInput(), required=False)
|
||||
warn = forms.BooleanField(label=_('warn'), required=False)
|
||||
|
||||
def __init__(self, request, *args, **kwargs):
|
||||
self.request = request
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(UserCredential, self).__init__(*args, **kwargs)
|
||||
|
||||
def clean(self):
|
||||
cleaned_data = super(UserCredential, self).clean()
|
||||
auth = utils.import_attr(settings.CAS_AUTH_CLASS)(cleaned_data.get("username"))
|
||||
if auth.test_password(cleaned_data.get("password")):
|
||||
try:
|
||||
user = models.User.objects.get(
|
||||
username=auth.username,
|
||||
session_key=self.request.session.session_key
|
||||
)
|
||||
user.save()
|
||||
except models.User.DoesNotExist:
|
||||
user = models.User.objects.create(
|
||||
username=auth.username,
|
||||
session_key=self.request.session.session_key
|
||||
)
|
||||
user.save()
|
||||
cleaned_data["username"] = auth.username
|
||||
else:
|
||||
raise forms.ValidationError(_(u"Bad user"))
|
||||
|
||||
|
@ -89,11 +89,14 @@ class LogoutView(View, LogoutMixin):
|
||||
request = None
|
||||
service = None
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
"""methode called on GET request on this view"""
|
||||
def init_get(self, request):
|
||||
self.request = request
|
||||
self.service = request.GET.get('service')
|
||||
self.url = request.GET.get('url')
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
"""methode called on GET request on this view"""
|
||||
self.init_get(request)
|
||||
self.logout()
|
||||
# if service is set, redirect to service after logout
|
||||
if self.service:
|
||||
@ -105,6 +108,7 @@ class LogoutView(View, LogoutMixin):
|
||||
# else redirect to login page
|
||||
else:
|
||||
if settings.CAS_REDIRECT_TO_LOGIN_AFTER_LOGOUT:
|
||||
|
||||
messages.add_message(request, messages.SUCCESS, _(u'Successfully logout'))
|
||||
return redirect("cas_server:login")
|
||||
else:
|
||||
@ -129,67 +133,110 @@ class LoginView(View, LogoutMixin):
|
||||
renewed = False
|
||||
warned = False
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
"""methode called on POST request on this view"""
|
||||
INVALID_LOGIN_TICKET = 1
|
||||
USER_LOGIN_OK = 2
|
||||
USER_LOGIN_FAILURE = 3
|
||||
USER_ALREADY_LOGGED = 4
|
||||
USER_AUTHENTICATED = 5
|
||||
USER_NOT_AUTHENTICATED = 6
|
||||
|
||||
def init_post(self, request):
|
||||
self.request = request
|
||||
self.service = request.POST.get('service')
|
||||
self.renew = True if request.POST.get('renew') else False
|
||||
self.gateway = request.POST.get('gateway')
|
||||
self.method = request.POST.get('method')
|
||||
|
||||
def check_lt(self):
|
||||
# save LT for later check
|
||||
lt_valid = request.session.get('lt')
|
||||
lt_send = request.POST.get('lt')
|
||||
lt_valid = self.request.session.get('lt')
|
||||
lt_send = self.request.POST.get('lt')
|
||||
# generate a new LT (by posting the LT has been consumed)
|
||||
request.session['lt'] = utils.gen_lt()
|
||||
self.request.session['lt'] = utils.gen_lt()
|
||||
|
||||
# check if send LT is valid
|
||||
if lt_valid is None or lt_valid != lt_send:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
"""methode called on POST request on this view"""
|
||||
self.init_post(request)
|
||||
ret = self.process_post()
|
||||
if ret == self.INVALID_LOGIN_TICKET:
|
||||
messages.add_message(
|
||||
self.request,
|
||||
messages.ERROR,
|
||||
_(u"Invalid login ticket")
|
||||
)
|
||||
values = request.POST.copy()
|
||||
# if not set a new LT and fail
|
||||
values['lt'] = request.session['lt']
|
||||
self.init_form(values)
|
||||
|
||||
elif not request.session.get("authenticated") or self.renew:
|
||||
self.init_form(request.POST)
|
||||
if self.form.is_valid():
|
||||
elif ret == self.USER_LOGIN_OK:
|
||||
try:
|
||||
self.user = models.User.objects.get(
|
||||
username=self.form.cleaned_data['username'],
|
||||
username=self.request.session['username'],
|
||||
session_key=self.request.session.session_key
|
||||
)
|
||||
request.session.set_expiry(0)
|
||||
request.session["username"] = self.form.cleaned_data['username']
|
||||
request.session["warn"] = True if self.form.cleaned_data.get("warn") else False
|
||||
request.session["authenticated"] = True
|
||||
self.renewed = True
|
||||
self.warned = True
|
||||
else:
|
||||
self.logout()
|
||||
self.user.save()
|
||||
except models.User.DoesNotExist:
|
||||
self.user = models.User.objects.create(
|
||||
username=self.request.session['username'],
|
||||
session_key=self.request.session.session_key
|
||||
)
|
||||
self.user.save()
|
||||
elif ret == self.USER_LOGIN_FAILURE: # bad user login
|
||||
self.logout()
|
||||
elif ret == self.USER_ALREADY_LOGGED:
|
||||
pass
|
||||
else:
|
||||
raise EnvironmentError("invalid output for LoginView.process_post")
|
||||
return self.common()
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
"""methode called on GET request on this view"""
|
||||
def process_post(self, pytest=False):
|
||||
if not self.check_lt():
|
||||
values = self.request.POST.copy()
|
||||
# if not set a new LT and fail
|
||||
values['lt'] = self.request.session['lt']
|
||||
self.init_form(values)
|
||||
return self.INVALID_LOGIN_TICKET
|
||||
elif not self.request.session.get("authenticated") or self.renew:
|
||||
self.init_form(self.request.POST)
|
||||
if self.form.is_valid():
|
||||
self.request.session.set_expiry(0)
|
||||
self.request.session["username"] = self.form.cleaned_data['username']
|
||||
self.request.session["warn"] = True if self.form.cleaned_data.get("warn") else False
|
||||
self.request.session["authenticated"] = True
|
||||
self.renewed = True
|
||||
self.warned = True
|
||||
return self.USER_LOGIN_OK
|
||||
else:
|
||||
return self.USER_LOGIN_FAILURE
|
||||
else:
|
||||
return self.USER_ALREADY_LOGGED
|
||||
|
||||
def init_get(self, request):
|
||||
self.request = request
|
||||
self.service = request.GET.get('service')
|
||||
self.renew = True if request.GET.get('renew') else False
|
||||
self.gateway = request.GET.get('gateway')
|
||||
self.method = request.GET.get('method')
|
||||
|
||||
# generate a new LT if none is present
|
||||
request.session['lt'] = request.session.get('lt', utils.gen_lt())
|
||||
|
||||
if not request.session.get("authenticated") or self.renew:
|
||||
self.init_form()
|
||||
def get(self, request, *args, **kwargs):
|
||||
"""methode called on GET request on this view"""
|
||||
self.init_get(request)
|
||||
self.process_get()
|
||||
return self.common()
|
||||
|
||||
def process_get(self):
|
||||
# generate a new LT if none is present
|
||||
self.request.session['lt'] = self.request.session.get('lt', utils.gen_lt())
|
||||
|
||||
if not self.request.session.get("authenticated") or self.renew:
|
||||
self.init_form()
|
||||
return self.USER_NOT_AUTHENTICATED
|
||||
return self.USER_AUTHENTICATED
|
||||
|
||||
def init_form(self, values=None):
|
||||
self.form = forms.UserCredential(
|
||||
self.request,
|
||||
values,
|
||||
initial={
|
||||
'service': self.service,
|
||||
@ -345,7 +392,6 @@ class Auth(View):
|
||||
if not username or not password or not service:
|
||||
return HttpResponse("no\n", content_type="text/plain")
|
||||
form = forms.UserCredential(
|
||||
request,
|
||||
request.POST,
|
||||
initial={
|
||||
'service': service,
|
||||
@ -355,10 +401,17 @@ class Auth(View):
|
||||
)
|
||||
if form.is_valid():
|
||||
try:
|
||||
user = models.User.objects.get(
|
||||
username=form.cleaned_data['username'],
|
||||
session_key=request.session.session_key
|
||||
)
|
||||
try:
|
||||
user = models.User.objects.get(
|
||||
username=form.cleaned_data['username'],
|
||||
session_key=request.session.session_key
|
||||
)
|
||||
except models.User.DoesNotExist:
|
||||
user = models.User.objects.create(
|
||||
username=form.cleaned_data['username'],
|
||||
session_key=request.session.session_key
|
||||
)
|
||||
user.save()
|
||||
# is the service allowed
|
||||
service_pattern = ServicePattern.validate(service)
|
||||
# is the current user allowed on this service
|
||||
|
Reference in New Issue
Block a user