Remove some duplicate from tests code, full coverage for prowy view
This commit is contained in:
		@@ -138,34 +138,33 @@ class CheckPasswordCase(TestCase):
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@override_settings(CAS_AUTH_CLASS='cas_server.auth.TestAuthUser')
 | 
			
		||||
class LoginTestCase(TestCase):
 | 
			
		||||
    """Tests for the login view"""
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        """
 | 
			
		||||
            Prepare the test context:
 | 
			
		||||
                * set the auth class to 'cas_server.auth.TestAuthUser'
 | 
			
		||||
                * create a service pattern for https://www.example.com/**
 | 
			
		||||
                * Set the service pattern to return all user attributes
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
class BaseServicePattern(object):
 | 
			
		||||
    """Mixing for setting up service pattern for testing"""
 | 
			
		||||
    def setup_service_patterns(self, proxy=False):
 | 
			
		||||
        """setting up service pattern"""
 | 
			
		||||
        # For general purpose testing
 | 
			
		||||
        self.service = "https://www.example.com"
 | 
			
		||||
        self.service_pattern = models.ServicePattern.objects.create(
 | 
			
		||||
            name="example",
 | 
			
		||||
            pattern="^https://www\.example\.com(/.*)?$",
 | 
			
		||||
            proxy=proxy,
 | 
			
		||||
        )
 | 
			
		||||
        models.ReplaceAttributName.objects.create(name="*", service_pattern=self.service_pattern)
 | 
			
		||||
 | 
			
		||||
        # For testing the restrict_users attributes
 | 
			
		||||
        self.service_restrict_user_fail = "https://restrict_user_fail.example.com"
 | 
			
		||||
        self.service_pattern_restrict_user_fail = models.ServicePattern.objects.create(
 | 
			
		||||
            name="restrict_user_fail",
 | 
			
		||||
            pattern="^https://restrict_user_fail\.example\.com(/.*)?$",
 | 
			
		||||
            restrict_users=True,
 | 
			
		||||
            proxy=proxy,
 | 
			
		||||
        )
 | 
			
		||||
        self.service_restrict_user_success = "https://restrict_user_success.example.com"
 | 
			
		||||
        self.service_pattern_restrict_user_success = models.ServicePattern.objects.create(
 | 
			
		||||
            name="restrict_user_success",
 | 
			
		||||
            pattern="^https://restrict_user_success\.example\.com(/.*)?$",
 | 
			
		||||
            restrict_users=True,
 | 
			
		||||
            proxy=proxy,
 | 
			
		||||
        )
 | 
			
		||||
        models.Username.objects.create(
 | 
			
		||||
            value=settings.CAS_TEST_USER,
 | 
			
		||||
@@ -173,18 +172,22 @@ class LoginTestCase(TestCase):
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # For testing the user attributes filtering conditions
 | 
			
		||||
        self.service_filter_fail = "https://filter_fail.example.com"
 | 
			
		||||
        self.service_pattern_filter_fail = models.ServicePattern.objects.create(
 | 
			
		||||
            name="filter_fail",
 | 
			
		||||
            pattern="^https://filter_fail\.example\.com(/.*)?$",
 | 
			
		||||
            proxy=proxy,
 | 
			
		||||
        )
 | 
			
		||||
        models.FilterAttributValue.objects.create(
 | 
			
		||||
            attribut="right",
 | 
			
		||||
            pattern="^admin$",
 | 
			
		||||
            service_pattern=self.service_pattern_filter_fail
 | 
			
		||||
        )
 | 
			
		||||
        self.service_filter_success = "https://filter_success.example.com"
 | 
			
		||||
        self.service_pattern_filter_success = models.ServicePattern.objects.create(
 | 
			
		||||
            name="filter_success",
 | 
			
		||||
            pattern="^https://filter_success\.example\.com(/.*)?$",
 | 
			
		||||
            proxy=proxy,
 | 
			
		||||
        )
 | 
			
		||||
        models.FilterAttributValue.objects.create(
 | 
			
		||||
            attribut="email",
 | 
			
		||||
@@ -193,17 +196,29 @@ class LoginTestCase(TestCase):
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # For testing the user_field attributes
 | 
			
		||||
        self.service_field_needed_fail = "https://field_needed_fail.example.com"
 | 
			
		||||
        self.service_pattern_field_needed_fail = models.ServicePattern.objects.create(
 | 
			
		||||
            name="field_needed_fail",
 | 
			
		||||
            pattern="^https://field_needed_fail\.example\.com(/.*)?$",
 | 
			
		||||
            user_field="uid"
 | 
			
		||||
            user_field="uid",
 | 
			
		||||
            proxy=proxy,
 | 
			
		||||
        )
 | 
			
		||||
        self.service_field_needed_success = "https://field_needed_success.example.com"
 | 
			
		||||
        self.service_pattern_field_needed_success = models.ServicePattern.objects.create(
 | 
			
		||||
            name="field_needed_success",
 | 
			
		||||
            pattern="^https://field_needed_success\.example\.com(/.*)?$",
 | 
			
		||||
            user_field="nom"
 | 
			
		||||
            user_field="nom",
 | 
			
		||||
            proxy=proxy,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@override_settings(CAS_AUTH_CLASS='cas_server.auth.TestAuthUser')
 | 
			
		||||
class LoginTestCase(TestCase, BaseServicePattern):
 | 
			
		||||
    """Tests for the login view"""
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        """Prepare the test context:"""
 | 
			
		||||
        self.setup_service_patterns()
 | 
			
		||||
 | 
			
		||||
    def assert_logged(self, client, response, warn=False, code=200):
 | 
			
		||||
        """Assertions testing that client is well authenticated"""
 | 
			
		||||
        self.assertEqual(response.status_code, code)
 | 
			
		||||
@@ -395,59 +410,55 @@ class LoginTestCase(TestCase):
 | 
			
		||||
 | 
			
		||||
    def test_service_restrict_user(self):
 | 
			
		||||
        """Testing the restric user capability fro a service"""
 | 
			
		||||
        service = "https://restrict_user_fail.example.com"
 | 
			
		||||
        client = get_auth_client()
 | 
			
		||||
        response = client.get("/login", {'service': service})
 | 
			
		||||
 | 
			
		||||
        response = client.get("/login", {'service': self.service_restrict_user_fail})
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
        self.assertTrue(b"Username non allowed" in response.content)
 | 
			
		||||
 | 
			
		||||
        service = "https://restrict_user_success.example.com"
 | 
			
		||||
        response = client.get("/login", {'service': service})
 | 
			
		||||
        response = client.get("/login", {'service': self.service_restrict_user_success})
 | 
			
		||||
        self.assertEqual(response.status_code, 302)
 | 
			
		||||
        self.assertTrue(response["Location"].startswith("%s?ticket=" % service))
 | 
			
		||||
        self.assertTrue(
 | 
			
		||||
            response["Location"].startswith("%s?ticket=" % self.service_restrict_user_success)
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def test_service_filter(self):
 | 
			
		||||
        """Test the filtering on user attributes"""
 | 
			
		||||
        service = "https://filter_fail.example.com"
 | 
			
		||||
        client = get_auth_client()
 | 
			
		||||
        response = client.get("/login", {'service': service})
 | 
			
		||||
 | 
			
		||||
        response = client.get("/login", {'service': self.service_filter_fail})
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
        self.assertTrue(b"User charateristics non allowed" in response.content)
 | 
			
		||||
 | 
			
		||||
        service = "https://filter_success.example.com"
 | 
			
		||||
        response = client.get("/login", {'service': service})
 | 
			
		||||
        response = client.get("/login", {'service': self.service_filter_success})
 | 
			
		||||
        self.assertEqual(response.status_code, 302)
 | 
			
		||||
        self.assertTrue(response["Location"].startswith("%s?ticket=" % service))
 | 
			
		||||
        self.assertTrue(response["Location"].startswith("%s?ticket=" % self.service_filter_success))
 | 
			
		||||
 | 
			
		||||
    def test_service_user_field(self):
 | 
			
		||||
        """Test using a user attribute as username: case on if the attribute exists or not"""
 | 
			
		||||
        service = "https://field_needed_fail.example.com"
 | 
			
		||||
        client = get_auth_client()
 | 
			
		||||
        response = client.get("/login", {'service': service})
 | 
			
		||||
 | 
			
		||||
        response = client.get("/login", {'service': self.service_field_needed_fail})
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
        self.assertTrue(b"The attribut uid is needed to use that service" in response.content)
 | 
			
		||||
 | 
			
		||||
        service = "https://field_needed_success.example.com"
 | 
			
		||||
        response = client.get("/login", {'service': service})
 | 
			
		||||
        response = client.get("/login", {'service': self.service_field_needed_success})
 | 
			
		||||
        self.assertEqual(response.status_code, 302)
 | 
			
		||||
        self.assertTrue(response["Location"].startswith("%s?ticket=" % service))
 | 
			
		||||
        self.assertTrue(
 | 
			
		||||
            response["Location"].startswith("%s?ticket=" % self.service_field_needed_success)
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    @override_settings(CAS_TEST_ATTRIBUTES={'nom': []})
 | 
			
		||||
    def test_service_user_field_evaluate_to_false(self):
 | 
			
		||||
        """
 | 
			
		||||
            Test using a user attribute as username:
 | 
			
		||||
            case the attribute exists but evaluate to False
 | 
			
		||||
        """
 | 
			
		||||
        service = "https://field_needed_success.example.com"
 | 
			
		||||
        saved_nom = settings.CAS_TEST_ATTRIBUTES["nom"]
 | 
			
		||||
        settings.CAS_TEST_ATTRIBUTES["nom"] = []
 | 
			
		||||
 | 
			
		||||
        client = get_auth_client()
 | 
			
		||||
        response = client.get("/login", {"service": service})
 | 
			
		||||
        response = client.get("/login", {"service": self.service_field_needed_success})
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
        self.assertTrue(b"The attribut nom is needed to use that service" in response.content)
 | 
			
		||||
 | 
			
		||||
        settings.CAS_TEST_ATTRIBUTES["nom"] = saved_nom
 | 
			
		||||
 | 
			
		||||
    def test_gateway(self):
 | 
			
		||||
        """test gateway parameter"""
 | 
			
		||||
 | 
			
		||||
@@ -904,8 +915,65 @@ class ValidateTestCase(TestCase):
 | 
			
		||||
            self.assertEqual(response.content, b'no\n')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class XmlContent(object):
 | 
			
		||||
    """Mixin for test on CAS XML responses"""
 | 
			
		||||
    def assert_error(self, response, code, text=None):
 | 
			
		||||
        """Assert a validation error"""
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        error = root.xpath(
 | 
			
		||||
            "//cas:authenticationFailure",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(len(error), 1)
 | 
			
		||||
        self.assertEqual(error[0].attrib['code'], code)
 | 
			
		||||
        if text is not None:
 | 
			
		||||
            self.assertEqual(error[0].text, text)
 | 
			
		||||
 | 
			
		||||
    def assert_success(self, response, username, original_attributes):
 | 
			
		||||
        """assert a ticket validation success"""
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        sucess = root.xpath(
 | 
			
		||||
            "//cas:authenticationSuccess",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertTrue(sucess)
 | 
			
		||||
 | 
			
		||||
        users = root.xpath("//cas:user", namespaces={'cas': "http://www.yale.edu/tp/cas"})
 | 
			
		||||
        self.assertEqual(len(users), 1)
 | 
			
		||||
        self.assertEqual(users[0].text, username)
 | 
			
		||||
 | 
			
		||||
        attributes = root.xpath(
 | 
			
		||||
            "//cas:attributes",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(len(attributes), 1)
 | 
			
		||||
        attrs1 = set()
 | 
			
		||||
        for attr in attributes[0]:
 | 
			
		||||
            attrs1.add((attr.tag[len("http://www.yale.edu/tp/cas")+2:], attr.text))
 | 
			
		||||
 | 
			
		||||
        attributes = root.xpath("//cas:attribute", namespaces={'cas': "http://www.yale.edu/tp/cas"})
 | 
			
		||||
        self.assertEqual(len(attributes), len(attrs1))
 | 
			
		||||
        attrs2 = set()
 | 
			
		||||
        for attr in attributes:
 | 
			
		||||
            attrs2.add((attr.attrib['name'], attr.attrib['value']))
 | 
			
		||||
        original = set()
 | 
			
		||||
        for key, value in original_attributes.items():
 | 
			
		||||
            if isinstance(value, list):
 | 
			
		||||
                for sub_value in value:
 | 
			
		||||
                    original.add((key, sub_value))
 | 
			
		||||
            else:
 | 
			
		||||
                original.add((key, value))
 | 
			
		||||
        self.assertEqual(attrs1, attrs2)
 | 
			
		||||
        self.assertEqual(attrs1, original)
 | 
			
		||||
 | 
			
		||||
        return root
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@override_settings(CAS_AUTH_CLASS='cas_server.auth.TestAuthUser')
 | 
			
		||||
class ValidateServiceTestCase(TestCase):
 | 
			
		||||
class ValidateServiceTestCase(TestCase, XmlContent):
 | 
			
		||||
    """tests for the serviceValidate view"""
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        """preparing test context"""
 | 
			
		||||
@@ -940,42 +1008,7 @@ class ValidateServiceTestCase(TestCase):
 | 
			
		||||
 | 
			
		||||
        client = Client()
 | 
			
		||||
        response = client.get('/serviceValidate', {'ticket': ticket.value, 'service': self.service})
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        sucess = root.xpath(
 | 
			
		||||
            "//cas:authenticationSuccess",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertTrue(sucess)
 | 
			
		||||
 | 
			
		||||
        users = root.xpath("//cas:user", namespaces={'cas': "http://www.yale.edu/tp/cas"})
 | 
			
		||||
        self.assertEqual(len(users), 1)
 | 
			
		||||
        self.assertEqual(users[0].text, settings.CAS_TEST_USER)
 | 
			
		||||
 | 
			
		||||
        attributes = root.xpath(
 | 
			
		||||
            "//cas:attributes",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(len(attributes), 1)
 | 
			
		||||
        attrs1 = set()
 | 
			
		||||
        for attr in attributes[0]:
 | 
			
		||||
            attrs1.add((attr.tag[len("http://www.yale.edu/tp/cas")+2:], attr.text))
 | 
			
		||||
 | 
			
		||||
        attributes = root.xpath("//cas:attribute", namespaces={'cas': "http://www.yale.edu/tp/cas"})
 | 
			
		||||
        self.assertEqual(len(attributes), len(attrs1))
 | 
			
		||||
        attrs2 = set()
 | 
			
		||||
        for attr in attributes:
 | 
			
		||||
            attrs2.add((attr.attrib['name'], attr.attrib['value']))
 | 
			
		||||
        original = set()
 | 
			
		||||
        for key, value in settings.CAS_TEST_ATTRIBUTES.items():
 | 
			
		||||
            if isinstance(value, list):
 | 
			
		||||
                for sub_value in value:
 | 
			
		||||
                    original.add((key, sub_value))
 | 
			
		||||
            else:
 | 
			
		||||
                original.add((key, value))
 | 
			
		||||
        self.assertEqual(attrs1, attrs2)
 | 
			
		||||
        self.assertEqual(attrs1, original)
 | 
			
		||||
        self.assert_success(response, settings.CAS_TEST_USER, settings.CAS_TEST_ATTRIBUTES)
 | 
			
		||||
 | 
			
		||||
    def test_validate_service_view_ok_one_attribute(self):
 | 
			
		||||
        """
 | 
			
		||||
@@ -989,36 +1022,11 @@ class ValidateServiceTestCase(TestCase):
 | 
			
		||||
            '/serviceValidate',
 | 
			
		||||
            {'ticket': ticket.value, 'service': self.service_one_attribute}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        sucess = root.xpath(
 | 
			
		||||
            "//cas:authenticationSuccess",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        self.assert_success(
 | 
			
		||||
            response,
 | 
			
		||||
            settings.CAS_TEST_USER,
 | 
			
		||||
            {'nom': settings.CAS_TEST_ATTRIBUTES['nom']}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertTrue(sucess)
 | 
			
		||||
 | 
			
		||||
        users = root.xpath("//cas:user", namespaces={'cas': "http://www.yale.edu/tp/cas"})
 | 
			
		||||
        self.assertEqual(len(users), 1)
 | 
			
		||||
        self.assertEqual(users[0].text, settings.CAS_TEST_USER)
 | 
			
		||||
 | 
			
		||||
        attributes = root.xpath(
 | 
			
		||||
            "//cas:attributes",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(len(attributes), 1)
 | 
			
		||||
        attrs1 = set()
 | 
			
		||||
        for attr in attributes[0]:
 | 
			
		||||
            attrs1.add((attr.tag[len("http://www.yale.edu/tp/cas")+2:], attr.text))
 | 
			
		||||
 | 
			
		||||
        attributes = root.xpath("//cas:attribute", namespaces={'cas': "http://www.yale.edu/tp/cas"})
 | 
			
		||||
        self.assertEqual(len(attributes), len(attrs1))
 | 
			
		||||
        attrs2 = set()
 | 
			
		||||
        for attr in attributes:
 | 
			
		||||
            attrs2.add((attr.attrib['name'], attr.attrib['value']))
 | 
			
		||||
        original = set([('nom', settings.CAS_TEST_ATTRIBUTES['nom'])])
 | 
			
		||||
        self.assertEqual(attrs1, attrs2)
 | 
			
		||||
        self.assertEqual(attrs1, original)
 | 
			
		||||
 | 
			
		||||
    def test_validate_service_view_badservice(self):
 | 
			
		||||
        """test with a valid ticket but a bad service, the validatin should fail"""
 | 
			
		||||
@@ -1027,16 +1035,11 @@ class ValidateServiceTestCase(TestCase):
 | 
			
		||||
        client = Client()
 | 
			
		||||
        bad_service = "https://www.example.org"
 | 
			
		||||
        response = client.get('/serviceValidate', {'ticket': ticket.value, 'service': bad_service})
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        error = root.xpath(
 | 
			
		||||
            "//cas:authenticationFailure",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        self.assert_error(
 | 
			
		||||
            response,
 | 
			
		||||
            "INVALID_SERVICE",
 | 
			
		||||
            bad_service
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(len(error), 1)
 | 
			
		||||
        self.assertEqual(error[0].attrib['code'], "INVALID_SERVICE")
 | 
			
		||||
        self.assertEqual(error[0].text, bad_service)
 | 
			
		||||
 | 
			
		||||
    def test_validate_service_view_badticket_goodprefix(self):
 | 
			
		||||
        """
 | 
			
		||||
@@ -1048,16 +1051,11 @@ class ValidateServiceTestCase(TestCase):
 | 
			
		||||
        client = Client()
 | 
			
		||||
        bad_ticket = "%s-RANDOM" % settings.CAS_SERVICE_TICKET_PREFIX
 | 
			
		||||
        response = client.get('/serviceValidate', {'ticket': bad_ticket, 'service': self.service})
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        error = root.xpath(
 | 
			
		||||
            "//cas:authenticationFailure",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        self.assert_error(
 | 
			
		||||
            response,
 | 
			
		||||
            "INVALID_TICKET",
 | 
			
		||||
            'ticket not found'
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(len(error), 1)
 | 
			
		||||
        self.assertEqual(error[0].attrib['code'], "INVALID_TICKET")
 | 
			
		||||
        self.assertEqual(error[0].text, 'ticket not found')
 | 
			
		||||
 | 
			
		||||
    def test_validate_service_view_badticket_badprefix(self):
 | 
			
		||||
        """
 | 
			
		||||
@@ -1069,16 +1067,11 @@ class ValidateServiceTestCase(TestCase):
 | 
			
		||||
        client = Client()
 | 
			
		||||
        bad_ticket = "RANDOM"
 | 
			
		||||
        response = client.get('/serviceValidate', {'ticket': bad_ticket, 'service': self.service})
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        error = root.xpath(
 | 
			
		||||
            "//cas:authenticationFailure",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        self.assert_error(
 | 
			
		||||
            response,
 | 
			
		||||
            "INVALID_TICKET",
 | 
			
		||||
            bad_ticket
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(len(error), 1)
 | 
			
		||||
        self.assertEqual(error[0].attrib['code'], "INVALID_TICKET")
 | 
			
		||||
        self.assertEqual(error[0].text, bad_ticket)
 | 
			
		||||
 | 
			
		||||
    def test_validate_service_view_ok_pgturl(self):
 | 
			
		||||
        """test the retrieval of a ProxyGrantingTicket"""
 | 
			
		||||
@@ -1116,15 +1109,10 @@ class ValidateServiceTestCase(TestCase):
 | 
			
		||||
            '/serviceValidate',
 | 
			
		||||
            {'ticket': ticket.value, 'service': service, 'pgtUrl': service}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        error = root.xpath(
 | 
			
		||||
            "//cas:authenticationFailure",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        self.assert_error(
 | 
			
		||||
            response,
 | 
			
		||||
            "INVALID_PROXY_CALLBACK",
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(len(error), 1)
 | 
			
		||||
        self.assertEqual(error[0].attrib['code'], "INVALID_PROXY_CALLBACK")
 | 
			
		||||
 | 
			
		||||
    def test_validate_service_pgturl_404(self):
 | 
			
		||||
        """
 | 
			
		||||
@@ -1140,13 +1128,7 @@ class ValidateServiceTestCase(TestCase):
 | 
			
		||||
            '/serviceValidate',
 | 
			
		||||
            {'ticket': ticket.value, 'service': service, 'pgtUrl': service}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        sucess = root.xpath(
 | 
			
		||||
            "//cas:authenticationSuccess",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertTrue(sucess)
 | 
			
		||||
        root = self.assert_success(response, settings.CAS_TEST_USER, settings.CAS_TEST_ATTRIBUTES)
 | 
			
		||||
        pgtiou = root.xpath(
 | 
			
		||||
            "//cas:proxyGrantingTicket",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
@@ -1163,16 +1145,11 @@ class ValidateServiceTestCase(TestCase):
 | 
			
		||||
            '/serviceValidate',
 | 
			
		||||
            {'ticket': ticket.value, 'service': self.service, 'pgtUrl': self.service}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        error = root.xpath(
 | 
			
		||||
            "//cas:authenticationFailure",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        self.assert_error(
 | 
			
		||||
            response,
 | 
			
		||||
            "INVALID_PROXY_CALLBACK",
 | 
			
		||||
            "callback url not allowed by configuration"
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(len(error), 1)
 | 
			
		||||
        self.assertEqual(error[0].attrib['code'], "INVALID_PROXY_CALLBACK")
 | 
			
		||||
        self.assertEqual(error[0].text, "callback url not allowed by configuration")
 | 
			
		||||
 | 
			
		||||
        self.service_pattern.proxy_callback = True
 | 
			
		||||
 | 
			
		||||
@@ -1182,15 +1159,11 @@ class ValidateServiceTestCase(TestCase):
 | 
			
		||||
            '/serviceValidate',
 | 
			
		||||
            {'ticket': ticket.value, 'service': self.service, 'pgtUrl': "https://www.example.org"}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        error = root.xpath(
 | 
			
		||||
            "//cas:authenticationFailure",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        self.assert_error(
 | 
			
		||||
            response,
 | 
			
		||||
            "INVALID_PROXY_CALLBACK",
 | 
			
		||||
            "callback url not allowed by configuration"
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(len(error), 1)
 | 
			
		||||
        self.assertEqual(error[0].attrib['code'], "INVALID_PROXY_CALLBACK")
 | 
			
		||||
        self.assertEqual(error[0].text, "callback url not allowed by configuration")
 | 
			
		||||
 | 
			
		||||
    def test_validate_user_field_ok(self):
 | 
			
		||||
        """
 | 
			
		||||
@@ -1203,17 +1176,11 @@ class ValidateServiceTestCase(TestCase):
 | 
			
		||||
            '/serviceValidate',
 | 
			
		||||
            {'ticket': ticket.value, 'service': self.service_user_field}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        sucess = root.xpath(
 | 
			
		||||
            "//cas:authenticationSuccess",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        self.assert_success(
 | 
			
		||||
            response,
 | 
			
		||||
            settings.CAS_TEST_ATTRIBUTES["alias"][0],
 | 
			
		||||
            {}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertTrue(sucess)
 | 
			
		||||
 | 
			
		||||
        users = root.xpath("//cas:user", namespaces={'cas': "http://www.yale.edu/tp/cas"})
 | 
			
		||||
        self.assertEqual(len(users), 1)
 | 
			
		||||
        self.assertEqual(users[0].text, settings.CAS_TEST_ATTRIBUTES["alias"][0])
 | 
			
		||||
 | 
			
		||||
    def test_validate_missing_parameter(self):
 | 
			
		||||
        """test with a missing GET parameter among [service, ticket]"""
 | 
			
		||||
@@ -1225,20 +1192,20 @@ class ValidateServiceTestCase(TestCase):
 | 
			
		||||
            send_params = params.copy()
 | 
			
		||||
            del send_params[key]
 | 
			
		||||
            response = client.get('/serviceValidate', send_params)
 | 
			
		||||
            root = etree.fromstring(response.content)
 | 
			
		||||
            error = root.xpath(
 | 
			
		||||
                "//cas:authenticationFailure",
 | 
			
		||||
                namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
            self.assert_error(
 | 
			
		||||
                response,
 | 
			
		||||
                "INVALID_REQUEST",
 | 
			
		||||
                "you must specify a service and a ticket"
 | 
			
		||||
            )
 | 
			
		||||
            self.assertEqual(len(error), 1)
 | 
			
		||||
            self.assertEqual(error[0].attrib['code'], "INVALID_REQUEST")
 | 
			
		||||
            self.assertEqual(error[0].text, "you must specify a service and a ticket")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@override_settings(CAS_AUTH_CLASS='cas_server.auth.TestAuthUser')
 | 
			
		||||
class ProxyTestCase(TestCase):
 | 
			
		||||
 | 
			
		||||
class ProxyTestCase(TestCase, BaseServicePattern, XmlContent):
 | 
			
		||||
    """tests for the proxy view"""
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        """preparing test context"""
 | 
			
		||||
        self.setup_service_patterns(proxy=True)
 | 
			
		||||
 | 
			
		||||
        self.service = 'http://127.0.0.1'
 | 
			
		||||
        self.service_pattern = models.ServicePattern.objects.create(
 | 
			
		||||
            name="localhost",
 | 
			
		||||
@@ -1249,6 +1216,10 @@ class ProxyTestCase(TestCase):
 | 
			
		||||
        models.ReplaceAttributName.objects.create(name="*", service_pattern=self.service_pattern)
 | 
			
		||||
 | 
			
		||||
    def test_validate_proxy_ok(self):
 | 
			
		||||
        """
 | 
			
		||||
            Get a PGT, get a proxy ticket, validate it. Validation should succeed and
 | 
			
		||||
            show the proxy service URL.
 | 
			
		||||
        """
 | 
			
		||||
        params = get_pgt()
 | 
			
		||||
 | 
			
		||||
        # get a proxy ticket
 | 
			
		||||
@@ -1270,14 +1241,11 @@ class ProxyTestCase(TestCase):
 | 
			
		||||
        # validate the proxy ticket
 | 
			
		||||
        client2 = Client()
 | 
			
		||||
        response = client2.get('/proxyValidate', {'ticket': proxy_ticket, 'service': self.service})
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        sucess = root.xpath(
 | 
			
		||||
            "//cas:authenticationSuccess",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        root = self.assert_success(
 | 
			
		||||
            response,
 | 
			
		||||
            settings.CAS_TEST_USER,
 | 
			
		||||
            settings.CAS_TEST_ATTRIBUTES
 | 
			
		||||
        )
 | 
			
		||||
        self.assertTrue(sucess)
 | 
			
		||||
 | 
			
		||||
        # check that the proxy is send to the end service
 | 
			
		||||
        proxies = root.xpath("//cas:proxies", namespaces={'cas': "http://www.yale.edu/tp/cas"})
 | 
			
		||||
@@ -1286,97 +1254,82 @@ class ProxyTestCase(TestCase):
 | 
			
		||||
        self.assertEqual(len(proxy), 1)
 | 
			
		||||
        self.assertEqual(proxy[0].text, params["service"])
 | 
			
		||||
 | 
			
		||||
        # same tests than those for serviceValidate
 | 
			
		||||
        users = root.xpath("//cas:user", namespaces={'cas': "http://www.yale.edu/tp/cas"})
 | 
			
		||||
        self.assertEqual(len(users), 1)
 | 
			
		||||
        self.assertEqual(users[0].text, settings.CAS_TEST_USER)
 | 
			
		||||
 | 
			
		||||
        attributes = root.xpath(
 | 
			
		||||
            "//cas:attributes",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(len(attributes), 1)
 | 
			
		||||
        attrs1 = set()
 | 
			
		||||
        for attr in attributes[0]:
 | 
			
		||||
            attrs1.add((attr.tag[len("http://www.yale.edu/tp/cas")+2:], attr.text))
 | 
			
		||||
 | 
			
		||||
        attributes = root.xpath("//cas:attribute", namespaces={'cas': "http://www.yale.edu/tp/cas"})
 | 
			
		||||
        self.assertEqual(len(attributes), len(attrs1))
 | 
			
		||||
        attrs2 = set()
 | 
			
		||||
        for attr in attributes:
 | 
			
		||||
            attrs2.add((attr.attrib['name'], attr.attrib['value']))
 | 
			
		||||
        original = set()
 | 
			
		||||
        for key, value in settings.CAS_TEST_ATTRIBUTES.items():
 | 
			
		||||
            if isinstance(value, list):
 | 
			
		||||
                for sub_value in value:
 | 
			
		||||
                    original.add((key, sub_value))
 | 
			
		||||
            else:
 | 
			
		||||
                original.add((key, value))
 | 
			
		||||
        self.assertEqual(attrs1, attrs2)
 | 
			
		||||
        self.assertEqual(attrs1, original)
 | 
			
		||||
 | 
			
		||||
    def test_validate_proxy_bad(self):
 | 
			
		||||
    def test_validate_proxy_bad_pgt(self):
 | 
			
		||||
        """Try to get a ProxyTicket with a bad PGT. The PT generation should fail"""
 | 
			
		||||
        params = get_pgt()
 | 
			
		||||
 | 
			
		||||
        # bad PGT
 | 
			
		||||
        client1 = Client()
 | 
			
		||||
        response = client1.get(
 | 
			
		||||
        client = Client()
 | 
			
		||||
        response = client.get(
 | 
			
		||||
            '/proxy',
 | 
			
		||||
            {
 | 
			
		||||
                'pgt': "%s-RANDOM" % settings.CAS_PROXY_GRANTING_TICKET_PREFIX,
 | 
			
		||||
                'targetService': params['service']
 | 
			
		||||
            }
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        error = root.xpath(
 | 
			
		||||
            "//cas:authenticationFailure",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(len(error), 1)
 | 
			
		||||
        self.assertEqual(error[0].attrib['code'], "INVALID_TICKET")
 | 
			
		||||
        self.assertEqual(
 | 
			
		||||
            error[0].text,
 | 
			
		||||
        self.assert_error(
 | 
			
		||||
            response,
 | 
			
		||||
            "INVALID_TICKET",
 | 
			
		||||
            "PGT %s-RANDOM not found" % settings.CAS_PROXY_GRANTING_TICKET_PREFIX
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # bad targetService
 | 
			
		||||
        client2 = Client()
 | 
			
		||||
        response = client2.get(
 | 
			
		||||
    def test_validate_proxy_bad_service(self):
 | 
			
		||||
        """
 | 
			
		||||
            Try to get a ProxyTicket for a denied service and
 | 
			
		||||
            a service that do not allow PT. The PT generation should fail.
 | 
			
		||||
        """
 | 
			
		||||
        params = get_pgt()
 | 
			
		||||
 | 
			
		||||
        client1 = Client()
 | 
			
		||||
        response = client1.get(
 | 
			
		||||
            '/proxy',
 | 
			
		||||
            {'pgt': params['pgtId'], 'targetService': "https://www.example.org"}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        error = root.xpath(
 | 
			
		||||
            "//cas:authenticationFailure",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        self.assert_error(
 | 
			
		||||
            response,
 | 
			
		||||
            "UNAUTHORIZED_SERVICE",
 | 
			
		||||
            "https://www.example.org"
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(len(error), 1)
 | 
			
		||||
        self.assertEqual(error[0].attrib['code'], "UNAUTHORIZED_SERVICE")
 | 
			
		||||
        self.assertEqual(error[0].text, "https://www.example.org")
 | 
			
		||||
 | 
			
		||||
        # service do not allow proxy ticket
 | 
			
		||||
        self.service_pattern.proxy = False
 | 
			
		||||
        self.service_pattern.save()
 | 
			
		||||
 | 
			
		||||
        client3 = Client()
 | 
			
		||||
        response = client3.get(
 | 
			
		||||
        client2 = Client()
 | 
			
		||||
        response = client2.get(
 | 
			
		||||
            '/proxy',
 | 
			
		||||
            {'pgt': params['pgtId'], 'targetService': params['service']}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(response.status_code, 200)
 | 
			
		||||
 | 
			
		||||
        root = etree.fromstring(response.content)
 | 
			
		||||
        error = root.xpath(
 | 
			
		||||
            "//cas:authenticationFailure",
 | 
			
		||||
            namespaces={'cas': "http://www.yale.edu/tp/cas"}
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(len(error), 1)
 | 
			
		||||
        self.assertEqual(error[0].attrib['code'], "UNAUTHORIZED_SERVICE")
 | 
			
		||||
        self.assertEqual(
 | 
			
		||||
            error[0].text,
 | 
			
		||||
        self.assert_error(
 | 
			
		||||
            response,
 | 
			
		||||
            "UNAUTHORIZED_SERVICE",
 | 
			
		||||
            'the service %s do not allow proxy ticket' % params['service']
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self.service_pattern.proxy = True
 | 
			
		||||
        self.service_pattern.save()
 | 
			
		||||
 | 
			
		||||
    def test_proxy_unauthorized_user(self):
 | 
			
		||||
        """
 | 
			
		||||
            Try to get a PT for services that do not allow the current user:
 | 
			
		||||
                * first with a service that restrict allower username
 | 
			
		||||
                * second with a service requiring somes conditions on the user attributes
 | 
			
		||||
                * third with a service using a particular user attribute as username
 | 
			
		||||
            All this tests should fail
 | 
			
		||||
        """
 | 
			
		||||
        params = get_pgt()
 | 
			
		||||
 | 
			
		||||
        for service in [
 | 
			
		||||
            self.service_restrict_user_fail,
 | 
			
		||||
            self.service_filter_fail,
 | 
			
		||||
            self.service_field_needed_fail
 | 
			
		||||
        ]:
 | 
			
		||||
            client = Client()
 | 
			
		||||
            response = client.get(
 | 
			
		||||
                '/proxy',
 | 
			
		||||
                {'pgt': params['pgtId'], 'targetService': service}
 | 
			
		||||
            )
 | 
			
		||||
            self.assert_error(
 | 
			
		||||
                response,
 | 
			
		||||
                "UNAUTHORIZED_USER",
 | 
			
		||||
                'User %s not allowed on %s' % (settings.CAS_TEST_USER, service)
 | 
			
		||||
            )
 | 
			
		||||
 
 | 
			
		||||
@@ -864,7 +864,7 @@ class Proxy(View):
 | 
			
		||||
        except (models.BadUsername, models.BadFilter, models.UserFieldNotDefined):
 | 
			
		||||
            raise ValidateError(
 | 
			
		||||
                'UNAUTHORIZED_USER',
 | 
			
		||||
                '%s not allowed on %s' % (ticket.user, self.target_service)
 | 
			
		||||
                'User %s not allowed on %s' % (ticket.user.username, self.target_service)
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user