source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_samlattributeauthorityclient.py @ 6572

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_samlattributeauthorityclient.py@6572
Revision 6572, 12.7 KB checked in by pjkersha, 11 years ago (diff)

Working refactored Attribute Authority Client unit tests.

1#!/usr/bin/env python
2"""NDG Attribute Authority SAML SOAP Binding client unit tests
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/02/10 (moved from test_attributeauthorityclient)"
8__copyright__ = "(C) 2010 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
import logging
logging.basicConfig(level=logging.DEBUG)
import unittest
from datetime import datetime
from uuid import uuid4
from xml.etree import ElementTree

from saml.common import SAMLVersion
from saml.common.xml import SAMLConstants
from saml.xml.etree import AttributeQueryElementTree, ResponseElementTree
from saml.saml2.core import (Subject, Issuer, Attribute, NameID, AttributeQuery,
                             StatusCode, XSStringAttributeValue, )

from ndg.security.common.saml_utils.binding.soap import SOAPBinding
from ndg.security.common.saml_utils.binding.soap.attributequery import (
                                        AttributeQuerySOAPBinding,
                                        AttributeQuerySslSOAPBinding)
from ndg.security.common.saml_utils.esg import (EsgSamlNamespaces,
                                                EsgDefaultQueryAttributes)
from ndg.security.test.unit.attributeauthorityclient import \
                                        AttributeAuthorityClientBaseTestCase
from ndg.security.common.utils.etree import prettyPrint
33
34   
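class AttributeAuthoritySAMLInterfaceTestCase(
                                        AttributeAuthorityClientBaseTestCase):
    """NDG Attribute Authority SAML SOAP Binding client unit tests.

    Every test sends a SAML 2.0 attribute query to an Attribute Authority
    endpoint whose URI is read from the test's own section of the config
    file, then checks the status code carried in the response.

    The responses are printed in full.  They carry the subject's attribute
    values (names, e-mail address), which is fine for test fixtures but is
    not a pattern to copy into code that handles real user data.
    """
    CONFIG_FILENAME = 'test_samlattributeauthorityclient.cfg'

    def __init__(self, *arg, **kw):
        """Initialise the test case and start the Site A services.

        NOTE(review): unittest creates one TestCase instance per test
        method, so the two start-up calls below run once per test rather
        than once per class - presumably the base class copes with repeated
        calls on the same ports; confirm.
        """
        super(AttributeAuthoritySAMLInterfaceTestCase, self).__init__(*arg,
                                                                      **kw)

        # Run same config but on two different ports - one HTTP and one HTTPS
        self.startSiteAAttributeAuthority()
        self.startSiteAAttributeAuthority(withSSL=True, port=5443)

    def _createAttributeQuery(self, issuerName, subjectId):
        """Return a SAML 2.0 AttributeQuery with no attributes requested yet.

        @param issuerName: X.509 subject name placed in the Issuer element
        @param subjectId: value placed in the Subject NameID element
        """
        query = AttributeQuery()
        query.version = SAMLVersion(SAMLVersion.VERSION_20)
        # A fresh random ID per query lets the caller match the response
        # to this request via inResponseTo
        query.id = str(uuid4())
        query.issueInstant = datetime.utcnow()

        query.issuer = Issuer()
        query.issuer.format = Issuer.X509_SUBJECT
        query.issuer.value = issuerName

        query.subject = Subject()
        query.subject.nameID = NameID()
        query.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
        query.subject.nameID.value = subjectId

        return query

    def _addStringAttribute(self, query, name, friendlyName=None):
        """Append a requested attribute with an xs:string name format.

        @param query: AttributeQuery to extend
        @param name: attribute name to request
        @param friendlyName: optional friendly name; left unset when None
        """
        attribute = Attribute()
        attribute.name = name
        attribute.nameFormat = (SAMLConstants.XSD_NS + "#" +
                                XSStringAttributeValue.TYPE_LOCAL_NAME)
        if friendlyName is not None:
            attribute.friendlyName = friendlyName

        query.attributes.append(attribute)

    def _sendQuery(self, query, uri):
        """Send query over a plain SOAPBinding and return the response."""
        binding = SOAPBinding()
        binding.serialise = AttributeQueryElementTree.toXML
        binding.deserialise = ResponseElementTree.fromXML
        return binding.send(query, uri)

    def _printResponse(self, response):
        """Print the SAML response, raw and pretty printed."""
        samlResponseElem = ResponseElementTree.toXML(response)

        print("SAML Response ...")
        print(ElementTree.tostring(samlResponseElem))
        print("Pretty print SAML Response ...")
        print(prettyPrint(samlResponseElem))

    def _assertStatusCode(self, response, expectedUri):
        """Fail unless the response status code equals expectedUri."""
        # assertEqual reports both values on failure, unlike assert_(a == b)
        self.assertEqual(response.status.statusCode.value, expectedUri)

    def test01SAMLAttributeQuery(self):
        """Query name, e-mail and site attributes for the configured subject
        and expect a successful, correlated and currently valid response.
        """
        _cfg = self.cfg['test01SAMLAttributeQuery']

        attributeQuery = self._createAttributeQuery(
                                        "/CN=Authorisation Service/O=Site A",
                                        _cfg['subject'])
        self._addStringAttribute(attributeQuery,
                                 EsgSamlNamespaces.FIRSTNAME_ATTRNAME,
                                 friendlyName="FirstName")
        self._addStringAttribute(attributeQuery,
                                 EsgSamlNamespaces.LASTNAME_ATTRNAME,
                                 friendlyName="LastName")
        self._addStringAttribute(attributeQuery,
                                 EsgSamlNamespaces.EMAILADDRESS_ATTRNAME,
                                 friendlyName="emailAddress")
        self._addStringAttribute(attributeQuery, _cfg['siteAttributeName'])

        response = self._sendQuery(attributeQuery, _cfg['uri'])

        # Dump the response before asserting, as the other tests do, so that
        # a failing run still shows what the service returned
        self._printResponse(response)

        self._assertStatusCode(response, StatusCode.SUCCESS_URI)

        # Check Query ID matches the query ID the service received
        self.assertEqual(response.inResponseTo, attributeQuery.id)

        # The comparisons are strict and allow no clock skew.
        # NOTE(review): only the last assertion in the response is checked;
        # confirm that a response with several assertions is not expected.
        now = datetime.utcnow()
        lastAssertion = response.assertions[-1]
        self.assertTrue(response.issueInstant < now,
                        "Response issueInstant is not in the past")
        self.assertTrue(lastAssertion.issueInstant < now,
                        "Assertion issueInstant is not in the past")
        self.assertTrue(lastAssertion.conditions.notBefore < now,
                        "Assertion is not yet valid (notBefore)")
        self.assertTrue(lastAssertion.conditions.notOnOrAfter > now,
                        "Assertion has already expired (notOnOrAfter)")

    def test02SAMLAttributeQueryInvalidIssuer(self):
        """A query from an issuer outside Site A must be denied.

        NOTE(review): the issuer is only a field the client sets in the
        query.  Whether the service also ties it to an authenticated
        transport identity cannot be seen from this test - confirm against
        the service configuration.
        """
        _cfg = self.cfg['test02SAMLAttributeQueryInvalidIssuer']

        attributeQuery = self._createAttributeQuery("/O=Invalid Site/CN=PDP",
                                                    _cfg['subject'])
        self._addStringAttribute(attributeQuery, _cfg['siteAttributeName'])

        response = self._sendQuery(attributeQuery, _cfg['uri'])
        self._printResponse(response)

        self._assertStatusCode(response, StatusCode.REQUEST_DENIED_URI)

    def test03SAMLAttributeQueryUnknownSubject(self):
        """A query for the subject configured for this test must be answered
        with the unknown principal status.
        """
        _cfg = self.cfg['test03SAMLAttributeQueryUnknownSubject']

        attributeQuery = self._createAttributeQuery(
                                        "/CN=Authorisation Service/O=Site A",
                                        _cfg['subject'])
        self._addStringAttribute(attributeQuery, _cfg['siteAttributeName'])

        response = self._sendQuery(attributeQuery, _cfg['uri'])
        self._printResponse(response)

        self._assertStatusCode(response, StatusCode.UNKNOWN_PRINCIPAL_URI)

    def test04SAMLAttributeQueryInvalidAttrName(self):
        """A query for an attribute name the service does not know must be
        answered with the invalid attribute name or value status.
        """
        thisSection = 'test04SAMLAttributeQueryInvalidAttrName'
        _cfg = self.cfg[thisSection]

        attributeQuery = self._createAttributeQuery(
                                        "/CN=Authorisation Service/O=Site A",
                                        _cfg['subject'])
        self._addStringAttribute(attributeQuery, "myInvalidAttributeName")

        # NOTE(review): this passes the bare config file name, so the file is
        # presumably resolved against the current working directory, while
        # tests 06 and 07 pass self.cfgFilePath - confirm which is intended.
        binding = SOAPBinding.fromConfig(
                     AttributeAuthoritySAMLInterfaceTestCase.CONFIG_FILENAME,
                     prefix='saml.',
                     section=thisSection)
        response = binding.send(attributeQuery, _cfg['uri'])
        self._printResponse(response)

        self._assertStatusCode(response,
                               StatusCode.INVALID_ATTR_NAME_VALUE_URI)

    def test05AttributeQuerySOAPBindingInterface(self):
        """Exercise AttributeQuerySOAPBinding configured through its
        properties and expect a successful response.
        """
        _cfg = self.cfg['test05AttributeQuerySOAPBindingInterface']

        binding = AttributeQuerySOAPBinding()

        binding.subjectID = AttributeAuthoritySAMLInterfaceTestCase.OPENID_URI
        binding.subjectIdFormat = EsgSamlNamespaces.NAMEID_FORMAT
        binding.issuerName = \
            str(AttributeAuthoritySAMLInterfaceTestCase.VALID_REQUESTOR_IDS[0])
        binding.issuerFormat = Issuer.X509_SUBJECT

        binding.queryAttributes = EsgDefaultQueryAttributes.ATTRIBUTES

        response = binding.send(uri=_cfg['uri'])
        self._printResponse(response)

        self._assertStatusCode(response, StatusCode.SUCCESS_URI)

    def test06AttributeQueryFromConfig(self):
        """Exercise AttributeQuerySOAPBinding configured from the config
        file and expect a successful response.
        """
        thisSection = 'test06AttributeQueryFromConfig'
        _cfg = self.cfg[thisSection]

        binding = AttributeQuerySOAPBinding.fromConfig(self.cfgFilePath,
                                                       section=thisSection,
                                                       prefix='attributeQuery.')
        binding.subjectID = _cfg['subject']

        response = binding.send(uri=_cfg['uri'])
        self._printResponse(response)

        self._assertStatusCode(response, StatusCode.SUCCESS_URI)

    def test07AttributeQuerySslSOAPBindingInterface(self):
        """Exercise AttributeQuerySslSOAPBinding configured from the config
        file and expect a successful response.

        NOTE(review): the SSL settings, including whether and how the peer
        certificate is verified, come from the config section read by
        fromConfig and are not visible here - confirm the section enables
        peer verification so that this test covers it.
        """
        thisSection = 'test07AttributeQuerySslSOAPBindingInterface'
        _cfg = self.cfg[thisSection]

        binding = AttributeQuerySslSOAPBinding.fromConfig(self.cfgFilePath,
                                                       section=thisSection,
                                                       prefix='attributeQuery.')
        binding.subjectID = _cfg['subject']

        response = binding.send(uri=_cfg['uri'])
        self._printResponse(response)

        self._assertStatusCode(response, StatusCode.SUCCESS_URI)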
296
297       
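if __name__ == "__main__":
    # Needs the module-level "import unittest"; without it this call raises
    # NameError when the file is run as a script.
    unittest.main()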