1 | """NDG Attribute Authority User Roles class - acts as an interface between |
---|
2 | the data centre's user roles configuration and the Attribute Authority |
---|
3 | |
---|
4 | NERC Data Grid Project |
---|
5 | """ |
---|
6 | __author__ = "P J Kershaw" |
---|
7 | __date__ = "29/07/05" |
---|
8 | __copyright__ = "(C) 2009 Science and Technology Facilities Council" |
---|
9 | __license__ = "BSD - see LICENSE file in top-level directory" |
---|
10 | __contact__ = "Philip.Kershaw@stfc.ac.uk" |
---|
11 | __revision__ = '$Id:siteAUserRoles.py 4371 2008-10-29 09:44:51Z pjkersha $' |
---|
12 | |
---|
13 | from datetime import datetime, timedelta |
---|
14 | from uuid import uuid4 |
---|
15 | |
---|
16 | from ndg.saml.common.xml import SAMLConstants |
---|
17 | from ndg.saml.saml2.core import (Assertion, Attribute, AttributeStatement, Issuer, |
---|
18 | SAMLVersion, Subject, NameID, Conditions, |
---|
19 | XSStringAttributeValue) |
---|
20 | |
---|
21 | from ndg.security.common.X509 import X500DN |
---|
22 | from ndg.security.server.attributeauthority import (AttributeInterface, |
---|
23 | InvalidRequestorId, |
---|
24 | AttributeNotKnownError, |
---|
25 | AttributeReleaseDenied, |
---|
26 | UserIdNotKnown) |
---|
27 | from ndg.security.test.unit import BaseTestCase |
---|
28 | |
---|
29 | |
---|
class TestUserRoles(AttributeInterface):
    """Test User Roles class dynamic import for Attribute Authority

    Stub ``AttributeInterface`` implementation backed entirely by class-level
    constants: a fixed set of SAML attributes is pre-built when the class is
    defined and handed back for any recognised user/requestor combination.
    """
    ATTRIBUTE_NAMES = BaseTestCase.ATTRIBUTE_NAMES
    ATTRIBUTE_VALUES = BaseTestCase.ATTRIBUTE_VALUES

    # Names of every attribute this stub can release
    SAML_ATTRIBUTE_NAMES = ATTRIBUTE_NAMES + (
        'urn:esg:email:address',
        'urn:esg:first:name',
        'urn:esg:last:name'
    )

    # One tuple of values per entry in SAML_ATTRIBUTE_NAMES; the two are
    # paired positionally by zip() below.
    # NOTE(review): this alignment presumably relies on ATTRIBUTE_NAMES
    # holding a single name - confirm against BaseTestCase.
    SAML_ATTRIBUTE_VALUES = (
        ATTRIBUTE_VALUES,
        ('p.kershaw@somewhere.ac.uk',),
        ('Philip',),
        ('Kershaw',)
    )

    # Entries taken from ATTRIBUTE_NAMES get an empty friendly name
    SAML_ATTRIBUTE_FRIENDLY_NAMES = ('',)*len(ATTRIBUTE_NAMES) + (
        "EmailAddress",
        "FirstName",
        "LastName"
    )

    # Every attribute uses the same name format string
    SAML_ATTRIBUTE_FORMATS = (
        SAMLConstants.XSD_NS + "#" + XSStringAttributeValue.TYPE_LOCAL_NAME,
    ) * len(SAML_ATTRIBUTE_NAMES)

    # Pre-built Attribute objects, populated by the loop below.  A plain loop
    # is used (rather than a comprehension) because it needs access to the
    # names defined in this class body.
    SAML_ATTRIBUTES = []

    for name, vals, nameFormat, friendlyName in zip(
            SAML_ATTRIBUTE_NAMES,
            SAML_ATTRIBUTE_VALUES,
            SAML_ATTRIBUTE_FORMATS,
            SAML_ATTRIBUTE_FRIENDLY_NAMES):
        attribute = Attribute()
        attribute.name = name
        attribute.nameFormat = nameFormat
        attribute.friendlyName = friendlyName
        for val in vals:
            attributeValue = XSStringAttributeValue()
            attributeValue.value = val
            attribute.attributeValues.append(attributeValue)

        SAML_ATTRIBUTES.append(attribute)

    # Remove the loop temporaries so they do not linger as class attributes
    del (name, val, vals, nameFormat, friendlyName, attribute,
         attributeValue)

    # 8 hours validity for issued assertions
    SAML_ASSERTION_LIFETIME = 8*60*60

    VALID_USER_IDS = ("https://openid.localhost/philip.kershaw",
                      BaseTestCase.OPENID_URI)
    VALID_REQUESTOR_IDS = BaseTestCase.VALID_REQUESTOR_IDS

    # Requestor that is refused attribute release in getAttributes()
    INSUFFICIENT_PRIVILEGES_REQUESTOR_ID = X500DN.fromString(
        "/O=Site B/CN=Authorisation Service")

    def __init__(self, propertiesFilePath=None):
        """No initialisation is needed for this stub.

        @param propertiesFilePath: accepted but ignored
        """
        pass

    def getRoles(self, userId):
        """Return the fixed test attribute values.

        @param userId: ignored - the same values are returned for every user
        @return: TestUserRoles.ATTRIBUTE_VALUES
        """
        return TestUserRoles.ATTRIBUTE_VALUES

    def getAttributes(self, attributeQuery, response):
        '''Test Attribute Authority SAML Attribute Query interface

        Validates the query and, if it passes, appends to
        response.assertions a new assertion holding the requested attributes.
        Nothing is returned.

        @param attributeQuery: query to answer; its subject.nameID, issuer
        and attributes are read
        @param response: response to populate; its issueInstant and
        issuer.value are copied into the new assertion
        @raise InvalidRequestorId: the query issuer format is not
        Issuer.X509_SUBJECT, or the issuer is not in VALID_REQUESTOR_IDS
        @raise UserIdNotKnown: the subject is not in VALID_USER_IDS
        @raise AttributeNotKnownError: an attribute name outside
        SAML_ATTRIBUTE_NAMES was requested
        @raise AttributeReleaseDenied: the requestor is
        INSUFFICIENT_PRIVILEGES_REQUESTOR_ID
        '''
        userId = attributeQuery.subject.nameID.value
        requestedAttributeNames = [attribute.name
                                   for attribute in attributeQuery.attributes]

        if attributeQuery.issuer.format != Issuer.X509_SUBJECT:
            # Report the same field that was just tested
            raise InvalidRequestorId('Requestor issuer format "%s" is invalid' %
                                     attributeQuery.issuer.format)

        requestorId = X500DN.fromString(attributeQuery.issuer.value)

        if userId not in TestUserRoles.VALID_USER_IDS:
            raise UserIdNotKnown('Subject Id "%s" is not known to this '
                                 'authority' % userId)

        if requestorId not in TestUserRoles.VALID_REQUESTOR_IDS:
            raise InvalidRequestorId('Requestor identity "%s" is invalid' %
                                     requestorId)

        unknownAttrNames = [attrName for attrName in requestedAttributeNames
                            if attrName not in
                            TestUserRoles.SAML_ATTRIBUTE_NAMES]

        if unknownAttrNames:
            raise AttributeNotKnownError("Unknown attributes requested: %r" %
                                         unknownAttrNames)

        # NOTE(review): only reachable when this DN is also listed in
        # VALID_REQUESTOR_IDS (defined in BaseTestCase) - confirm.
        if requestorId == TestUserRoles.INSUFFICIENT_PRIVILEGES_REQUESTOR_ID:
            raise AttributeReleaseDenied("Attribute release denied for the "
                                         'requestor "%s"' % requestorId)

        # Create a new assertion to hold the attributes to be returned
        assertion = Assertion()

        assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
        assertion.id = str(uuid4())
        assertion.issueInstant = response.issueInstant

        assertion.issuer = Issuer()
        assertion.issuer.value = response.issuer.value
        assertion.issuer.format = Issuer.X509_SUBJECT

        # Valid from the issue instant for SAML_ASSERTION_LIFETIME seconds
        assertion.conditions = Conditions()
        assertion.conditions.notBefore = assertion.issueInstant
        assertion.conditions.notOnOrAfter = (
            assertion.conditions.notBefore +
            timedelta(seconds=TestUserRoles.SAML_ASSERTION_LIFETIME))

        # The assertion is about the same subject as the query
        assertion.subject = Subject()
        assertion.subject.nameID = NameID()
        assertion.subject.nameID.format = attributeQuery.subject.nameID.format
        assertion.subject.nameID.value = attributeQuery.subject.nameID.value

        attributeStatement = AttributeStatement()

        # Add test set of attributes.  The pre-built class-level Attribute
        # objects are appended directly, not copied.
        for name in requestedAttributeNames:
            for attribute in TestUserRoles.SAML_ATTRIBUTES:
                if attribute.name == name:
                    attributeStatement.attributes.append(attribute)
                    break
            else:
                # Defensive: unknown names were already rejected above
                raise AttributeNotKnownError("Unknown attribute requested: %s"%
                                             name)

        assertion.attributeStatements.append(attributeStatement)
        response.assertions.append(assertion)
---|
159 | |
---|