source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapauthzdecisioninterface.py @ 6578

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapauthzdecisioninterface.py@6578
Revision 6578, 7.8 KB checked in by pjkersha, 11 years ago (diff)
  • Important fix for SOAP client used with SAML SOAP binding: set text/xml content type.
  • Refactored SAML SOAP binding query clients.
  • Property svn:executable set to *
Line 
#!/usr/bin/env python
"""Unit tests for WSGI SAML 2.0 SOAP Authorisation Decision Query Interface

NERC DataGrid Project
"""
# Module metadata (values unchanged from the original revision)
__author__ = "P J Kershaw"
__date__ = "15/02/2010"
__copyright__ = "(C) 2010 Science and Technology Facilities Council"
__license__ = "BSD - see LICENSE file in top-level directory"
__contact__ = "Philip.Kershaw@stfc.ac.uk"
__revision__ = '$Id: $'
import unittest
from uuid import uuid4
from datetime import datetime, timedelta
from cStringIO import StringIO

from saml.saml2.core import (SAMLVersion, Subject, NameID, Issuer, Response,
                             AuthzDecisionQuery, AuthzDecisionStatement, Status,
                             StatusCode, StatusMessage, DecisionType, Action,
                             Conditions, Assertion)
from saml.xml.etree import AuthzDecisionQueryElementTree, ResponseElementTree

from ndg.security.common.soap.etree import SOAPEnvelope
from ndg.security.common.saml_utils.esg import EsgSamlNamespaces
from ndg.security.test.unit.wsgi.saml import SoapSamlInterfaceMiddlewareTestCase


class TestAuthorisationServiceMiddleware(object):
    """Test Authorisation Service interface stub.

    WSGI middleware which, on every request, stores a callable in ``environ``
    under a configurable key and then delegates to the wrapped application.
    The callable accepts an authorisation decision query and returns a canned
    SAML ``Response`` carrying a single assertion with a PERMIT decision for
    ``RESOURCE_URI``.
    """
    # This is a stub used by the tests, not a test case itself: stop
    # name-based test collectors (nose, pytest) from treating it as one
    __test__ = False

    # Name of the app_conf option holding the environ key to populate
    QUERY_INTERFACE_KEYNAME_OPTNAME = 'queryInterfaceKeyName'

    # Resource reported in the authorisation decision statement
    RESOURCE_URI = 'http://localhost/dap/data/'

    # Issuer name set on both the response and its assertion
    ISSUER_DN = '/O=Site A/CN=PDP'

    def __init__(self, app, global_conf, **app_conf):
        """Store the wrapped application and the environ key name.

        :param app: next WSGI application/middleware in the chain
        :param global_conf: global configuration (not used by this stub)
        :param app_conf: must contain the ``queryInterfaceKeyName`` option
        :raise KeyError: if ``queryInterfaceKeyName`` is missing from app_conf
        """
        self.queryInterfaceKeyName = app_conf[
            TestAuthorisationServiceMiddleware.QUERY_INTERFACE_KEYNAME_OPTNAME]
        self._app = app

    def __call__(self, environ, start_response):
        """Publish the query callable in ``environ`` and call the wrapped app.

        :param environ: WSGI environment dictionary (modified in place)
        :param start_response: WSGI start_response callable, passed through
        :return: whatever the wrapped application returns
        """
        environ[self.queryInterfaceKeyName] = self.authzDecisionQueryFactory()
        return self._app(environ, start_response)

    def authzDecisionQueryFactory(self):
        """Return the callable that answers authorisation decision queries.

        :return: function taking a query and returning a SAML ``Response``
        """
        def authzDecisionQuery(query):
            """Build a success response with a PERMIT assertion for ``query``.

            Only ``query.id`` and ``query.subject.nameID`` are read from the
            input; everything else in the response is fixed.
            """
            response = Response()
            now = datetime.utcnow()
            response.issueInstant = now

            # Tie the response to the ID of the query being answered
            response.inResponseTo = query.id
            response.id = str(uuid4())
            response.version = SAMLVersion(SAMLVersion.VERSION_20)

            response.issuer = Issuer()
            response.issuer.format = Issuer.X509_SUBJECT
            response.issuer.value = TestAuthorisationServiceMiddleware.ISSUER_DN

            response.status = Status()
            response.status.statusCode = StatusCode()
            response.status.statusCode.value = StatusCode.SUCCESS_URI
            response.status.statusMessage = StatusMessage()
            response.status.statusMessage.value = \
                                                "Response created successfully"

            assertion = Assertion()
            assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
            assertion.id = str(uuid4())
            assertion.issueInstant = now

            # Single decision statement: permit HTTP GET on RESOURCE_URI
            authzDecisionStatement = AuthzDecisionStatement()
            authzDecisionStatement.decision = DecisionType.PERMIT
            authzDecisionStatement.resource = \
                TestAuthorisationServiceMiddleware.RESOURCE_URI
            authzDecisionStatement.actions.append(Action())
            authzDecisionStatement.actions[-1].namespace = Action.GHPP_NS_URI
            authzDecisionStatement.actions[-1].value = Action.HTTP_GET_ACTION
            assertion.authzDecisionStatements.append(authzDecisionStatement)

            # Add a conditions statement for a validity of 8 hours
            assertion.conditions = Conditions()
            assertion.conditions.notBefore = now
            assertion.conditions.notOnOrAfter = now + timedelta(hours=8)

            # Echo the subject of the query back in the assertion
            assertion.subject = Subject()
            assertion.subject.nameID = NameID()
            assertion.subject.nameID.format = query.subject.nameID.format
            assertion.subject.nameID.value = query.subject.nameID.value

            assertion.issuer = Issuer()
            assertion.issuer.format = Issuer.X509_SUBJECT
            assertion.issuer.value = \
                                    TestAuthorisationServiceMiddleware.ISSUER_DN

            response.assertions.append(assertion)
            return response

        return authzDecisionQuery


class SOAPAuthzDecisionInterfaceMiddlewareTestCase(
                                        SoapSamlInterfaceMiddlewareTestCase):
    """Tests for the SAML SOAP authorisation decision query interface.

    Queries are serialised into a SOAP envelope, POSTed to
    ``/authorisationservice/`` through ``self.app`` and the SAML response in
    the returned envelope is checked.
    """
    # NOTE(review): presumably read by the base class to set up self.app -
    # confirm against SoapSamlInterfaceMiddlewareTestCase
    CONFIG_FILENAME = 'authz-decision-interface.ini'

    def _createAuthzDecisionQuery(self,
                    issuer="/O=Site A/CN=PEP",
                    subject="https://openid.localhost/philip.kershaw",
                    resource=TestAuthorisationServiceMiddleware.RESOURCE_URI,
                    action=Action.HTTP_GET_ACTION,
                    actionNs=Action.GHPP_NS_URI):
        """Build a SAML 2.0 authorisation decision query.

        :param issuer: value for the query issuer (X.509 subject format)
        :param subject: value for the subject name ID
        :param resource: resource the decision is requested for
        :param action: value of the single action added to the query
        :param actionNs: namespace of that action
        :return: populated ``AuthzDecisionQuery`` with a fresh UUID as its ID
        """
        query = AuthzDecisionQuery()
        query.version = SAMLVersion(SAMLVersion.VERSION_20)
        query.id = str(uuid4())
        query.issueInstant = datetime.utcnow()

        query.issuer = Issuer()
        query.issuer.format = Issuer.X509_SUBJECT
        query.issuer.value = issuer

        query.subject = Subject()
        query.subject.nameID = NameID()
        query.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
        query.subject.nameID.value = subject

        query.resource = resource
        query.actions.append(Action())
        query.actions[0].namespace = actionNs
        query.actions[0].value = action

        return query

    def _makeRequest(self, query=None, **kw):
        """Convenience method to construct queries for tests.

        :param query: query to send; when None one is created with
            ``_createAuthzDecisionQuery(**kw)``
        :param kw: keywords forwarded to ``_createAuthzDecisionQuery``; only
            used when ``query`` is None
        :return: serialised SOAP envelope whose body contains the query
        """
        if query is None:
            query = self._createAuthzDecisionQuery(**kw)

        elem = AuthzDecisionQueryElementTree.toXML(query)
        soapRequest = SOAPEnvelope()
        soapRequest.create()
        soapRequest.body.elem.append(elem)

        return soapRequest.serialize()

    def _getSAMLResponse(self, responseBody):
        """Deserialise a SOAP response string into a SAML response object.

        :param responseBody: serialised SOAP envelope returned by the service
        :return: result of ``ResponseElementTree.fromXML`` applied to the
            first child element of the SOAP body
        """
        soapResponse = SOAPEnvelope()

        # The envelope parser is fed a file-like object, so wrap the string
        # in a stream and rewind it before parsing
        responseStream = StringIO()
        responseStream.write(responseBody)
        responseStream.seek(0)

        soapResponse.parse(responseStream)

        print("Parsed response ...")
        print(soapResponse.serialize())

        # The SAML response is the first element inside the SOAP body
        return ResponseElementTree.fromXML(soapResponse.body.elem[0])

    def test01ValidQuery(self):
        """A default query gets a successful response with a PERMIT decision
        """
        query = self._createAuthzDecisionQuery()
        request = self._makeRequest(query=query)

        header = {
            'soapAction': "http://www.oasis-open.org/committees/security",
            'Content-length': str(len(request)),
            'Content-type': 'text/xml'
        }
        response = self.app.post('/authorisationservice/',
                                 params=request,
                                 headers=header,
                                 status=200)
        # NOTE(review): %d assumes response.status is an integer - confirm
        # against the test application fixture providing self.app
        print("Response status=%d" % response.status)
        samlResponse = self._getSAMLResponse(response.body)

        self.assertEqual(samlResponse.status.statusCode.value,
                         StatusCode.SUCCESS_URI)
        self.assertEqual(samlResponse.inResponseTo, query.id)

        # Check each object is present before inspecting its attributes
        assertion = samlResponse.assertions[0]
        self.assertTrue(assertion)
        self.assertEqual(assertion.subject.nameID.value,
                         query.subject.nameID.value)

        authzDecisionStatement = assertion.authzDecisionStatements[0]
        self.assertTrue(authzDecisionStatement)
        self.assertEqual(authzDecisionStatement.decision, DecisionType.PERMIT)

    def test02(self):
        # Placeholder: no behaviour is exercised yet
        pass

# Run the tests in this module when it is executed as a script
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.