source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/test_authz.py @ 7077

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/test_authz.py@7077
Revision 7077, 11.8 KB checked in by pjkersha, 11 years ago (diff)
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""Unit tests for WSGI Authorization handler
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "21/05/09"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13
14
15import unittest
16import os
17from urlparse import urlunsplit
18
19from os.path import expandvars as xpdVars
20from os.path import join as jnPath
# Join ``file`` onto the directory named by the
# NDGSEC_COMBINED_SRVS_UNITTEST_DIR environment variable.  The variable is
# looked up when mkPath is called, so an unset variable raises KeyError at
# call time rather than at import time.
mkPath = lambda file: jnPath(os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR'], 
                             file)
23from ConfigParser import SafeConfigParser
24
25import paste.fixture
26from paste.deploy import loadapp
27
28from ndg.security.test.unit import BaseTestCase
29from ndg.security.server.wsgi import NDGSecurityMiddlewareBase
30from ndg.security.server.wsgi.authz.result_handler.basic import \
31    PEPResultHandlerMiddleware
32from ndg.security.server.wsgi.authz.result_handler.redirect import \
33    HTTPRedirectPEPResultHandlerMiddleware
34from ndg.security.server.wsgi.authz import SamlPIPMiddlewareConfigError
35from ndg.security.common.authz.msi import Response
36
37
class RedirectFollowingAccessDenied(PEPResultHandlerMiddleware):
    """PEP result handler for the tests which retries a denied admin request

    If the request's query string contains the argument ``admin=1`` the
    handler answers with a redirect to the same path with that argument
    removed.  Any other request is passed to the
    ``PEPResultHandlerMiddleware`` implementation unchanged.
    """

    @NDGSecurityMiddlewareBase.initCall
    def __call__(self, environ, start_response):
        """Redirect without the admin argument or defer to the base handler

        @param environ: WSGI environment; only ``QUERY_STRING`` is read here
        @param start_response: WSGI start_response callable, forwarded to the
        base class when no redirect is issued
        @return: result of ``self.redirect`` or of the base class ``__call__``
        """
        # Split first and test for an exact argument match.  A plain
        # substring test on the raw query string also matches e.g. 'admin=10'
        # or 'superadmin=1'; nothing would then be stripped below and the
        # redirect would point back at the identical URI.
        queryArgs = environ.get('QUERY_STRING', '').split('&')
        if 'admin=1' not in queryArgs:
            return super(RedirectFollowingAccessDenied, self).__call__(
                                                            environ,
                                                            start_response)

        # User has been rejected access to a URI requiring admin rights,
        # try redirect to the same URI minus the admin query arg, this
        # request will pass because admin rights aren't needed
        editedQuery = '&'.join([arg for arg in queryArgs if arg != 'admin=1'])

        # NOTE(review): self.pathInfo and self.redirect are not defined in
        # this module - presumably provided by the base class / initCall
        # decorator; confirm against NDGSecurityMiddlewareBase.
        redirectURI = urlunsplit(('', '', self.pathInfo, editedQuery, ''))
        return self.redirect(redirectURI)
57
58       
class TestAuthZMiddleware(object):
    '''Test application for the Authorization handler to protect

    Minimal WSGI application: the status line of the response is chosen from
    the request's PATH_INFO and the body is always the ``response`` class
    attribute.
    '''
    response = "Test Authorization application"

    # Status line returned for each recognised request path.  Any path not
    # listed here gets DEFAULT_STATUS.
    STATUS_BY_PATH = {
        '/test_401': "401 Unauthorized",
        '/test_403': "403 Forbidden",
        '/test_200': "200 OK",
        # Nb. AuthZ middleware should intercept the request and bypass this
        # response
        '/test_accessDeniedToSecuredURI': "200 OK",
        '/test_accessGrantedToSecuredURI': "200 OK",
    }
    DEFAULT_STATUS = "404 Not found"

    def __init__(self, app_conf, **local_conf):
        '''Accept configuration arguments and ignore them

        @param app_conf: application configuration, unused
        @param local_conf: additional configuration keywords, unused
        '''
        pass

    def __call__(self, environ, start_response):
        '''Respond with the status mapped to PATH_INFO and the fixed body

        @param environ: WSGI environment; must contain 'PATH_INFO'
        @param start_response: WSGI start_response callable
        @return: single element list containing the ``response`` text
        @raise KeyError: if 'PATH_INFO' is missing from environ
        '''
        status = TestAuthZMiddleware.STATUS_BY_PATH.get(
                                        environ['PATH_INFO'],
                                        TestAuthZMiddleware.DEFAULT_STATUS)

        start_response(status,
                       [('Content-length',
                         str(len(TestAuthZMiddleware.response))),
                        ('Content-type', 'text/plain')])
        return [TestAuthZMiddleware.response]


class BeakerSessionStub(dict):
    """Emulate beaker.session session object for purposes of the unit tests

    Behaves as a plain dict with an additional no-op ``save`` method.
    """
    def save(self):
        """Do nothing; present so callers can invoke ``save`` on the stub"""
        pass
141 
142   
class SamlWSGIAuthZTestCase(BaseTestCase):
    """Tests for the authorization middleware configured in saml-test.ini

    Each instance loads the WSGI application described by ``INI_FILE``
    (resolved relative to this module's directory), wraps it in a
    ``paste.fixture.TestApp`` and calls ``startSiteAAttributeAuthority``.
    """
    INI_FILE = 'saml-test.ini'
    THIS_DIR = os.path.dirname(os.path.abspath(__file__))

    # environ key under which the tests place the session stub
    SESSION_ENVIRON_KEY = 'beaker.session.ndg.security'

    def __init__(self, *args, **kwargs):
        """Load the test WSGI app and start the Site A Attribute Authority"""
        BaseTestCase.__init__(self, *args, **kwargs)

        wsgiapp = loadapp('config:'+SamlWSGIAuthZTestCase.INI_FILE,
                          relative_to=SamlWSGIAuthZTestCase.THIS_DIR)
        self.app = paste.fixture.TestApp(wsgiapp)

        # NOTE(review): startSiteAAttributeAuthority and the port constant
        # are not defined in this class - presumably inherited from
        # BaseTestCase.
        self.startSiteAAttributeAuthority(withSSL=True,
            port=SamlWSGIAuthZTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)

    @staticmethod
    def _sessionEnviron(**sessionItems):
        """Return an extra_environ dict holding a fresh session stub

        @param sessionItems: items to pre-populate the session stub with
        """
        return {
            SamlWSGIAuthZTestCase.SESSION_ENVIRON_KEY:
                BeakerSessionStub(**sessionItems)
        }

    @staticmethod
    def _loggedInEnviron():
        """Return an extra_environ dict whose session has a username set"""
        return SamlWSGIAuthZTestCase._sessionEnviron(
                                username=SamlWSGIAuthZTestCase.OPENID_URI)

    def test01CatchNoBeakerSessionFound(self):

        # SamlPIPMiddlewareConfigError is tolerated here when no
        # beaker.session is set in environ.
        # NOTE(review): the test also passes when no exception is raised at
        # all; tighten to assertRaises once it is confirmed that the
        # middleware always raises for this request.
        try:
            self.app.get('/test_200')
        except SamlPIPMiddlewareConfigError as e:
            print("ok - expected: %s exception: %s" % (e.__class__, e))

    def test02Ensure200WithNotLoggedInAndUnsecuredURI(self):

        # Check the authZ middleware leaves the response alone if the URI
        # is not matched in the policy

        # Simulate a beaker.session in the environ
        self.app.get('/test_200', extra_environ=self._sessionEnviron())

    def test03Catch401WithLoggedIn(self):

        # Check that the application being secured can raise a HTTP 401
        # response and that this is respected by the Authorization middleware
        # even though a user is set in the session
        self.app.get('/test_401',
                     extra_environ=self._loggedInEnviron(),
                     status=401)

    def test04Catch403WithLoggedIn(self):

        # Check that the application being secured can raise a HTTP 403
        # response and that this is respected by the Authorization middleware
        # even though a user is set in the session
        self.app.get('/test_403',
                     extra_environ=self._loggedInEnviron(),
                     status=403)

    def test05Catch401WithNotLoggedInAndSecuredURI(self):

        # The session carries no username key (as would be set by the AuthN
        # handler), so a 401 response is expected for this URI
        self.app.get('/test_accessDeniedToSecuredURI',
                     extra_environ=self._sessionEnviron(),
                     status=401)

    def test06AccessDeniedForSecuredURI(self):

        # User is logged in but doesn't have the required credentials for
        # access
        response = self.app.get('/test_accessDeniedToSecuredURI',
                                extra_environ=self._loggedInEnviron(),
                                status=403)
        print(response)
        self.assertTrue("Insufficient privileges to access the "
                        "resource" in response)

    def test07AccessGrantedForSecuredURI(self):

        # User is logged in and has credentials for access to a URI secured
        # by the policy file
        response = self.app.get('/test_accessGrantedToSecuredURI',
                                extra_environ=self._loggedInEnviron(),
                                status=200)
        self.assertTrue(TestAuthZMiddleware.response in response)
        print(response)

    def test08AccessDeniedForAdminQueryArg(self):

        # User is logged in but doesn't have the required credentials for
        # access.  The same environ (and so the same session stub) is used
        # for the initial and the redirected request.
        extra_environ = self._loggedInEnviron()

        # Try this URI with the query arg admin=1.  This will be picked up
        # by the policy as a request requiring admin rights.  The request is
        # denied as the user doesn't have these rights but this then calls
        # into play the PEP result handler defined in this module,
        # RedirectFollowingAccessDenied.  This class reinvokes the request
        # but without the admin query argument.  Access is then granted for
        # the redirected request
        response = self.app.get('/test_accessGrantedToSecuredURI',
                                params={'admin': 1},
                                extra_environ=extra_environ,
                                status=302)
        print(response)

        # Check the body of the *followed* response.  An AppError from
        # follow() means the redirected request was refused, so it is left
        # to propagate and fail the test.
        redirectResponse = response.follow(extra_environ=extra_environ)
        self.assertTrue(TestAuthZMiddleware.response in redirectResponse)
274
class PEPResultHandlerTestCase(BaseTestCase):
    """Test the redirecting PEP result handler set in the test ini file

    Each instance loads the WSGI application described by ``INI_FILE``,
    reads the expected redirect URI from the same file and calls
    ``startSiteAAttributeAuthority``.
    """
    INI_FILE = 'pep-result-handler-test.ini'
    THIS_DIR = os.path.dirname(os.path.abspath(__file__))
    INI_FILEPATH = jnPath(THIS_DIR, INI_FILE)

    def __init__(self, *arg, **kw):
        """Load the test WSGI app, read the redirect URI and start the
        Site A Attribute Authority
        """
        BaseTestCase.__init__(self, *arg, **kw)

        wsgiapp = loadapp('config:'+PEPResultHandlerTestCase.INI_FILE,
                          relative_to=PEPResultHandlerTestCase.THIS_DIR)
        self.app = paste.fixture.TestApp(wsgiapp)

        # 'here' is supplied as a parser default so it is available for
        # interpolation when option values are read
        cfg = SafeConfigParser(dict(here=PEPResultHandlerTestCase.THIS_DIR))
        cfg.read(PEPResultHandlerTestCase.INI_FILEPATH)
        self.redirectURI = cfg.get('filter:AuthZFilter',
                                   'authz.pepResultHandler.redirectURI')

        # The port constant is not defined in either test case class in this
        # module, so it is inherited from the shared base class; read it via
        # this class rather than via the sibling test case.
        self.startSiteAAttributeAuthority(withSSL=True,
            port=PEPResultHandlerTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)

    def testRedirectPEPResultHandlerMiddleware(self):
        # User is logged in but doesn't have the required credentials for
        # access
        extra_environ = {
            'beaker.session.ndg.security':
                BeakerSessionStub(username=PEPResultHandlerTestCase.OPENID_URI)
        }

        # Expecting redirect response to specified redirect URI
        response = self.app.get('/test_accessDeniedToSecuredURI',
                                extra_environ=extra_environ,
                                status=302)
        print(response)

        # assertEqual reports both values on failure
        self.assertEqual(response.header_dict.get('location'),
                         self.redirectURI)
311       
# Run every test case defined in this module when it is executed as a script
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.