source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/test_authz.py @ 6788

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/test_authz.py@6788
Revision 6788, 11.8 KB checked in by pjkersha, 11 years ago (diff)

Contains an important fix for OpenIDProviderMiddleware: the OpenIDResponse object was moved from a class member to a session key to preserve separation between user sessions during the sign-in process. The bug manifested as users being incorrectly redirected following login.

Line 
1#!/usr/bin/env python
2"""Unit tests for WSGI Authorization handler
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "21/05/09"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
12import logging
13
14
15import unittest
16import os
17from urlparse import urlunsplit
18
19from os.path import expandvars as xpdVars
20from os.path import join as jnPath
def mkPath(file):
    """Return ``file`` joined onto the combined-services unit test directory.

    The directory is read from the ``NDGSEC_COMBINED_SRVS_UNITTEST_DIR``
    environment variable on every call; ``KeyError`` is raised if it is unset.
    """
    unitTestDir = os.environ['NDGSEC_COMBINED_SRVS_UNITTEST_DIR']
    return jnPath(unitTestDir, file)
23from ConfigParser import SafeConfigParser
24
25import paste.fixture
26from paste.deploy import loadapp
27
28from ndg.security.test.unit import BaseTestCase
29from ndg.security.server.wsgi import NDGSecurityMiddlewareBase
30from ndg.security.server.wsgi.authz.result_handler.basic import \
31    PEPResultHandlerMiddleware
32from ndg.security.server.wsgi.authz.result_handler.redirect import \
33    HTTPRedirectPEPResultHandlerMiddleware
34from ndg.security.server.wsgi.authz import SamlPIPMiddlewareConfigError
35from ndg.security.common.authz.msi import Response
36
37
class RedirectFollowingAccessDenied(PEPResultHandlerMiddleware):
    """PEP result handler which retries a denied admin request without the
    ``admin=1`` query argument.

    Requests that do not carry an ``admin=1`` query argument are passed on to
    ``PEPResultHandlerMiddleware.__call__`` unchanged.
    """

    @NDGSecurityMiddlewareBase.initCall
    def __call__(self, environ, start_response):
        """Redirect to the request path minus ``admin=1``, or defer to the
        parent handler.

        @param environ: WSGI environment dictionary; only ``QUERY_STRING`` is
        read directly here
        @param start_response: WSGI start_response callable, forwarded to the
        parent handler when no redirect is made
        @return: result of ``self.redirect`` or of the parent ``__call__``
        """
        adminArg = 'admin=1'

        # Match whole query arguments rather than a substring of the raw
        # query string.  A substring test also matched e.g. 'admin=10' or
        # 'superadmin=1', for which nothing was stripped below and the
        # request was redirected straight back to the same URI.
        queryArgs = environ.get('QUERY_STRING', '').split('&')
        if adminArg not in queryArgs:
            return super(RedirectFollowingAccessDenied, self).__call__(
                                                            environ,
                                                            start_response)

        # User has been rejected access to a URI requiring admin rights,
        # try redirect to the same URI minus the admin query arg, this
        # request will pass because admin rights aren't needed
        editedQuery = '&'.join([arg for arg in queryArgs if arg != adminArg])

        # self.pathInfo and self.redirect are not defined in this class; they
        # are expected to come from the middleware base classes.
        redirectURI = urlunsplit(('', '', self.pathInfo, editedQuery, ''))
        return self.redirect(redirectURI)
57
58       
class TestAuthZMiddleware(object):
    '''Test application for the authorization middleware to protect.

    A minimal WSGI application: the HTTP status returned is selected purely
    from ``PATH_INFO`` and the body is always ``response``.
    '''
    response = "Test Authorization application"

    # Status line returned for each recognised request path.  Any other path
    # yields "404 Not found".
    _STATUS_BY_PATH = {
        '/test_401': "401 Unauthorized",
        '/test_403': "403 Forbidden",
        '/test_200': "200 OK",
        # Nb. AuthZ middleware should intercept the request and bypass this
        # response
        '/test_accessDeniedToSecuredURI': "200 OK",
        '/test_accessGrantedToSecuredURI': "200 OK",
    }

    def __init__(self, app_conf, **local_conf):
        """Accept and ignore the configuration arguments.

        @param app_conf: application configuration (unused)
        @param local_conf: additional configuration keywords (unused)
        """
        pass

    def __call__(self, environ, start_response):
        """Respond with a status selected by the request path.

        @param environ: WSGI environment dictionary; must contain
        ``PATH_INFO`` (``KeyError`` is raised otherwise)
        @param start_response: WSGI start_response callable
        @return: single-element list holding ``TestAuthZMiddleware.response``
        """
        status = TestAuthZMiddleware._STATUS_BY_PATH.get(environ['PATH_INFO'],
                                                         "404 Not found")
        body = TestAuthZMiddleware.response
        start_response(status,
                       [('Content-length', str(len(body))),
                        ('Content-type', 'text/plain')])
        return [body]


class BeakerSessionStub(dict):
    """Emulate beaker.session session object for purposes of the unit tests
    """
    def save(self):
        """Do nothing; there is no backing store to persist to."""
        pass
141 
142   
class SamlWSGIAuthZTestCase(BaseTestCase):
    """Exercise the authorization middleware configured by ``saml-test.ini``
    through a ``paste.fixture.TestApp``.
    """
    INI_FILE = 'saml-test.ini'
    THIS_DIR = os.path.dirname(os.path.abspath(__file__))

    def __init__(self, *args, **kwargs):
        """Load the WSGI app from ``INI_FILE`` and start the Site A Attribute
        Authority over SSL.
        """
        BaseTestCase.__init__(self, *args, **kwargs)

        wsgiapp = loadapp('config:' + SamlWSGIAuthZTestCase.INI_FILE,
                          relative_to=SamlWSGIAuthZTestCase.THIS_DIR)
        self.app = paste.fixture.TestApp(wsgiapp)

        self.startSiteAAttributeAuthority(withSSL=True,
            port=SamlWSGIAuthZTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)

    def test01CatchNoBeakerSessionFound(self):

        # SamlPIPMiddlewareConfigError is the exception caught here when no
        # beaker.session is set in environ
        #
        # NOTE(review): the test also passes when no exception is raised at
        # all - confirm whether the error should be mandatory
        try:
            self.app.get('/test_200')
        except SamlPIPMiddlewareConfigError as e:
            print("ok - expected: %s exception: %s" % (e.__class__, e))

    def test02Ensure200WithNotLoggedInAndUnsecuredURI(self):

        # Check the authZ middleware leaves the response alone if the URI
        # is not matched in the policy

        # Simulate a beaker.session in the environ
        extra_environ = {'beaker.session.ndg.security': BeakerSessionStub()}
        self.app.get('/test_200', extra_environ=extra_environ)

    def test03Catch401WithLoggedIn(self):

        # Check that the application being secured can raise a HTTP 401
        # response and that this is respected by the Authorization middleware
        # even though a user is set in the session

        extra_environ = {
            'beaker.session.ndg.security':
                BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI)
        }
        self.app.get('/test_401',
                     extra_environ=extra_environ,
                     status=401)

    def test04Catch403WithLoggedIn(self):

        # Check that the application being secured can raise a HTTP 403
        # response and that this is respected by the Authorization middleware
        # even though a user is set in the session

        extra_environ = {
            'beaker.session.ndg.security':
                BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI)
        }
        self.app.get('/test_403',
                     extra_environ=extra_environ,
                     status=403)

    def test05Catch401WithNotLoggedInAndSecuredURI(self):

        # No username is set in the session stub, so the request for the
        # secured URI is expected to be answered with HTTP 401

        # AuthZ middleware checks for username key in session set by AuthN
        # handler
        extra_environ = {'beaker.session.ndg.security': BeakerSessionStub()}
        self.app.get('/test_accessDeniedToSecuredURI',
                     extra_environ=extra_environ,
                     status=401)

    def test06AccessDeniedForSecuredURI(self):

        # User is logged in but doesn't have the required credentials for
        # access
        extra_environ = {
            'beaker.session.ndg.security':
                BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI)
        }

        response = self.app.get('/test_accessDeniedToSecuredURI',
                                extra_environ=extra_environ,
                                status=403)
        print(response)
        self.assertTrue("Insufficient privileges to access the "
                        "resource" in response)

    def test07AccessGrantedForSecuredURI(self):

        # User is logged in and has credentials for access to a URI secured
        # by the policy file
        extra_environ = {
            'beaker.session.ndg.security':
                BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI)
        }

        response = self.app.get('/test_accessGrantedToSecuredURI',
                                extra_environ=extra_environ,
                                status=200)
        self.assertTrue(TestAuthZMiddleware.response in response)
        print(response)

    def test08AccessDeniedForAdminQueryArg(self):

        # User is logged in but doesn't have the required credentials for
        # access
        extra_environ = {
            'beaker.session.ndg.security':
                BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI)
        }

        # Try this URI with the query arg admin=1.  This will be picked up
        # by the policy as a request requiring admin rights.  The request is
        # denied as the user doesn't have these rights but this then calls
        # into play the PEP result handler defined in this module,
        # RedirectFollowingAccessDenied.  This class reinvokes the request
        # but without the admin query argument.  Access is then granted for
        # the redirected request
        response = self.app.get('/test_accessGrantedToSecuredURI',
                                params={'admin': 1},
                                extra_environ=extra_environ,
                                status=302)

        # NOTE(review): the assertion below runs only when following the
        # redirect raises AppError, and it inspects the original 302
        # response rather than the followed one - confirm this is intended
        try:
            response.follow(extra_environ=extra_environ)
        except paste.fixture.AppError:
            self.assertTrue(TestAuthZMiddleware.response in response)
        print(response)
273
274
class PEPResultHandlerTestCase(BaseTestCase):
    """Check the redirecting PEP result handler configured by
    ``pep-result-handler-test.ini``.
    """
    INI_FILE = 'pep-result-handler-test.ini'
    THIS_DIR = os.path.dirname(os.path.abspath(__file__))
    INI_FILEPATH = jnPath(THIS_DIR, INI_FILE)

    def __init__(self, *arg, **kw):
        """Load the WSGI app, read the expected redirect URI from the ini
        file and start the Site A Attribute Authority over SSL.
        """
        BaseTestCase.__init__(self, *arg, **kw)

        wsgiapp = loadapp('config:' + PEPResultHandlerTestCase.INI_FILE,
                          relative_to=PEPResultHandlerTestCase.THIS_DIR)
        self.app = paste.fixture.TestApp(wsgiapp)

        # 'here' is supplied as a parser default, making it available for
        # %(here)s substitutions in the ini file
        cfg = SafeConfigParser(dict(here=PEPResultHandlerTestCase.THIS_DIR))
        cfg.read(PEPResultHandlerTestCase.INI_FILEPATH)
        self.redirectURI = cfg.get('filter:AuthZFilter',
                                   'authz.pepResultHandler.redirectURI')

        # The port number is read through this class instead of the sibling
        # SamlWSGIAuthZTestCase; that class does not define it in its own
        # body, so both resolve the same inherited attribute
        self.startSiteAAttributeAuthority(withSSL=True,
            port=PEPResultHandlerTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)

    def testRedirectPEPResultHandlerMiddleware(self):
        # User is logged in but doesn't have the required credentials for
        # access
        extra_environ = {
            'beaker.session.ndg.security':
                BeakerSessionStub(username=PEPResultHandlerTestCase.OPENID_URI)
        }

        # Expecting redirect response to specified redirect URI
        response = self.app.get('/test_accessDeniedToSecuredURI',
                                extra_environ=extra_environ,
                                status=302)
        print(response)
        self.assertEqual(response.header_dict.get('location'),
                         self.redirectURI)
311       
# Run all test cases in this module when it is executed as a script
if __name__ == '__main__':
    unittest.main()
Note: See TracBrowser for help on using the repository browser.