#!/usr/bin/env python
"""NDG Attribute Authority SAML SOAP Binding client unit tests

NERC DataGrid Project
"""
__author__ = "P J Kershaw"
__date__ = "15/02/10 (moved from test_attributeauthorityclient)"
__copyright__ = "(C) 2010 Science and Technology Facilities Council"
__license__ = "BSD - see LICENSE file in top-level directory"
__contact__ = "Philip.Kershaw@stfc.ac.uk"
__revision__ = '$Id$'
import logging
# Configures the root logger at DEBUG level as a side effect of importing this
# module, so every logger in the importing process is affected.
# NOTE(review): DEBUG output from the SAML/SOAP bindings may include message
# content - confirm before running against anything but test fixtures.
logging.basicConfig(level=logging.DEBUG)
import os
from datetime import datetime
from uuid import uuid4
from xml.etree import ElementTree

# SAML 2.0 object model and its ElementTree (de)serialisers
from ndg.saml.common import SAMLVersion
from ndg.saml.common.xml import SAMLConstants
from ndg.saml.xml.etree import AttributeQueryElementTree, ResponseElementTree
from ndg.saml.saml2.core import (Subject, Issuer, Attribute, NameID, 
                                 AttributeQuery, StatusCode, 
                                 XSStringAttributeValue)

# SOAP binding clients under test, ESG attribute definitions and the shared
# test base class
from ndg.security.common.saml_utils.binding.soap import SOAPBinding
from ndg.security.common.saml_utils.binding.soap.attributequery import (
                                        AttributeQuerySOAPBinding, 
                                        AttributeQuerySslSOAPBinding)
from ndg.security.common.saml_utils.esg import (EsgSamlNamespaces,
                                                EsgDefaultQueryAttributes)
from ndg.security.test.unit.attributeauthorityclient import \
                                        AttributeAuthorityClientBaseTestCase
from ndg.security.common.utils.etree import prettyPrint
35
36   
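class AttributeAuthoritySAMLInterfaceTestCase(
                                        AttributeAuthorityClientBaseTestCase):
    """NDG Attribute Authority SAML SOAP Binding client unit tests.

    Each test sends a SAML 2.0 attribute query through one of the SOAP binding
    classes and checks the status code of the response.  The subject, target
    URI and site attribute name for a test are read from the section of
    ``self.cfg`` named after the test method; ``self.cfg`` and
    ``self.cfgFilePath`` are not set in this class and are expected to come
    from the base class.

    Helper methods use double-underscore names so that name mangling keeps
    them from colliding with anything the base class defines.
    """
    HERE_DIR = os.path.dirname(__file__)
    CONFIG_FILENAME = 'test_samlattributeauthorityclient.cfg'
    CONFIG_FILEPATH = os.path.join(HERE_DIR, CONFIG_FILENAME)

    def __init__(self, *arg, **kw):
        """Initialise the test case and start the Site A Attribute Authority.

        NOTE(review): unittest builds one TestCase instance per test method,
        so these start-up calls run once per test - confirm the base class
        tolerates repeated starts on the same ports.
        """
        super(AttributeAuthoritySAMLInterfaceTestCase, self).__init__(*arg,
                                                                      **kw)

        # Run same config but on two different ports - one HTTP and one HTTPS
        self.startSiteAAttributeAuthority()
        self.startSiteAAttributeAuthority(withSSL=True, port=5443)

    @staticmethod
    def __newAttributeQuery(issuerName, subjectName):
        """Build an attribute query with no attributes requested yet.

        @param issuerName: X.509 subject name placed in the query issuer
        @param subjectName: name identifier of the subject being queried
        @return: AttributeQuery with version, a fresh random ID, the current
        UTC time as issue instant, issuer and subject set
        """
        attributeQuery = AttributeQuery()
        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20)
        attributeQuery.id = str(uuid4())
        attributeQuery.issueInstant = datetime.utcnow()

        attributeQuery.issuer = Issuer()
        attributeQuery.issuer.format = Issuer.X509_SUBJECT
        attributeQuery.issuer.value = issuerName

        attributeQuery.subject = Subject()
        attributeQuery.subject.nameID = NameID()
        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
        attributeQuery.subject.nameID.value = subjectName

        return attributeQuery

    @staticmethod
    def __newAttribute(name, friendlyName=None):
        """Build a query attribute whose name format is the XSD string type.

        @param name: attribute name to request
        @param friendlyName: optional friendly name; left unset when None
        @return: Attribute ready to append to a query
        """
        attribute = Attribute()
        attribute.name = name
        attribute.nameFormat = SAMLConstants.XSD_NS + "#" + \
                                        XSStringAttributeValue.TYPE_LOCAL_NAME
        if friendlyName is not None:
            attribute.friendlyName = friendlyName

        return attribute

    @staticmethod
    def __sendQuery(attributeQuery, uri):
        """Send a query with a default-constructed SOAPBinding.

        NOTE(review): nothing in this file signs the query or authenticates
        the requester for this binding, so the issuer name the service acts on
        is whatever the client puts in the query - confirm how a deployed
        service authenticates requesters.

        @param attributeQuery: query to serialise and send
        @param uri: endpoint of the Attribute Authority
        @return: the deserialised SAML response
        """
        binding = SOAPBinding()
        binding.serialise = AttributeQueryElementTree.toXML
        binding.deserialise = ResponseElementTree.fromXML
        return binding.send(attributeQuery, uri)

    @staticmethod
    def __printResponse(response):
        """Write the response to stdout as raw and as pretty-printed XML.

        The dump contains whatever attribute values the service returned for
        the subject, so it is only suitable for fixture identities.
        """
        samlResponseElem = ResponseElementTree.toXML(response)

        print("SAML Response ...")
        print(ElementTree.tostring(samlResponseElem))
        print("Pretty print SAML Response ...")
        print(prettyPrint(samlResponseElem))

    def test01AttributeQuery(self):
        """A query from the Site A issuer for a known subject succeeds."""
        _cfg = self.cfg['test01AttributeQuery']

        attributeQuery = self.__newAttributeQuery(
                                        "/CN=Authorisation Service/O=Site A",
                                        _cfg['subject'])
        attributeQuery.attributes.append(
            self.__newAttribute(EsgSamlNamespaces.FIRSTNAME_ATTRNAME,
                                "FirstName"))
        attributeQuery.attributes.append(
            self.__newAttribute(EsgSamlNamespaces.LASTNAME_ATTRNAME,
                                "LastName"))
        attributeQuery.attributes.append(
            self.__newAttribute(EsgSamlNamespaces.EMAILADDRESS_ATTRNAME,
                                "emailAddress"))
        attributeQuery.attributes.append(
            self.__newAttribute(_cfg['siteAttributeName']))

        response = self.__sendQuery(attributeQuery, _cfg['uri'])

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

        # Check Query ID matches the query ID the service received: a
        # response must be bound to the request that triggered it
        self.assertEqual(response.inResponseTo, attributeQuery.id)

        # Fail cleanly rather than with an IndexError below if the service
        # reported success but returned no assertion
        self.assertTrue(len(response.assertions) > 0,
                        "Successful response contains no assertions")

        # The response and its last assertion must already be issued and the
        # assertion must be inside its validity window
        now = datetime.utcnow()
        assertion = response.assertions[-1]
        self.assertTrue(response.issueInstant < now,
                        "Response issue instant is not in the past")
        self.assertTrue(assertion.issueInstant < now,
                        "Assertion issue instant is not in the past")
        self.assertTrue(assertion.conditions.notBefore < now,
                        "Assertion is not yet valid")
        self.assertTrue(assertion.conditions.notOnOrAfter > now,
                        "Assertion has expired")

        self.__printResponse(response)

    def test02AttributeQueryInvalidIssuer(self):
        """A query from an issuer outside Site A is denied."""
        _cfg = self.cfg['test02AttributeQueryInvalidIssuer']

        attributeQuery = self.__newAttributeQuery("/O=Invalid Site/CN=PDP",
                                                  _cfg['subject'])
        attributeQuery.attributes.append(
            self.__newAttribute(_cfg['siteAttributeName']))

        response = self.__sendQuery(attributeQuery, _cfg['uri'])

        self.__printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.REQUEST_DENIED_URI)
        # NOTE(review): only the status code is checked.  Consider also
        # asserting that response.assertions is empty, so a denial that still
        # carries attribute data would fail the test - confirm the service
        # returns an empty list here first.

    def test03AttributeQueryUnknownSubject(self):
        """A query for a subject the service does not know is rejected."""
        _cfg = self.cfg['test03AttributeQueryUnknownSubject']

        attributeQuery = self.__newAttributeQuery(
                                        "/CN=Authorisation Service/O=Site A",
                                        _cfg['subject'])
        attributeQuery.attributes.append(
            self.__newAttribute(_cfg['siteAttributeName']))

        response = self.__sendQuery(attributeQuery, _cfg['uri'])

        self.__printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.UNKNOWN_PRINCIPAL_URI)
        # NOTE(review): as in test02, consider asserting that no assertions
        # are returned alongside the error status.

    def test04AttributeQueryInvalidAttrName(self):
        """A query for an attribute name the service does not hold fails."""
        thisSection = 'test04AttributeQueryInvalidAttrName'
        _cfg = self.cfg[thisSection]

        attributeQuery = self.__newAttributeQuery(
                                        "/CN=Authorisation Service/O=Site A",
                                        _cfg['subject'])
        attributeQuery.attributes.append(
            self.__newAttribute("myInvalidAttributeName"))

        # Unlike tests 01-03 the binding is configured from the 'saml.'
        # options of this test's config file section
        binding = SOAPBinding.fromConfig(
                     AttributeAuthoritySAMLInterfaceTestCase.CONFIG_FILEPATH,
                     prefix='saml.',
                     section=thisSection)
        response = binding.send(attributeQuery, _cfg['uri'])

        self.__printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.INVALID_ATTR_NAME_VALUE_URI)

    def test05AttributeQuerySOAPBindingInterface(self):
        """AttributeQuerySOAPBinding set up attribute by attribute succeeds."""
        _cfg = self.cfg['test05AttributeQuerySOAPBindingInterface']

        binding = AttributeQuerySOAPBinding()

        # OPENID_URI and VALID_REQUESTOR_IDS are not defined in this class;
        # they are expected to be inherited from the base class
        binding.subjectID = AttributeAuthoritySAMLInterfaceTestCase.OPENID_URI
        binding.subjectIdFormat = EsgSamlNamespaces.NAMEID_FORMAT
        binding.issuerName = \
            str(AttributeAuthoritySAMLInterfaceTestCase.VALID_REQUESTOR_IDS[0])
        binding.issuerFormat = Issuer.X509_SUBJECT

        binding.queryAttributes = EsgDefaultQueryAttributes.ATTRIBUTES

        response = binding.send(uri=_cfg['uri'])

        self.__printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

    def test06AttributeQueryFromConfig(self):
        """AttributeQuerySOAPBinding built from the config file succeeds."""
        thisSection = 'test06AttributeQueryFromConfig'
        _cfg = self.cfg[thisSection]

        binding = AttributeQuerySOAPBinding.fromConfig(self.cfgFilePath,
                                                       section=thisSection,
                                                       prefix='attributeQuery.')
        binding.subjectID = _cfg['subject']
        response = binding.send(uri=_cfg['uri'])

        self.__printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)

    def test07AttributeQuerySslSOAPBindingInterface(self):
        """AttributeQuerySslSOAPBinding built from the config file succeeds.

        NOTE(review): the SSL settings come from the config section and are
        not visible in this file - confirm the section enables verification of
        the service's certificate, otherwise this test only shows that an
        encrypted connection can be made, not that the peer is authenticated.
        """
        thisSection = 'test07AttributeQuerySslSOAPBindingInterface'
        _cfg = self.cfg[thisSection]

        binding = AttributeQuerySslSOAPBinding.fromConfig(self.cfgFilePath,
                                                       section=thisSection,
                                                       prefix='attributeQuery.')

        binding.subjectID = _cfg['subject']
        response = binding.send(uri=_cfg['uri'])

        self.__printResponse(response)

        self.assertEqual(response.status.statusCode.value,
                         StatusCode.SUCCESS_URI)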
300
[6575]301       
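if __name__ == "__main__":
    # unittest is not imported at module level in this file, so the bare
    # unittest.main() call raised NameError when the module was run as a
    # script; import it here where it is needed.
    import unittest
    unittest.main()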