Ignore:
Timestamp:
16/02/10 16:11:08 (11 years ago)
Author:
pjkersha
Message:
  • Important fix for SOAP client used with SAML SOAP binding: set text/xml content type.
  • Refactored SAML SOAP binding query clients.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/wsgi/saml/test_soapauthzdecisioninterface.py

    r6575 r6578  
    11#!/usr/bin/env python 
    2 """Unit tests for WSGI SAML 2.0 SOAP Attribute Query Interface 
     2"""Unit tests for WSGI SAML 2.0 SOAP Authorisation Decision Query Interface 
    33 
    44NERC DataGrid Project 
    55""" 
    66__author__ = "P J Kershaw" 
    7 __date__ = "21/08/09" 
    8 __copyright__ = "(C) 2009 Science and Technology Facilities Council" 
     7__date__ = "15/02/2010" 
     8__copyright__ = "(C) 2010 Science and Technology Facilities Council" 
    99__license__ = "BSD - see LICENSE file in top-level directory" 
    1010__contact__ = "Philip.Kershaw@stfc.ac.uk" 
    1111__revision__ = '$Id: $' 
     12import unittest 
    1213from uuid import uuid4 
    13 from datetime import datetime 
     14from datetime import datetime, timedelta 
    1415from cStringIO import StringIO 
    1516 
    16 from saml.saml2.core import (Attribute, SAMLVersion, Subject, NameID, Issuer,  
    17                              AttributeQuery, XSStringAttributeValue,  
    18                              StatusCode) 
    19 from saml.xml import XMLConstants 
    20 from saml.xml.etree import AttributeQueryElementTree, ResponseElementTree 
     17from saml.saml2.core import (SAMLVersion, Subject, NameID, Issuer, Response, 
     18                             AuthzDecisionQuery, AuthzDecisionStatement, Status, 
     19                             StatusCode, StatusMessage, DecisionType, Action,  
     20                             Conditions, Assertion) 
     21from saml.xml.etree import AuthzDecisionQueryElementTree, ResponseElementTree 
    2122 
    2223from ndg.security.common.soap.etree import SOAPEnvelope 
     
    2526 
    2627 
    27 class SOAPAuthzDecisionInterfaceMiddlewareTestCase(unittest.TestCase): 
     28class TestAuthorisationServiceMiddleware(object): 
     29    """Test Authorisation Service interface stub""" 
     30    QUERY_INTERFACE_KEYNAME_OPTNAME = 'queryInterfaceKeyName' 
     31    RESOURCE_URI = 'http://localhost/dap/data/' 
     32    ISSUER_DN = '/O=Site A/CN=PDP' 
     33     
     34    def __init__(self, app, global_conf, **app_conf): 
     35        self.queryInterfaceKeyName = app_conf[ 
     36            TestAuthorisationServiceMiddleware.QUERY_INTERFACE_KEYNAME_OPTNAME] 
     37        self._app = app 
     38     
     39    def __call__(self, environ, start_response): 
     40        environ[self.queryInterfaceKeyName] = self.authzDecisionQueryFactory() 
     41        return self._app(environ, start_response) 
     42     
     43    def authzDecisionQueryFactory(self): 
     44        def authzDecisionQuery(query): 
     45            response = Response() 
     46            now = datetime.utcnow() 
     47            response.issueInstant = now 
     48             
     49            # Make up a request ID that this response is responding to 
     50            response.inResponseTo = query.id 
     51            response.id = str(uuid4()) 
     52            response.version = SAMLVersion(SAMLVersion.VERSION_20) 
     53                 
     54            response.issuer = Issuer() 
     55            response.issuer.format = Issuer.X509_SUBJECT 
     56            response.issuer.value = TestAuthorisationServiceMiddleware.ISSUER_DN 
     57             
     58            response.status = Status() 
     59            response.status.statusCode = StatusCode() 
     60            response.status.statusCode.value = StatusCode.SUCCESS_URI 
     61            response.status.statusMessage = StatusMessage()         
     62            response.status.statusMessage.value = \ 
     63                                                "Response created successfully" 
     64                
     65            assertion = Assertion() 
     66            assertion.version = SAMLVersion(SAMLVersion.VERSION_20) 
     67            assertion.id = str(uuid4()) 
     68            assertion.issueInstant = now 
     69             
     70            authzDecisionStatement = AuthzDecisionStatement() 
     71            authzDecisionStatement.decision = DecisionType.PERMIT 
     72            authzDecisionStatement.resource = \ 
     73                TestAuthorisationServiceMiddleware.RESOURCE_URI 
     74            authzDecisionStatement.actions.append(Action()) 
     75            authzDecisionStatement.actions[-1].namespace = Action.GHPP_NS_URI 
     76            authzDecisionStatement.actions[-1].value = Action.HTTP_GET_ACTION 
     77            assertion.authzDecisionStatements.append(authzDecisionStatement) 
     78             
     79            # Add a conditions statement for a validity of 8 hours 
     80            assertion.conditions = Conditions() 
     81            assertion.conditions.notBefore = now 
     82            assertion.conditions.notOnOrAfter = now + timedelta(seconds=60*60*8) 
     83                    
     84            assertion.subject = Subject()   
     85            assertion.subject.nameID = NameID() 
     86            assertion.subject.nameID.format = query.subject.nameID.format 
     87            assertion.subject.nameID.value = query.subject.nameID.value 
     88                 
     89            assertion.issuer = Issuer() 
     90            assertion.issuer.format = Issuer.X509_SUBJECT 
     91            assertion.issuer.value = \ 
     92                                    TestAuthorisationServiceMiddleware.ISSUER_DN 
     93     
     94            response.assertions.append(assertion) 
     95            return response 
     96         
     97        return authzDecisionQuery 
     98     
     99     
     100class SOAPAuthzDecisionInterfaceMiddlewareTestCase( 
     101                                        SoapSamlInterfaceMiddlewareTestCase): 
    28102    CONFIG_FILENAME = 'authz-decision-interface.ini' 
    29103 
    30104    def _createAuthzDecisionQuery(self,  
    31                         issuer="/O=Site A/CN=Authorisation Service", 
    32                         subject="https://openid.localhost/philip.kershaw"): 
    33         query = AttributeQuery() 
     105                    issuer="/O=Site A/CN=PEP", 
     106                    subject="https://openid.localhost/philip.kershaw", 
     107                    resource=TestAuthorisationServiceMiddleware.RESOURCE_URI, 
     108                    action=Action.HTTP_GET_ACTION, 
     109                    actionNs=Action.GHPP_NS_URI): 
     110        query = AuthzDecisionQuery() 
    34111        query.version = SAMLVersion(SAMLVersion.VERSION_20) 
    35112        query.id = str(uuid4()) 
     
    45122        query.subject.nameID.value = subject 
    46123                                     
    47          
    48         # special case handling for 'FirstName' attribute 
    49         fnAttribute = Attribute() 
    50         fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME 
    51         fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string" 
    52         fnAttribute.friendlyName = "FirstName" 
    53  
    54         query.attributes.append(fnAttribute) 
    55      
    56         # special case handling for 'LastName' attribute 
    57         lnAttribute = Attribute() 
    58         lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME 
    59         lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string" 
    60         lnAttribute.friendlyName = "LastName" 
    61  
    62         query.attributes.append(lnAttribute) 
    63      
    64         # special case handling for 'LastName' attribute 
    65         emailAddressAttribute = Attribute() 
    66         emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME 
    67         emailAddressAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\ 
    68                                     XSStringAttributeValue.TYPE_LOCAL_NAME 
    69         emailAddressAttribute.friendlyName = "emailAddress" 
    70  
    71         query.attributes.append(emailAddressAttribute)   
     124        query.resource = resource          
     125        query.actions.append(Action()) 
     126        query.actions[0].namespace = actionNs 
     127        query.actions[0].value = action     
    72128 
    73129        return query 
     
    79135            query = self._createAuthzDecisionQuery(**kw) 
    80136             
    81         elem = AuthzDecusionQueryElementTree.toXML(query) 
     137        elem = AuthzDecisionQueryElementTree.toXML(query) 
    82138        soapRequest = SOAPEnvelope() 
    83139        soapRequest.create() 
     
    115171            'Content-type': 'text/xml' 
    116172        } 
    117         response = self.app.post('/attributeauthority/saml',  
     173        response = self.app.post('/authorisationservice/',  
    118174                                 params=request,  
    119175                                 headers=header,  
     
    127183        self.assert_(samlResponse.assertions[0].subject.nameID.value == \ 
    128184                     query.subject.nameID.value) 
    129  
    130     def test02AttributeReleaseDenied(self): 
    131         request = self._makeRequest(issuer="/O=Site B/CN=Authorisation Service") 
     185        self.assert_(samlResponse.assertions[0]) 
     186        self.assert_(samlResponse.assertions[0].authzDecisionStatements[0]) 
     187        self.assert_(samlResponse.assertions[0].authzDecisionStatements[0 
     188                                            ].decision == DecisionType.PERMIT) 
    132189         
    133         header = { 
    134             'soapAction': "http://www.oasis-open.org/committees/security", 
    135             'Content-length': str(len(request)), 
    136             'Content-type': 'text/xml' 
    137         } 
    138          
    139         response = self.app.post('/attributeauthority/saml',  
    140                                  params=request,  
    141                                  headers=header,  
    142                                  status=200) 
    143          
    144         print("Response status=%d" % response.status) 
    145          
    146         samlResponse = self._getSAMLResponse(response.body) 
    147  
    148         self.assert_(samlResponse.status.statusCode.value == \ 
    149                      StatusCode.INVALID_ATTR_NAME_VALUE_URI) 
    150  
    151     def test03InvalidAttributesRequested(self): 
    152         query = self._createAuthzDecisionQuery() 
    153          
    154         # Add an unsupported Attribute name 
    155         attribute = Attribute() 
    156         attribute.name = "urn:my:attribute" 
    157         attribute.nameFormat = XMLConstants.XSD_NS+"#"+\ 
    158                                     XSStringAttributeValue.TYPE_LOCAL_NAME 
    159         attribute.friendlyName = "myAttribute" 
    160         query.attributes.append(attribute)      
    161          
    162         request = self._makeRequest(query=query) 
    163             
    164         header = { 
    165             'soapAction': "http://www.oasis-open.org/committees/security", 
    166             'Content-length': str(len(request)), 
    167             'Content-type': 'text/xml' 
    168         } 
    169         
    170         response = self.app.post('/attributeauthority/saml',  
    171                                  params=request,  
    172                                  headers=header,  
    173                                  status=200) 
    174          
    175         print("Response status=%d" % response.status) 
    176          
    177         samlResponse = self._getSAMLResponse(response.body) 
    178  
    179         self.assert_(samlResponse.status.statusCode.value == \ 
    180                      StatusCode.INVALID_ATTR_NAME_VALUE_URI) 
    181          
    182     def test04InvalidIssuer(self): 
    183         request = self._makeRequest(issuer="My Attribute Query Issuer") 
    184          
    185         header = { 
    186             'soapAction': "http://www.oasis-open.org/committees/security", 
    187             'Content-length': str(len(request)), 
    188             'Content-type': 'text/xml' 
    189         } 
    190         
    191         response = self.app.post('/attributeauthority/saml',  
    192                                  params=request,  
    193                                  headers=header,  
    194                                  status=200) 
    195          
    196         print("Response status=%d" % response.status) 
    197          
    198         samlResponse = self._getSAMLResponse(response.body) 
    199  
    200         self.assert_(samlResponse.status.statusCode.value == \ 
    201                      StatusCode.REQUEST_DENIED_URI) 
    202  
    203     def test05UnknownPrincipal(self): 
    204         request = self._makeRequest(subject="Joe.Bloggs") 
    205          
    206         header = { 
    207             'soapAction': "http://www.oasis-open.org/committees/security", 
    208             'Content-length': str(len(request)), 
    209             'Content-type': 'text/xml' 
    210         } 
    211          
    212         response = self.app.post('/attributeauthority/saml',  
    213                                  params=request,  
    214                                  headers=header,  
    215                                  status=200) 
    216          
    217         print("Response status=%d" % response.status) 
    218          
    219         samlResponse = self._getSAMLResponse(response.body) 
    220  
    221         self.assert_(samlResponse.status.statusCode.value == \ 
    222                      StatusCode.UNKNOWN_PRINCIPAL_URI) 
    223  
     190    def test02(self): 
     191        pass 
    224192  
    225193if __name__ == "__main__": 
Note: See TracChangeset for help on using the changeset viewer.