source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py @ 6720

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py@6720
Revision 6720, 18.1 KB checked in by pjkersha, 11 years ago (diff)
  • Attribute Authority unit tests now pass with refactored Attribute Authority which has NDG Attribute Certificate and role mapping code removed.
  • Now refactoring client unit tests.
  • Removed NDG Attribute Certificate and XMLSec unit tests - no longer needed.
  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/12/08"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
12
13import unittest
14import os
15import sys
16import getpass
17import re
18import logging
19logging.basicConfig(level=logging.DEBUG)
20
21from warnings import warn
22from uuid import uuid4
23from datetime import datetime
24from os import path
25import pickle
26
27from ndg.security.test.unit import BaseTestCase
28
29from ndg.security.common.utils.configfileparsers import (
30    CaseSensitiveConfigParser)
31from ndg.security.server.attributeauthority import (AttributeAuthority, 
32    SQLAlchemyAttributeInterface, InvalidAttributeFormat, AttributeInterface)
33
34from ndg.saml.saml2.core import (Response, Attribute, SAMLVersion, Subject, 
35                                 NameID, Issuer, AttributeQuery, 
36                                 XSStringAttributeValue, Status, StatusMessage, 
37                                 StatusCode)
38from ndg.saml.xml import XMLConstants
39from ndg.security.common.saml_utils.esg import EsgSamlNamespaces
40
41THIS_DIR = path.dirname(__file__)
42
43
44class AttributeAuthorityTestCase(BaseTestCase):
45    THIS_DIR = THIS_DIR
46    PROPERTIES_FILENAME = 'test_attributeauthority.cfg'
47    PROPERTIES_FILEPATH = path.join(THIS_DIR, PROPERTIES_FILENAME)
48    ISSUER_NAME = '/O=My Organisation/OU=Centre/CN=Attribute Authority'
49    ASSERTION_LIFETIME = "86400"
50
51    def test01ParsePropertiesFile(self):
52        cls = AttributeAuthorityTestCase
53        aa = AttributeAuthority.fromPropertyFile(cls.PROPERTIES_FILEPATH)
54        self.assert_(aa)
55        self.assert_(aa.assertionLifetime == 3600)
56        self.assert_(aa.issuerName == cls.ISSUER_NAME)
57       
58    def _createAttributeAuthorityHelper(self):
59        """Helper method to creat an Attribute Authority instance for use with
60        tests
61        """
62       
63        cls = AttributeAuthorityTestCase
64       
65        attributeInterfaceClassName = ('ndg.security.server.attributeauthority.'
66                                       'AttributeInterface')
67       
68        aa = AttributeAuthority.fromProperties(issuerName=cls.ISSUER_NAME,
69                    assertionLifetime=cls.ASSERTION_LIFETIME,
70                    attributeInterface_className=attributeInterfaceClassName)
71       
72        return aa
73           
74    def test02FromProperties(self):
75       
76        cls = AttributeAuthorityTestCase
77        aa = self._createAttributeAuthorityHelper()
78       
79        self.assert_(aa)
80       
81        # Check lifetime property converted from string input to float
82        self.assert_(aa.assertionLifetime == float(cls.ASSERTION_LIFETIME))
83        self.assert_(aa.issuerName == cls.ISSUER_NAME)
84        self.assert_(isinstance(aa.attributeInterface, AttributeInterface))
85
86    def test03Pickle(self):
87        # Test pickling with __slots__
88        aa = self._createAttributeAuthorityHelper()       
89        jar = pickle.dumps(aa)
90        aa2 = pickle.loads(jar)
91       
92        self.assert_(aa2)
93        self.assert_(aa2.assertionLifetime == aa.assertionLifetime)
94        self.assert_(aa2.issuerName == aa.issuerName)
95        self.assert_(isinstance(aa2.attributeInterface, AttributeInterface))
96   
97       
98class SQLAlchemyAttributeInterfaceTestCase(BaseTestCase):
99    THIS_DIR = THIS_DIR
100    PROPERTIES_FILENAME = 'test_sqlalchemyattributeinterface.cfg'
101    PROPERTIES_FILEPATH = path.join(THIS_DIR, PROPERTIES_FILENAME)
102   
103    SAML_SUBJECT_SQLQUERY = ("select count(*) from users where openid = "
104                             "'${userId}'")
105   
106    SAML_FIRSTNAME_SQLQUERY = ("select firstname from users where openid = "
107                               "'${userId}'")
108           
109    SAML_LASTNAME_SQLQUERY = ("select lastname from users where openid = "
110                              "'${userId}'")
111       
112    SAML_EMAILADDRESS_SQLQUERY = ("select emailaddress from users where "
113                                  "openid = '${userId}'")
114       
115    SAML_ATTRIBUTES_SQLQUERY = ("select attributename from attributes, users "
116                                "where users.openid = '${userId}' and "
117                                "attributes.username = users.username")
118                               
119    def __init__(self, *arg, **kw):
120        super(SQLAlchemyAttributeInterfaceTestCase, self).__init__(*arg, **kw)
121        self.skipTests = False
122        try:
123            import sqlalchemy
124
125        except NotImplementedError:
126            # Don't proceed with tests because SQLAlchemy is not installed
127            warn("Skipping SQLAlchemyAttributeInterfaceTestCase because "
128                 "SQLAlchemy is not installed")
129            self.skipTests = True
130       
131        if 'NDGSEC_AA_UNITTEST_DIR' not in os.environ:
132            os.environ['NDGSEC_AA_UNITTEST_DIR'
133                       ] = os.path.abspath(os.path.dirname(__file__))
134           
135        self.initDb()
136       
137    def test01TrySamlAttribute2SqlQuery__setattr__(self):
138        if self.skipTests:
139            return
140       
141        attributeInterface = SQLAlchemyAttributeInterface()
142       
143        # Define queries for SAML attribute names
144        attributeInterface.samlAttribute2SqlQuery_firstName = '"%s" "%s"' % (
145            EsgSamlNamespaces.FIRSTNAME_ATTRNAME,                                                               
146            SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY)
147           
148        setattr(attributeInterface, 
149                'samlAttribute2SqlQuery.lastName',
150                "%s %s" % (EsgSamlNamespaces.LASTNAME_ATTRNAME,
151                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY))
152       
153        attributeInterface.samlAttribute2SqlQuery[
154            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME] = (
155                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY)
156       
157        attributeInterface.samlAttribute2SqlQuery[
158            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]] = (
159            SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY)
160       
161    def test02SetProperties(self):
162        # test setProperties interface for instance attribute assignment
163        if self.skipTests:
164            return
165       
166        # samlAttribute2SqlQuery* suffixes have no particular requirement
167        # only that they are unique and start with an underscore or period.
168        properties = {
169            'connectionString': 
170                SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR,
171           
172            'samlSubjectSqlQuery':
173                SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY,
174               
175            'samlAttribute2SqlQuery.firstname': '"%s" "%s"' % (
176                EsgSamlNamespaces.FIRSTNAME_ATTRNAME,
177                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY),
178           
179            'samlAttribute2SqlQuery.blah': '"%s" "%s"' % (
180                EsgSamlNamespaces.LASTNAME_ATTRNAME,
181                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY),
182       
183            'samlAttribute2SqlQuery.3': '%s "%s"' % (
184            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME,
185            SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY),
186       
187            'samlAttribute2SqlQuery_0': '%s %s' % (
188                SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0],
189                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY),
190           
191            'samlValidRequestorDNs': ('/O=STFC/OU=CEDA/CN=AuthorisationService',
192                                      '/O=ESG/OU=NCAR/CN=Gateway'),
193            'samlAssertionLifetime': 86400,
194
195        }
196        attributeInterface = SQLAlchemyAttributeInterface()
197        attributeInterface.setProperties(**properties)
198       
199        self.assert_(
200            attributeInterface.samlAttribute2SqlQuery[
201                EsgSamlNamespaces.FIRSTNAME_ATTRNAME] == \
202            SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY)
203       
204        self.assert_(attributeInterface.connectionString == \
205                     SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR)
206       
207        # Test constructor setting properties
208        attributeInterface2 = SQLAlchemyAttributeInterface(**properties)
209        self.assert_(attributeInterface2.samlAssertionLifetime.days == 1)
210
211    def test03FromConfigFile(self):
212        if self.skipTests:
213            return
214        cfgParser = CaseSensitiveConfigParser()
215        cls = SQLAlchemyAttributeInterfaceTestCase
216        cfgFilePath = cls.PROPERTIES_FILEPATH
217        cfgParser.read(cfgFilePath)
218       
219        cfg = dict(cfgParser.items('DEFAULT'))
220        attributeInterface = SQLAlchemyAttributeInterface()
221        attributeInterface.setProperties(prefix='attributeInterface.', **cfg)
222       
223        self.assert_(
224            attributeInterface.samlAttribute2SqlQuery[
225                EsgSamlNamespaces.EMAILADDRESS_ATTRNAME] == \
226            SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY)
227
228    def test04SamlAttributeQuery(self):
229        if self.skipTests:
230            return
231       
232        # Prepare a client query
233        attributeQuery = AttributeQuery()
234        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
235        attributeQuery.id = str(uuid4())
236        attributeQuery.issueInstant = datetime.utcnow()
237       
238        attributeQuery.issuer = Issuer()
239        attributeQuery.issuer.format = Issuer.X509_SUBJECT
240        attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway'
241                       
242                       
243        attributeQuery.subject = Subject() 
244        attributeQuery.subject.nameID = NameID()
245        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
246        attributeQuery.subject.nameID.value = \
247                                SQLAlchemyAttributeInterfaceTestCase.OPENID_URI
248       
249        fnAttribute = Attribute()
250        fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME
251        fnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
252        fnAttribute.friendlyName = "FirstName"
253
254        attributeQuery.attributes.append(fnAttribute)
255   
256        lnAttribute = Attribute()
257        lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME
258        lnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
259        lnAttribute.friendlyName = "LastName"
260
261        attributeQuery.attributes.append(lnAttribute)
262   
263        emailAddressAttribute = Attribute()
264        emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME
265        emailAddressAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
266        emailAddressAttribute.friendlyName = "EmailAddress"
267
268        attributeQuery.attributes.append(emailAddressAttribute)                                   
269   
270        authzAttribute = Attribute()
271        authzAttribute.name = \
272            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]
273        authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
274        authzAttribute.friendlyName = "authz"
275
276        attributeQuery.attributes.append(authzAttribute)                                   
277       
278        # Add the response - the interface will populate with an assertion as
279        # appropriate
280        samlResponse = Response()
281       
282        samlResponse.issueInstant = datetime.utcnow()
283        samlResponse.id = str(uuid4())
284        samlResponse.issuer = Issuer()
285       
286        # Initialise to success status but reset on error
287        samlResponse.status = Status()
288        samlResponse.status.statusCode = StatusCode()
289        samlResponse.status.statusMessage = StatusMessage()
290        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI
291       
292        # Nb. SAML 2.0 spec says issuer format must be omitted
293        samlResponse.issuer.value = "CEDA"
294       
295        samlResponse.inResponseTo = attributeQuery.id
296       
297        # Set up the interface object
298       
299        # Define queries for SAML attribute names
300        samlAttribute2SqlQuery = {
301            EsgSamlNamespaces.FIRSTNAME_ATTRNAME: 
302                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY,
303           
304            EsgSamlNamespaces.LASTNAME_ATTRNAME: 
305                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY,
306       
307            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME: 
308                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY,
309       
310            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]: 
311                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY                   
312        }
313       
314        attributeInterface = SQLAlchemyAttributeInterface(
315                                samlAttribute2SqlQuery=samlAttribute2SqlQuery)
316       
317        attributeInterface.connectionString = \
318                        SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR
319               
320        attributeInterface.samlValidRequestorDNs = (
321            '/O=STFC/OU=CEDA/CN=AuthorisationService',
322            '/O=ESG/OU=NCAR/CN=Gateway')
323       
324        attributeInterface.setProperties(samlAssertionLifetime=28800.,
325                                issuerName='/CN=Attribute Authority/O=Site A')
326       
327        attributeInterface.samlSubjectSqlQuery = (
328            SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY)
329       
330        # Make the query
331        attributeInterface.getAttributes(attributeQuery, samlResponse)
332       
333        self.assert_(
334                samlResponse.status.statusCode.value == StatusCode.SUCCESS_URI)
335        self.assert_(samlResponse.inResponseTo == attributeQuery.id)
336        self.assert_(samlResponse.assertions[0].subject.nameID.value == \
337                     attributeQuery.subject.nameID.value)
338        self.assert_(
339            samlResponse.assertions[0].attributeStatements[0].attributes[1
340                ].attributeValues[0].value == 'Kershaw')
341       
342        self.assert_(
343            len(samlResponse.assertions[0].attributeStatements[0].attributes[3
344                ].attributeValues) == \
345                    SQLAlchemyAttributeInterfaceTestCase.N_ATTRIBUTE_VALUES)
346
347    def test04SamlAttributeQuery(self):
348        if self.skipTests:
349            return
350       
351        # Prepare a client query
352        attributeQuery = AttributeQuery()
353        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
354        attributeQuery.id = str(uuid4())
355        attributeQuery.issueInstant = datetime.utcnow()
356       
357        attributeQuery.issuer = Issuer()
358        attributeQuery.issuer.format = Issuer.X509_SUBJECT
359        attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway'
360                       
361                       
362        attributeQuery.subject = Subject() 
363        attributeQuery.subject.nameID = NameID()
364        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
365        attributeQuery.subject.nameID.value = \
366                                SQLAlchemyAttributeInterfaceTestCase.OPENID_URI
367   
368        emailAddressAttribute = Attribute()
369        emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME
370        emailAddressAttribute.nameFormat = "InvalidFormat"
371        emailAddressAttribute.friendlyName = "EmailAddress"
372
373        attributeQuery.attributes.append(emailAddressAttribute)                                   
374   
375        authzAttribute = Attribute()
376        authzAttribute.name = \
377            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]
378        authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT
379        authzAttribute.friendlyName = "authz"
380
381        attributeQuery.attributes.append(authzAttribute)                                   
382       
383        # Add the response - the interface will populate with an assertion as
384        # appropriate
385        samlResponse = Response()
386       
387        samlResponse.issueInstant = datetime.utcnow()
388        samlResponse.id = str(uuid4())
389        samlResponse.issuer = Issuer()
390       
391        # Initialise to success status but reset on error
392        samlResponse.status = Status()
393        samlResponse.status.statusCode = StatusCode()
394        samlResponse.status.statusMessage = StatusMessage()
395        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI
396       
397        # Nb. SAML 2.0 spec says issuer format must be omitted
398        samlResponse.issuer.value = "CEDA"
399       
400        samlResponse.inResponseTo = attributeQuery.id
401       
402        # Set up the interface object
403       
404        # Define queries for SAML attribute names
405        samlAttribute2SqlQuery = {
406            EsgSamlNamespaces.FIRSTNAME_ATTRNAME: 
407                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY,
408           
409            EsgSamlNamespaces.LASTNAME_ATTRNAME: 
410                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY,
411       
412            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME: 
413                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY,
414       
415            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]: 
416                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY                   
417        }
418       
419        attributeInterface = SQLAlchemyAttributeInterface(
420                                samlAttribute2SqlQuery=samlAttribute2SqlQuery)
421       
422        attributeInterface.connectionString = \
423                        SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR
424               
425        attributeInterface.samlValidRequestorDNs = (
426            '/O=STFC/OU=CEDA/CN=AuthorisationService',
427            '/O=ESG/OU=NCAR/CN=Gateway')
428       
429        attributeInterface.setProperties(samlAssertionLifetime=28800.,
430                                issuerName='/CN=Attribute Authority/O=Site A')
431       
432        attributeInterface.samlSubjectSqlQuery = (
433            SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY)
434       
435        # Make the query
436        try:
437            attributeInterface.getAttributes(attributeQuery, samlResponse)
438        except InvalidAttributeFormat:
439            print("PASSED: caught InvalidAttributeFormat exception")
440        else:
441            self.fail("Expecting InvalidAttributeFormat exception")
442       
443if __name__ == "__main__":
444    unittest.main()
Note: See TracBrowser for help on using the repository browser.