source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py @ 7698

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py@7698
Revision 7698, 17.7 KB checked in by pjkersha, 10 years ago (diff)

Integrated SAML ESGF Group/Role? attribute value type into SAML Attribute Authority client unit tests.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
RevLine 
[4654]1#!/usr/bin/env python
2"""NDG Attribute Authority
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/12/08"
[4770]8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
[4840]9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
[7077]11__revision__ = '$Id$'
[4654]12
13import unittest
14import os
15import sys
16import getpass
17import re
18import logging
[5355]19logging.basicConfig(level=logging.DEBUG)
[4654]20
[6686]21from warnings import warn
22from uuid import uuid4
23from datetime import datetime
24from os import path
[6720]25import pickle
[4654]26
[5290]27from ndg.security.test.unit import BaseTestCase
[4716]28
[5971]29from ndg.security.common.utils.configfileparsers import (
30    CaseSensitiveConfigParser)
31from ndg.security.server.attributeauthority import (AttributeAuthority, 
[6719]32    SQLAlchemyAttributeInterface, InvalidAttributeFormat, AttributeInterface)
[4654]33
[6686]34from ndg.saml.saml2.core import (Response, Attribute, SAMLVersion, Subject, 
35                                 NameID, Issuer, AttributeQuery, 
36                                 XSStringAttributeValue, Status, StatusMessage, 
37                                 StatusCode)
38from ndg.saml.xml import XMLConstants
[7698]39from ndg.security.common.saml_utils.esgf import ESGFSamlNamespaces
[4667]40
[6686]41THIS_DIR = path.dirname(__file__)
[4667]42
[6686]43
[4716]44class AttributeAuthorityTestCase(BaseTestCase):
[6686]45    THIS_DIR = THIS_DIR
46    PROPERTIES_FILENAME = 'test_attributeauthority.cfg'
47    PROPERTIES_FILEPATH = path.join(THIS_DIR, PROPERTIES_FILENAME)
[6720]48    ASSERTION_LIFETIME = "86400"
49
[6686]50    def test01ParsePropertiesFile(self):
51        cls = AttributeAuthorityTestCase
52        aa = AttributeAuthority.fromPropertyFile(cls.PROPERTIES_FILEPATH)
53        self.assert_(aa)
54        self.assert_(aa.assertionLifetime == 3600)
[4667]55       
[6720]56    def _createAttributeAuthorityHelper(self):
57        """Helper method to creat an Attribute Authority instance for use with
58        tests
59        """
[4667]60       
[6720]61        cls = AttributeAuthorityTestCase
62       
[6719]63        attributeInterfaceClassName = ('ndg.security.server.attributeauthority.'
64                                       'AttributeInterface')
65       
[6721]66        aa = AttributeAuthority.fromProperties(
[6720]67                    assertionLifetime=cls.ASSERTION_LIFETIME,
[6719]68                    attributeInterface_className=attributeInterfaceClassName)
69       
[6720]70        return aa
71           
72    def test02FromProperties(self):
73       
74        cls = AttributeAuthorityTestCase
75        aa = self._createAttributeAuthorityHelper()
76       
[6686]77        self.assert_(aa)
[6720]78       
79        # Check lifetime property converted from string input to float
80        self.assert_(aa.assertionLifetime == float(cls.ASSERTION_LIFETIME))
[6719]81        self.assert_(isinstance(aa.attributeInterface, AttributeInterface))
[4667]82
[6720]83    def test03Pickle(self):
84        # Test pickling with __slots__
85        aa = self._createAttributeAuthorityHelper()       
86        jar = pickle.dumps(aa)
87        aa2 = pickle.loads(jar)
88       
89        self.assert_(aa2)
90        self.assert_(aa2.assertionLifetime == aa.assertionLifetime)
91        self.assert_(isinstance(aa2.attributeInterface, AttributeInterface))
92   
93       
[6686]94class SQLAlchemyAttributeInterfaceTestCase(BaseTestCase):
95    THIS_DIR = THIS_DIR
96    PROPERTIES_FILENAME = 'test_sqlalchemyattributeinterface.cfg'
97    PROPERTIES_FILEPATH = path.join(THIS_DIR, PROPERTIES_FILENAME)
[4667]98   
[6062]99    SAML_SUBJECT_SQLQUERY = ("select count(*) from users where openid = "
100                             "'${userId}'")
[5971]101   
[6062]102    SAML_FIRSTNAME_SQLQUERY = ("select firstname from users where openid = "
103                               "'${userId}'")
[5977]104           
[6062]105    SAML_LASTNAME_SQLQUERY = ("select lastname from users where openid = "
106                              "'${userId}'")
[5977]107       
[6062]108    SAML_EMAILADDRESS_SQLQUERY = ("select emailaddress from users where "
109                                  "openid = '${userId}'")
[5977]110       
[5982]111    SAML_ATTRIBUTES_SQLQUERY = ("select attributename from attributes, users "
[6062]112                                "where users.openid = '${userId}' and "
113                                "attributes.username = users.username")
[5977]114                               
[5971]115    def __init__(self, *arg, **kw):
116        super(SQLAlchemyAttributeInterfaceTestCase, self).__init__(*arg, **kw)
[5973]117        self.skipTests = False
[5971]118        try:
[5982]119            import sqlalchemy
[5973]120
[5971]121        except NotImplementedError:
[5973]122            # Don't proceed with tests because SQLAlchemy is not installed
123            warn("Skipping SQLAlchemyAttributeInterfaceTestCase because "
124                 "SQLAlchemy is not installed")
125            self.skipTests = True
[5971]126       
[5982]127        if 'NDGSEC_AA_UNITTEST_DIR' not in os.environ:
128            os.environ['NDGSEC_AA_UNITTEST_DIR'
129                       ] = os.path.abspath(os.path.dirname(__file__))
130           
[6009]131        self.initDb()
[5982]132       
[5977]133    def test01TrySamlAttribute2SqlQuery__setattr__(self):
[5973]134        if self.skipTests:
135            return
136       
[5971]137        attributeInterface = SQLAlchemyAttributeInterface()
[5977]138       
139        # Define queries for SAML attribute names
[5982]140        attributeInterface.samlAttribute2SqlQuery_firstName = '"%s" "%s"' % (
[7698]141            ESGFSamlNamespaces.FIRSTNAME_ATTRNAME,                                                               
[5982]142            SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY)
[5977]143           
[5982]144        setattr(attributeInterface, 
145                'samlAttribute2SqlQuery.lastName',
[7698]146                "%s %s" % (ESGFSamlNamespaces.LASTNAME_ATTRNAME,
[5982]147                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY))
[5977]148       
[5982]149        attributeInterface.samlAttribute2SqlQuery[
[7698]150            ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME] = (
[5982]151                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY)
[5977]152       
[5982]153        attributeInterface.samlAttribute2SqlQuery[
154            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]] = (
[5977]155            SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY)
156       
157    def test02SetProperties(self):
158        # test setProperties interface for instance attribute assignment
159        if self.skipTests:
160            return
161       
[5982]162        # samlAttribute2SqlQuery* suffixes have no particular requirement
163        # only that they are unique and start with an underscore or period.
[5977]164        properties = {
165            'connectionString': 
166                SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR,
167           
168            'samlSubjectSqlQuery':
169                SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY,
170               
[5982]171            'samlAttribute2SqlQuery.firstname': '"%s" "%s"' % (
[7698]172                ESGFSamlNamespaces.FIRSTNAME_ATTRNAME,
[5982]173                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY),
[5977]174           
[5982]175            'samlAttribute2SqlQuery.blah': '"%s" "%s"' % (
[7698]176                ESGFSamlNamespaces.LASTNAME_ATTRNAME,
[5982]177                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY),
[5977]178       
[5982]179            'samlAttribute2SqlQuery.3': '%s "%s"' % (
[7698]180            ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME,
[5982]181            SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY),
[5977]182       
[5982]183            'samlAttribute2SqlQuery_0': '%s %s' % (
184                SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0],
185                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY),
[5977]186           
187            'samlValidRequestorDNs': ('/O=STFC/OU=CEDA/CN=AuthorisationService',
188                                      '/O=ESG/OU=NCAR/CN=Gateway'),
189            'samlAssertionLifetime': 86400,
[5973]190
[5977]191        }
192        attributeInterface = SQLAlchemyAttributeInterface()
193        attributeInterface.setProperties(**properties)
194       
[5982]195        self.assert_(
196            attributeInterface.samlAttribute2SqlQuery[
[7698]197                ESGFSamlNamespaces.FIRSTNAME_ATTRNAME] == \
[5977]198            SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY)
199       
200        self.assert_(attributeInterface.connectionString == \
201                     SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR)
202       
203        # Test constructor setting properties
204        attributeInterface2 = SQLAlchemyAttributeInterface(**properties)
205        self.assert_(attributeInterface2.samlAssertionLifetime.days == 1)
[5982]206
207    def test03FromConfigFile(self):
208        if self.skipTests:
209            return
210        cfgParser = CaseSensitiveConfigParser()
[6686]211        cls = SQLAlchemyAttributeInterfaceTestCase
212        cfgFilePath = cls.PROPERTIES_FILEPATH
[5982]213        cfgParser.read(cfgFilePath)
[5977]214       
[5982]215        cfg = dict(cfgParser.items('DEFAULT'))
216        attributeInterface = SQLAlchemyAttributeInterface()
217        attributeInterface.setProperties(prefix='attributeInterface.', **cfg)
218       
219        self.assert_(
220            attributeInterface.samlAttribute2SqlQuery[
[7698]221                ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME] == \
[5982]222            SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY)
223
224    def test04SamlAttributeQuery(self):
[5977]225        if self.skipTests:
226            return
227       
228        # Prepare a client query
229        attributeQuery = AttributeQuery()
230        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
231        attributeQuery.id = str(uuid4())
232        attributeQuery.issueInstant = datetime.utcnow()
233       
234        attributeQuery.issuer = Issuer()
235        attributeQuery.issuer.format = Issuer.X509_SUBJECT
236        attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway'
237                       
238                       
239        attributeQuery.subject = Subject() 
240        attributeQuery.subject.nameID = NameID()
[7698]241        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
[5977]242        attributeQuery.subject.nameID.value = \
[6062]243                                SQLAlchemyAttributeInterfaceTestCase.OPENID_URI
[5977]244       
245        fnAttribute = Attribute()
[7698]246        fnAttribute.name = ESGFSamlNamespaces.FIRSTNAME_ATTRNAME
[6062]247        fnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
[5977]248        fnAttribute.friendlyName = "FirstName"
249
250        attributeQuery.attributes.append(fnAttribute)
251   
252        lnAttribute = Attribute()
[7698]253        lnAttribute.name = ESGFSamlNamespaces.LASTNAME_ATTRNAME
[6062]254        lnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
[5977]255        lnAttribute.friendlyName = "LastName"
256
257        attributeQuery.attributes.append(lnAttribute)
258   
259        emailAddressAttribute = Attribute()
[7698]260        emailAddressAttribute.name = ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME
[6062]261        emailAddressAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
262        emailAddressAttribute.friendlyName = "EmailAddress"
[5977]263
264        attributeQuery.attributes.append(emailAddressAttribute)                                   
265   
266        authzAttribute = Attribute()
267        authzAttribute.name = \
268            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]
[6062]269        authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
[5977]270        authzAttribute.friendlyName = "authz"
271
272        attributeQuery.attributes.append(authzAttribute)                                   
273       
274        # Add the response - the interface will populate with an assertion as
[5982]275        # appropriate
[5977]276        samlResponse = Response()
277       
278        samlResponse.issueInstant = datetime.utcnow()
279        samlResponse.id = str(uuid4())
280        samlResponse.issuer = Issuer()
281       
282        # Initialise to success status but reset on error
283        samlResponse.status = Status()
284        samlResponse.status.statusCode = StatusCode()
285        samlResponse.status.statusMessage = StatusMessage()
286        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI
287       
288        # Nb. SAML 2.0 spec says issuer format must be omitted
289        samlResponse.issuer.value = "CEDA"
290       
291        samlResponse.inResponseTo = attributeQuery.id
292       
293        # Set up the interface object
294       
295        # Define queries for SAML attribute names
296        samlAttribute2SqlQuery = {
[7698]297            ESGFSamlNamespaces.FIRSTNAME_ATTRNAME: 
[5977]298                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY,
299           
[7698]300            ESGFSamlNamespaces.LASTNAME_ATTRNAME: 
[5977]301                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY,
302       
[7698]303            ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME: 
[5977]304                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY,
305       
[5982]306            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]: 
307                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY                   
[5977]308        }
309       
310        attributeInterface = SQLAlchemyAttributeInterface(
311                                samlAttribute2SqlQuery=samlAttribute2SqlQuery)
312       
313        attributeInterface.connectionString = \
314                        SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR
315               
316        attributeInterface.samlValidRequestorDNs = (
317            '/O=STFC/OU=CEDA/CN=AuthorisationService',
318            '/O=ESG/OU=NCAR/CN=Gateway')
319       
[6721]320        attributeInterface.setProperties(samlAssertionLifetime=28800.)
[5977]321       
322        attributeInterface.samlSubjectSqlQuery = (
323            SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY)
324       
325        # Make the query
326        attributeInterface.getAttributes(attributeQuery, samlResponse)
327       
328        self.assert_(
329                samlResponse.status.statusCode.value == StatusCode.SUCCESS_URI)
330        self.assert_(samlResponse.inResponseTo == attributeQuery.id)
331        self.assert_(samlResponse.assertions[0].subject.nameID.value == \
332                     attributeQuery.subject.nameID.value)
[5982]333        self.assert_(
334            samlResponse.assertions[0].attributeStatements[0].attributes[1
335                ].attributeValues[0].value == 'Kershaw')
336       
337        self.assert_(
338            len(samlResponse.assertions[0].attributeStatements[0].attributes[3
339                ].attributeValues) == \
340                    SQLAlchemyAttributeInterfaceTestCase.N_ATTRIBUTE_VALUES)
[5977]341
[6062]342    def test04SamlAttributeQuery(self):
343        if self.skipTests:
344            return
[5977]345       
[6062]346        # Prepare a client query
347        attributeQuery = AttributeQuery()
348        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
349        attributeQuery.id = str(uuid4())
350        attributeQuery.issueInstant = datetime.utcnow()
351       
352        attributeQuery.issuer = Issuer()
353        attributeQuery.issuer.format = Issuer.X509_SUBJECT
354        attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway'
355                       
356                       
357        attributeQuery.subject = Subject() 
358        attributeQuery.subject.nameID = NameID()
[7698]359        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
[6062]360        attributeQuery.subject.nameID.value = \
361                                SQLAlchemyAttributeInterfaceTestCase.OPENID_URI
362   
363        emailAddressAttribute = Attribute()
[7698]364        emailAddressAttribute.name = ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME
[6062]365        emailAddressAttribute.nameFormat = "InvalidFormat"
366        emailAddressAttribute.friendlyName = "EmailAddress"
367
368        attributeQuery.attributes.append(emailAddressAttribute)                                   
369   
370        authzAttribute = Attribute()
371        authzAttribute.name = \
372            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]
373        authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
374        authzAttribute.friendlyName = "authz"
375
376        attributeQuery.attributes.append(authzAttribute)                                   
377       
378        # Add the response - the interface will populate with an assertion as
379        # appropriate
380        samlResponse = Response()
381       
382        samlResponse.issueInstant = datetime.utcnow()
383        samlResponse.id = str(uuid4())
384        samlResponse.issuer = Issuer()
385       
386        # Initialise to success status but reset on error
387        samlResponse.status = Status()
388        samlResponse.status.statusCode = StatusCode()
389        samlResponse.status.statusMessage = StatusMessage()
390        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI
391       
392        # Nb. SAML 2.0 spec says issuer format must be omitted
393        samlResponse.issuer.value = "CEDA"
394       
395        samlResponse.inResponseTo = attributeQuery.id
396       
397        # Set up the interface object
398       
399        # Define queries for SAML attribute names
400        samlAttribute2SqlQuery = {
[7698]401            ESGFSamlNamespaces.FIRSTNAME_ATTRNAME: 
[6062]402                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY,
403           
[7698]404            ESGFSamlNamespaces.LASTNAME_ATTRNAME: 
[6062]405                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY,
406       
[7698]407            ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME: 
[6062]408                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY,
409       
410            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]: 
[6721]411                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY
[6062]412        }
413       
414        attributeInterface = SQLAlchemyAttributeInterface(
415                                samlAttribute2SqlQuery=samlAttribute2SqlQuery)
416       
417        attributeInterface.connectionString = \
418                        SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR
419               
420        attributeInterface.samlValidRequestorDNs = (
421            '/O=STFC/OU=CEDA/CN=AuthorisationService',
422            '/O=ESG/OU=NCAR/CN=Gateway')
423       
[6721]424        attributeInterface.setProperties(samlAssertionLifetime=28800.)
[6062]425       
426        attributeInterface.samlSubjectSqlQuery = (
427            SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY)
428       
429        # Make the query
430        try:
431            attributeInterface.getAttributes(attributeQuery, samlResponse)
432        except InvalidAttributeFormat:
433            print("PASSED: caught InvalidAttributeFormat exception")
434        else:
435            self.fail("Expecting InvalidAttributeFormat exception")
436       
[4654]437if __name__ == "__main__":
438    unittest.main()
Note: See TracBrowser for help on using the repository browser.