source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_samlattributeauthorityclient.py @ 7698

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_samlattributeauthorityclient.py@7698
Revision 7698, 15.0 KB checked in by pjkersha, 10 years ago (diff)

Integrated SAML ESGF Group/Role attribute value type into SAML Attribute Authority client unit tests.

  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""NDG Attribute Authority SAML SOAP Binding client unit tests
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/02/10 (moved from test_attributeauthorityclient)"
8__copyright__ = "(C) 2010 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14import os
15from datetime import datetime
16from uuid import uuid4
17from xml.etree import ElementTree
18
19from ndg.saml.common import SAMLVersion
20from ndg.saml.common.xml import SAMLConstants
21from ndg.saml.xml.etree import AttributeQueryElementTree, ResponseElementTree
22from ndg.saml.saml2.core import (Subject, Issuer, Attribute, NameID, 
23                                 AttributeQuery, StatusCode, 
24                                 XSStringAttributeValue)
25
26from ndg.saml.saml2.binding.soap.client import SOAPBinding
27from ndg.saml.saml2.binding.soap.client.attributequery import (
28                                        AttributeQuerySOAPBinding, 
29                                        AttributeQuerySslSOAPBinding)
30from ndg.security.common.saml_utils.esgf import (ESGFSamlNamespaces,
31                                                 ESGFDefaultQueryAttributes,
32                                                 ESGFGroupRoleAttributeValue)
33from ndg.security.common.saml_utils.esgf.xml.etree import (
34                                        ESGFGroupRoleAttributeValueElementTree,
35                                        ESGFResponseElementTree)
36from ndg.security.common.utils.etree import prettyPrint
37from ndg.security.test.unit.attributeauthorityclient import \
38                                        AttributeAuthorityClientBaseTestCase
39   
40   
class AttributeAuthoritySAMLInterfaceTestCase(
                                        AttributeAuthorityClientBaseTestCase):
    """NDG Attribute Authority SAML SOAP Binding client unit tests.

    Every test sends a SAML attribute query to a Site A Attribute Authority
    test service and checks the status code of the response.  Per-test
    settings such as ``subject`` and ``uri`` are read from the section of
    ``self.cfg`` that is named after the test method.
    """
    HERE_DIR = os.path.dirname(__file__)
    CONFIG_FILENAME = 'test_samlattributeauthorityclient.cfg'
    CONFIG_FILEPATH = os.path.join(HERE_DIR, CONFIG_FILENAME)

    # Issuer name used by every test except the invalid issuer test
    _DEFAULT_ISSUER_NAME = "/CN=Authorisation Service/O=Site A"

    def __init__(self, *arg, **kw):
        """Initialise the base test case and start the test services.

        All positional and keyword arguments are passed through to the
        base class initialiser unchanged.
        """
        super(AttributeAuthoritySAMLInterfaceTestCase, self).__init__(*arg,
                                                                      **kw)

        # Run same config but on two different ports - one HTTP and one HTTPS
        # NOTE(review): unittest creates one test case instance per test
        # method so these calls are repeated for every test; presumably the
        # base class tolerates repeated start-up - confirm.
        self.startSiteAAttributeAuthority()
        self.startSiteAAttributeAuthority(withSSL=True, port=5443)

    # ------------------------------------------------------------------
    # Private helpers shared by the tests
    # ------------------------------------------------------------------
    @classmethod
    def _createAttributeQuery(cls, subjectId, issuerName=None):
        """Build an attribute query with issuer and subject filled in.

        @param subjectId: value for the subject name identifier
        @param issuerName: X.509 subject name to set as the query issuer;
        defaults to the Site A authorisation service name
        @return: new AttributeQuery with an empty list of attributes
        """
        if issuerName is None:
            issuerName = cls._DEFAULT_ISSUER_NAME

        attributeQuery = AttributeQuery()
        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
        attributeQuery.id = str(uuid4())
        attributeQuery.issueInstant = datetime.utcnow()

        attributeQuery.issuer = Issuer()
        attributeQuery.issuer.format = Issuer.X509_SUBJECT
        attributeQuery.issuer.value = issuerName

        attributeQuery.subject = Subject()
        attributeQuery.subject.nameID = NameID()
        attributeQuery.subject.nameID.format = ESGFSamlNamespaces.NAMEID_FORMAT
        attributeQuery.subject.nameID.value = subjectId

        return attributeQuery

    @staticmethod
    def _createXSStringAttribute(name, friendlyName=None):
        """Build a query attribute whose name format is the XSD string type.

        @param name: attribute name
        @param friendlyName: optional friendly name; left unset when None
        @return: new Attribute
        """
        attribute = Attribute()
        attribute.name = name
        attribute.nameFormat = (SAMLConstants.XSD_NS + "#" +
                                XSStringAttributeValue.TYPE_LOCAL_NAME)
        if friendlyName is not None:
            attribute.friendlyName = friendlyName

        return attribute

    @staticmethod
    def _sendWithElementTreeBinding(attributeQuery, uri):
        """Send a query through a SOAP binding configured in code.

        The binding serialises the query with AttributeQueryElementTree and
        deserialises the reply with ResponseElementTree.

        @param attributeQuery: query to dispatch
        @param uri: endpoint of the attribute authority
        @return: the response returned by the binding
        """
        binding = SOAPBinding()
        binding.serialise = AttributeQueryElementTree.toXML
        binding.deserialise = ResponseElementTree.fromXML

        return binding.send(attributeQuery, uri)

    @staticmethod
    def _printResponse(response):
        """Write a response to stdout, first as raw XML then pretty printed.

        @param response: SAML response to serialise and display
        """
        samlResponseElem = ResponseElementTree.toXML(response)

        print("SAML Response ...")
        print(ElementTree.tostring(samlResponseElem))
        print("Pretty print SAML Response ...")
        print(prettyPrint(samlResponseElem))

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------
    def test01AttributeQuery(self):
        """Query name, e-mail and site attributes and expect success."""
        _cfg = self.cfg['test01AttributeQuery']

        attributeQuery = self._createAttributeQuery(_cfg['subject'])

        # Attributes are appended in this order: first name, last name,
        # e-mail address and finally the site specific attribute which has
        # no friendly name
        requestedAttributes = (
            (ESGFSamlNamespaces.FIRSTNAME_ATTRNAME, "FirstName"),
            (ESGFSamlNamespaces.LASTNAME_ATTRNAME, "LastName"),
            (ESGFSamlNamespaces.EMAILADDRESS_ATTRNAME, "emailAddress"),
            (_cfg['siteAttributeName'], None),
        )
        for name, friendlyName in requestedAttributes:
            attributeQuery.attributes.append(
                self._createXSStringAttribute(name, friendlyName))

        response = self._sendWithElementTreeBinding(attributeQuery,
                                                    _cfg['uri'])

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

        # Check Query ID matches the query ID the service received
        self.assertEqual(response.inResponseTo, attributeQuery.id)

        # The response and its last assertion must have been issued before
        # now and the assertion must currently be within its validity period
        now = datetime.utcnow()
        lastAssertion = response.assertions[-1]
        self.assertTrue(response.issueInstant < now)
        self.assertTrue(lastAssertion.issueInstant < now)
        self.assertTrue(lastAssertion.conditions.notBefore < now)
        self.assertTrue(lastAssertion.conditions.notOnOrAfter > now)

        self._printResponse(response)

    def test02AttributeQueryInvalidIssuer(self):
        """Query with an unrecognised issuer and expect request denied."""
        _cfg = self.cfg['test02AttributeQueryInvalidIssuer']

        attributeQuery = self._createAttributeQuery(
                                        _cfg['subject'],
                                        issuerName="/O=Invalid Site/CN=PDP")
        attributeQuery.attributes.append(
            self._createXSStringAttribute(_cfg['siteAttributeName']))

        response = self._sendWithElementTreeBinding(attributeQuery,
                                                    _cfg['uri'])
        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.REQUEST_DENIED_URI)

    def test03AttributeQueryUnknownSubject(self):
        """Query with the configured subject, expect unknown principal."""
        _cfg = self.cfg['test03AttributeQueryUnknownSubject']

        attributeQuery = self._createAttributeQuery(_cfg['subject'])
        attributeQuery.attributes.append(
            self._createXSStringAttribute(_cfg['siteAttributeName']))

        response = self._sendWithElementTreeBinding(attributeQuery,
                                                    _cfg['uri'])
        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.UNKNOWN_PRINCIPAL_URI)

    def test04AttributeQueryInvalidAttrName(self):
        """Query an invalid attribute name via a binding built from config."""
        thisSection = 'test04AttributeQueryInvalidAttrName'
        _cfg = self.cfg[thisSection]

        attributeQuery = self._createAttributeQuery(_cfg['subject'])
        attributeQuery.attributes.append(
            self._createXSStringAttribute("myInvalidAttributeName"))

        binding = SOAPBinding.fromConfig(
                     AttributeAuthoritySAMLInterfaceTestCase.CONFIG_FILEPATH,
                     prefix='saml.',
                     section=thisSection)
        response = binding.send(attributeQuery, _cfg['uri'])
        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.INVALID_ATTR_NAME_VALUE_URI)

    def test05AttributeQueryWithESGFAttributeType(self):
        """Test interface with custom ESGF Group/Role attribute type."""
        thisSection = 'test05AttributeQueryWithESGFAttributeType'
        _cfg = self.cfg[thisSection]

        attributeQuery = self._createAttributeQuery(_cfg['subject'])

        groupRoleAttribute = Attribute()
        groupRoleAttribute.name = 'urn:esg:sitea:grouprole'
        groupRoleAttribute.nameFormat = \
            ESGFGroupRoleAttributeValue.TYPE_LOCAL_NAME

        attributeQuery.attributes.append(groupRoleAttribute)

        binding = SOAPBinding.fromConfig(
                     AttributeAuthoritySAMLInterfaceTestCase.CONFIG_FILEPATH,
                     prefix='saml.',
                     section=thisSection)

        response = binding.send(attributeQuery, _cfg['uri'])
        self._printResponse(response)

        # Check the status first so that a failed query is reported as a
        # status mismatch rather than as an error from indexing into the
        # assertions of the response
        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

        firstAttribute = \
            response.assertions[0].attributeStatements[0].attributes[0]
        self.assertEqual(firstAttribute.attributeValues[0].value,
                         ('siteagroup', 'default'))

    def test06AttributeQuerySOAPBindingInterface(self):
        """Send the ESGF default attributes with AttributeQuerySOAPBinding."""
        _cfg = self.cfg['test06AttributeQuerySOAPBindingInterface']

        binding = AttributeQuerySOAPBinding()

        binding.subjectID = AttributeAuthoritySAMLInterfaceTestCase.OPENID_URI
        binding.subjectIdFormat = ESGFSamlNamespaces.NAMEID_FORMAT
        binding.issuerName = \
            str(AttributeAuthoritySAMLInterfaceTestCase.VALID_REQUESTOR_IDS[0])
        binding.issuerFormat = Issuer.X509_SUBJECT

        binding.queryAttributes = ESGFDefaultQueryAttributes.ATTRIBUTES

        response = binding.send(uri=_cfg['uri'])
        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

    def test07AttributeQueryFromConfig(self):
        """Send a query with an AttributeQuerySOAPBinding built from config."""
        thisSection = 'test07AttributeQueryFromConfig'
        _cfg = self.cfg[thisSection]

        binding = AttributeQuerySOAPBinding.fromConfig(
                                                    self.cfgFilePath,
                                                    section=thisSection,
                                                    prefix='attributeQuery.')
        binding.subjectID = _cfg['subject']

        response = binding.send(uri=_cfg['uri'])
        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

    def test08AttributeQuerySslSOAPBindingInterface(self):
        """Send a query with an AttributeQuerySslSOAPBinding from config."""
        thisSection = 'test08AttributeQuerySslSOAPBindingInterface'
        _cfg = self.cfg[thisSection]

        binding = AttributeQuerySslSOAPBinding.fromConfig(
                                                    self.cfgFilePath,
                                                    section=thisSection,
                                                    prefix='attributeQuery.')
        binding.subjectID = _cfg['subject']

        response = binding.send(uri=_cfg['uri'])
        self._printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)
349
350       
if __name__ == "__main__":
    # unittest is not imported at the top of this module so bring it into
    # scope here; without it running the file as a script raises NameError
    import unittest
    unittest.main()
Note: See TracBrowser for help on using the repository browser.