source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapauthzdecisioninterface.py @ 7153

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapauthzdecisioninterface.py@7153
Revision 7153, 7.7 KB checked in by pjkersha, 10 years ago (diff)

Incomplete - task 2: XACML-Security Integration

  • updated certs for new test CA
  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Unit tests for WSGI SAML 2.0 SOAP Authorisation Decision Query Interface
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "15/02/2010"
8__copyright__ = "(C) 2010 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import unittest
13from uuid import uuid4
14from datetime import datetime, timedelta
15from cStringIO import StringIO
16
17from ndg.saml.saml2.core import (SAMLVersion, Subject, NameID, Issuer, Response,
18                             AuthzDecisionQuery, AuthzDecisionStatement, Status,
19                             StatusCode, StatusMessage, DecisionType, Action, 
20                             Conditions, Assertion)
21from ndg.saml.xml.etree import AuthzDecisionQueryElementTree, ResponseElementTree
22
23from ndg.security.common.soap.etree import SOAPEnvelope
24from ndg.security.common.saml_utils.esg import EsgSamlNamespaces
25from ndg.security.test.unit.wsgi.saml import SoapSamlInterfaceMiddlewareTestCase
26
27
28class TestAuthorisationServiceMiddleware(object):
29    """Test Authorisation Service interface stub"""
30    QUERY_INTERFACE_KEYNAME_OPTNAME = 'queryInterfaceKeyName'
31    RESOURCE_URI = 'http://localhost/dap/data/'
32    ISSUER_DN = '/O=Test/OU=Authorisation/CN=Service Stub'
33   
34    def __init__(self, app, global_conf, **app_conf):
35        self.queryInterfaceKeyName = app_conf[
36            TestAuthorisationServiceMiddleware.QUERY_INTERFACE_KEYNAME_OPTNAME]
37        self._app = app
38   
39    def __call__(self, environ, start_response):
40        environ[self.queryInterfaceKeyName] = self.authzDecisionQueryFactory()
41        return self._app(environ, start_response)
42   
43    def authzDecisionQueryFactory(self):
44        def authzDecisionQuery(query, response):
45            now = datetime.utcnow()
46            response.issueInstant = now
47           
48            # Make up a request ID that this response is responding to
49            response.inResponseTo = query.id
50            response.id = str(uuid4())
51            response.version = SAMLVersion(SAMLVersion.VERSION_20)
52           
53            response.status = Status()
54            response.status.statusCode = StatusCode()
55            response.status.statusCode.value = StatusCode.SUCCESS_URI
56            response.status.statusMessage = StatusMessage()       
57            response.status.statusMessage.value = \
58                                                "Response created successfully"
59               
60            assertion = Assertion()
61            assertion.version = SAMLVersion(SAMLVersion.VERSION_20)
62            assertion.id = str(uuid4())
63            assertion.issueInstant = now
64           
65            authzDecisionStatement = AuthzDecisionStatement()
66            authzDecisionStatement.decision = DecisionType.PERMIT
67            authzDecisionStatement.resource = \
68                TestAuthorisationServiceMiddleware.RESOURCE_URI
69            authzDecisionStatement.actions.append(Action())
70            authzDecisionStatement.actions[-1].namespace = Action.GHPP_NS_URI
71            authzDecisionStatement.actions[-1].value = Action.HTTP_GET_ACTION
72            assertion.authzDecisionStatements.append(authzDecisionStatement)
73           
74            # Add a conditions statement for a validity of 8 hours
75            assertion.conditions = Conditions()
76            assertion.conditions.notBefore = now
77            assertion.conditions.notOnOrAfter = now + timedelta(seconds=60*60*8)
78                   
79            assertion.subject = Subject() 
80            assertion.subject.nameID = NameID()
81            assertion.subject.nameID.format = query.subject.nameID.format
82            assertion.subject.nameID.value = query.subject.nameID.value
83               
84            assertion.issuer = Issuer()
85            assertion.issuer.format = Issuer.X509_SUBJECT
86            assertion.issuer.value = \
87                                    TestAuthorisationServiceMiddleware.ISSUER_DN
88   
89            response.assertions.append(assertion)
90            return response
91       
92        return authzDecisionQuery
93   
94   
95class SOAPAuthzDecisionInterfaceMiddlewareTestCase(
96                                        SoapSamlInterfaceMiddlewareTestCase):
97    CONFIG_FILENAME = 'authz-decision-interface.ini'
98    RESOURCE_URI = TestAuthorisationServiceMiddleware.RESOURCE_URI
99   
100    def _createAuthzDecisionQuery(self, 
101                            issuer="/O=Site A/CN=PEP",
102                            subject="https://openid.localhost/philip.kershaw",
103                            resource=None,
104                            action=Action.HTTP_GET_ACTION,
105                            actionNs=Action.GHPP_NS_URI):
106        query = AuthzDecisionQuery()
107        query.version = SAMLVersion(SAMLVersion.VERSION_20)
108        query.id = str(uuid4())
109        query.issueInstant = datetime.utcnow()
110       
111        query.issuer = Issuer()
112        query.issuer.format = Issuer.X509_SUBJECT
113        query.issuer.value = issuer
114                       
115        query.subject = Subject() 
116        query.subject.nameID = NameID()
117        query.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT
118        query.subject.nameID.value = subject
119                                 
120        if resource is None:
121            query.resource = self.__class__.RESOURCE_URI
122        else:   
123            query.resource = resource
124                 
125        query.actions.append(Action())
126        query.actions[0].namespace = actionNs
127        query.actions[0].value = action   
128
129        return query
130   
131    def _makeRequest(self, query=None, **kw):
132        """Convenience method to construct queries for tests"""
133       
134        if query is None:
135            query = self._createAuthzDecisionQuery(**kw)
136           
137        elem = AuthzDecisionQueryElementTree.toXML(query)
138        soapRequest = SOAPEnvelope()
139        soapRequest.create()
140        soapRequest.body.elem.append(elem)
141       
142        request = soapRequest.serialize()
143       
144        return request
145   
146    def _getSAMLResponse(self, responseBody):
147        """Deserialise response string into ElementTree element"""
148        soapResponse = SOAPEnvelope()
149       
150        responseStream = StringIO()
151        responseStream.write(responseBody)
152        responseStream.seek(0)
153       
154        soapResponse.parse(responseStream)
155       
156        print("Parsed response ...")
157        print(soapResponse.serialize())
158#        print(prettyPrint(soapResponse.elem))
159       
160        response = ResponseElementTree.fromXML(soapResponse.body.elem[0])
161       
162        return response
163   
164    def test01ValidQuery(self):
165        query = self._createAuthzDecisionQuery()
166        request = self._makeRequest(query=query)
167       
168        header = {
169            'soapAction': "http://www.oasis-open.org/committees/security",
170            'Content-length': str(len(request)),
171            'Content-type': 'text/xml'
172        }
173        response = self.app.post('/authorisationservice/', 
174                                 params=request, 
175                                 headers=header, 
176                                 status=200)
177        print("Response status=%d" % response.status)
178        samlResponse = self._getSAMLResponse(response.body)
179
180        self.assert_(samlResponse.status.statusCode.value == \
181                     StatusCode.SUCCESS_URI)
182        self.assert_(samlResponse.inResponseTo == query.id)
183        self.assert_(samlResponse.assertions[0].subject.nameID.value == \
184                     query.subject.nameID.value)
185        self.assert_(samlResponse.assertions[0])
186        self.assert_(samlResponse.assertions[0].authzDecisionStatements[0])
187        self.assert_(samlResponse.assertions[0].authzDecisionStatements[0
188                                            ].decision == DecisionType.PERMIT)
189   
190   
191if __name__ == "__main__":
192    unittest.main()
Note: See TracBrowser for help on using the repository browser.