source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/config/attributeauthority/sitea/sitea_attributeinterface.py @ 6720

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/config/attributeauthority/sitea/sitea_attributeinterface.py@6720
Revision 6720, 6.7 KB checked in by pjkersha, 11 years ago (diff)
  • Attribute Authority unit tests now pass with refactored Attribute Authority which has NDG Attribute Certificate and role mapping code removed.
  • Now refactoring client unit tests.
  • Removed NDG Attribute Certificate and XMLSec unit tests - no longer needed.
1"""NDG Attribute Authority User Roles class - acts as an interface between
2the data centre's user roles configuration and the Attribute Authority
3                                                                               
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "29/07/05"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id:siteAUserRoles.py 4371 2008-10-29 09:44:51Z pjkersha $'
12
13from datetime import datetime, timedelta
14from uuid import uuid4
15
16from ndg.saml.common.xml import SAMLConstants
17from ndg.saml.saml2.core import (Assertion, Attribute, AttributeStatement, Issuer,
18                             SAMLVersion, Subject, NameID, Conditions,
19                             XSStringAttributeValue)
20
21from ndg.security.common.X509 import X500DN
22from ndg.security.server.attributeauthority import (AttributeInterface, 
23                                                    InvalidRequestorId, 
24                                                    AttributeNotKnownError, 
25                                                    AttributeReleaseDenied, 
26                                                    UserIdNotKnown)
27from ndg.security.test.unit import BaseTestCase
28
29
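class TestUserRoles(AttributeInterface):
    """Test User Roles class dynamic import for Attribute Authority

    Fixed, in-memory stand-in for a data centre's user roles store: a small
    set of known subjects, trusted requestors and SAML attributes lets the
    Attribute Authority unit tests drive the SAML attribute query path
    without any external database.
    """
    ATTRIBUTE_NAMES = BaseTestCase.ATTRIBUTE_NAMES
    ATTRIBUTE_VALUES = BaseTestCase.ATTRIBUTE_VALUES

    SAML_ATTRIBUTE_NAMES = ATTRIBUTE_NAMES + (
        'urn:esg:email:address',
        'urn:esg:first:name',
        'urn:esg:last:name'
    )

    # One tuple of values per entry in SAML_ATTRIBUTE_NAMES.  The first entry
    # is the whole of ATTRIBUTE_VALUES, so the pairing in the loop below only
    # lines up if ATTRIBUTE_NAMES holds a single name.  NOTE(review): verify
    # against BaseTestCase - zip() truncates silently on a length mismatch.
    SAML_ATTRIBUTE_VALUES = (
        ATTRIBUTE_VALUES,
        ('p.kershaw@somewhere.ac.uk',),
        ('Philip',),
        ('Kershaw',)
    )

    SAML_ATTRIBUTE_FRIENDLY_NAMES = ('',)*len(ATTRIBUTE_NAMES) + (
        "EmailAddress",
        "FirstName",
        "LastName"
    )

    # Every attribute is released as an xs:string value
    SAML_ATTRIBUTE_FORMATS = (SAMLConstants.XSD_NS + "#" +
                              XSStringAttributeValue.TYPE_LOCAL_NAME,) * \
                              len(SAML_ATTRIBUTE_NAMES)

    # Pre-built SAML Attribute objects, one per name, shared by every query
    # this class answers
    SAML_ATTRIBUTES = []

    for name, vals, nameFormat, friendlyName in zip(SAML_ATTRIBUTE_NAMES,
                                                   SAML_ATTRIBUTE_VALUES,
                                                   SAML_ATTRIBUTE_FORMATS,
                                                   SAML_ATTRIBUTE_FRIENDLY_NAMES):
        attribute = Attribute()
        attribute.name = name
        attribute.nameFormat = nameFormat
        attribute.friendlyName = friendlyName
        for val in vals:
            attributeValue = XSStringAttributeValue()
            attributeValue.value = val
            attribute.attributeValues.append(attributeValue)

        SAML_ATTRIBUTES.append(attribute)

    # Don't leave the loop variables behind as class attributes
    del name, val, vals, nameFormat, friendlyName, attribute, attributeValue

    # 8 hours validity for issued assertions
    SAML_ASSERTION_LIFETIME = 8*60*60

    VALID_USER_IDS = ("https://openid.localhost/philip.kershaw",
                      BaseTestCase.OPENID_URI)
    VALID_REQUESTOR_IDS = BaseTestCase.VALID_REQUESTOR_IDS

    ISSUER_NAME = "/O=Site A/CN=Attribute Authority"

    # A requestor that passes the VALID_REQUESTOR_IDS check but is refused
    # attribute release, to exercise the AttributeReleaseDenied path.  The
    # check order in getAttributes means this DN must also appear in
    # BaseTestCase.VALID_REQUESTOR_IDS, otherwise that path is unreachable -
    # NOTE(review): confirm against BaseTestCase.
    INSUFFICIENT_PRIVILEGES_REQUESTOR_ID = X500DN.fromString(
                                        "/O=Site B/CN=Authorisation Service")

    def __init__(self, propertiesFilePath=None):
        '''No configuration is read: all data is held in class attributes

        @param propertiesFilePath: accepted for interface compatibility only
        and ignored
        '''
        pass

    def getRoles(self, userId):
        '''Return the fixed test role set; userId is not consulted'''
        return TestUserRoles.ATTRIBUTE_VALUES

    def getAttributes(self, attributeQuery, response):
        '''Test Attribute Authority SAML Attribute Query interface

        Validates the query and appends a new Assertion carrying the requested
        attributes to response.

        @param attributeQuery: SAML AttributeQuery; subject.nameID gives the
        user id and issuer gives the requestor identity as an X.509 subject
        name
        @param response: SAML Response the new Assertion is appended to; its
        issueInstant is reused as the assertion's issue instant
        @raise InvalidRequestorId: issuer format is not X509_SUBJECT, or the
        requestor DN is not in VALID_REQUESTOR_IDS
        @raise UserIdNotKnown: subject is not in VALID_USER_IDS
        @raise AttributeNotKnownError: a requested attribute name is not in
        SAML_ATTRIBUTE_NAMES
        @raise AttributeReleaseDenied: requestor is
        INSUFFICIENT_PRIVILEGES_REQUESTOR_ID
        '''
        userId = attributeQuery.subject.nameID.value
        requestedAttributeNames = [attribute.name
                                   for attribute in attributeQuery.attributes]

        if attributeQuery.issuer.format != Issuer.X509_SUBJECT:
            # Report the field that was actually checked.  The message used to
            # read attributeQuery.issuerFormat.value, which is not an
            # AttributeQuery attribute, so this path raised AttributeError
            # instead of InvalidRequestorId.
            raise InvalidRequestorId('Requestor issuer format "%s" is invalid' %
                                     attributeQuery.issuer.format)

        requestorId = X500DN.fromString(attributeQuery.issuer.value)

        # NOTE(review): the subject is checked before the requestor, so an
        # unrecognised requestor can learn whether a user id is known here.
        # Acceptable in a test fixture; a production AttributeInterface should
        # reject the requestor first.  Order kept so existing tests see the
        # same exception for a given query.
        if userId not in TestUserRoles.VALID_USER_IDS:
            raise UserIdNotKnown('Subject Id "%s" is not known to this '
                                 'authority' % userId)

        if requestorId not in TestUserRoles.VALID_REQUESTOR_IDS:
            raise InvalidRequestorId('Requestor identity "%s" is invalid' %
                                     requestorId)

        unknownAttrNames = [attrName for attrName in requestedAttributeNames
                            if attrName not in
                            TestUserRoles.SAML_ATTRIBUTE_NAMES]
        if unknownAttrNames:
            raise AttributeNotKnownError("Unknown attributes requested: %r" %
                                         unknownAttrNames)

        if requestorId == TestUserRoles.INSUFFICIENT_PRIVILEGES_REQUESTOR_ID:
            raise AttributeReleaseDenied("Attribute release denied for the "
                                         'requestor "%s"' % requestorId)

        # Create a new assertion to hold the attributes to be returned
        assertion = Assertion()
        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
        assertion.id = str(uuid4())
        assertion.issueInstant = response.issueInstant

        assertion.issuer = Issuer()
        assertion.issuer.value = TestUserRoles.ISSUER_NAME
        assertion.issuer.format = Issuer.X509_SUBJECT

        # Validity window opens at issue and runs for SAML_ASSERTION_LIFETIME
        assertion.conditions = Conditions()
        assertion.conditions.notBefore = assertion.issueInstant
        assertion.conditions.notOnOrAfter = assertion.conditions.notBefore + \
            timedelta(seconds=TestUserRoles.SAML_ASSERTION_LIFETIME)

        # Echo the queried subject back in the assertion
        assertion.subject = Subject()
        assertion.subject.nameID = NameID()
        assertion.subject.nameID.format = attributeQuery.subject.nameID.format
        assertion.subject.nameID.value = attributeQuery.subject.nameID.value

        # Resolve each requested name against the pre-built attributes.  The
        # shared class-level Attribute objects are appended as-is (not
        # copied), so anything that mutates the returned assertion also
        # alters the fixture for later queries.
        attributesByName = dict((attribute.name, attribute)
                                for attribute in TestUserRoles.SAML_ATTRIBUTES)
        attributeStatement = AttributeStatement()
        for name in requestedAttributeNames:
            attribute = attributesByName.get(name)
            if attribute is None:
                # Not reachable while SAML_ATTRIBUTE_NAMES and SAML_ATTRIBUTES
                # agree; kept as a guard in case the two drift apart
                raise AttributeNotKnownError("Unknown attribute requested: %s"%
                                             name)
            attributeStatement.attributes.append(attribute)

        assertion.attributeStatements.append(attributeStatement)
        response.assertions.append(assertion)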
161 