Ignore:
Timestamp:
06/11/09 17:01:37 (11 years ago)
Author:
pjkersha
Message:

SQLAlchemyAttributeInterface working queries but more work needed on SAML attribute / SQL query config.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py

    r5973 r5977  
    242242 
    243243from warnings import warn 
    244      
     244from uuid import uuid4 
     245from datetime import datetime 
     246from saml.saml2.core import (Response, Attribute, SAMLVersion, Subject, NameID, 
     247                             Issuer, AttributeQuery, XSStringAttributeValue,  
     248                             Status, StatusMessage, StatusCode) 
     249from saml.xml import XMLConstants 
     250 
     251 
    245252class SQLAlchemyAttributeInterfaceTestCase(BaseTestCase): 
     253    SAML_SUBJECT_SQLQUERY = ("select count(*) from users where '%s' || " 
     254                             "openid_identifier = '${userId}'" % 
     255                             BaseTestCase.OPENID_URI_STEM) 
     256     
     257    SAML_FIRSTNAME_SQLQUERY = ("select firstname from users where '%s' || " 
     258                               "openid_identifier = '${userId}'" % 
     259                               BaseTestCase.OPENID_URI_STEM) 
     260             
     261    SAML_LASTNAME_SQLQUERY = ("select lastname from users where '%s' || " 
     262                              "openid_identifier = '${userId}'" % 
     263                              BaseTestCase.OPENID_URI_STEM) 
     264         
     265    SAML_EMAILADDRESS_SQLQUERY = ("select emailaddress from users where '%s' || " 
     266                                  "openid_identifier = '${userId}'" % 
     267                                  BaseTestCase.OPENID_URI_STEM) 
     268         
     269    SAML_ATTRIBUTES_SQLQUERY = ("select attributename from attributes where " 
     270                                "'%s' || openid_identifier = '${userId}'" % 
     271                                BaseTestCase.OPENID_URI_STEM) 
     272                                 
    246273    def __init__(self, *arg, **kw): 
    247274        super(SQLAlchemyAttributeInterfaceTestCase, self).__init__(*arg, **kw) 
     
    256283            self.skipTests = True 
    257284         
    258     def test01(self): 
     285    def test01TrySamlAttribute2SqlQuery__setattr__(self): 
    259286        if self.skipTests: 
    260287            return 
    261288         
    262289        attributeInterface = SQLAlchemyAttributeInterface() 
    263         attributeInterface.samlAttribute2SqlQueryMap_firstName = 'Philip' 
    264         setattr(attributeInterface, 'samlAttribute2SqlQueryMap.lastName', 
    265                 'Kershaw') 
    266         attributeInterface.samlAttribute2SqlQueryMap['emailAddress'] = ('pjk' 
    267                                                             '@somewhere.ac.uk') 
    268  
    269                                  
     290         
     291        # Define queries for SAML attribute names 
     292        attributeInterface.samlAttribute2SqlQuery_firstName = \ 
     293            SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY 
     294             
     295        setattr(attributeInterface, 'samlAttribute2SqlQuery.lastName', 
     296            SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY) 
     297         
     298        attributeInterface.samlAttribute2SqlQuery['emailAddress'] = ( 
     299            SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY) 
     300         
     301        attributeInterface.samlAttribute2SqlQuery_attributes = ( 
     302            SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY) 
     303         
     304    def test02SetProperties(self): 
     305        # test setProperties interface for instance attribute assignment 
     306        if self.skipTests: 
     307            return 
     308         
     309        properties = { 
     310            'connectionString':  
     311                SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR, 
     312             
     313            'samlSubjectSqlQuery': 
     314                SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY, 
     315                 
     316            'samlAttribute2SqlQuery.urn:esg:first:name': 
     317                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY, 
     318             
     319            'samlAttribute2SqlQuery.urn:esg:last:name': 
     320                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY, 
     321         
     322            'samlAttribute2SqlQuery.urn:esg:first:email:address': 
     323                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY, 
     324         
     325            'samlAttribute2SqlQuery_attributes': 
     326                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY, 
     327             
     328            'samlValidRequestorDNs': ('/O=STFC/OU=CEDA/CN=AuthorisationService', 
     329                                      '/O=ESG/OU=NCAR/CN=Gateway'), 
     330            'samlAssertionLifetime': 86400, 
     331 
     332        } 
     333        attributeInterface = SQLAlchemyAttributeInterface() 
     334        attributeInterface.setProperties(**properties) 
     335         
     336        self.assert_(attributeInterface.samlAttribute2SqlQuery['firstName'] == \ 
     337            SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY) 
     338         
     339        self.assert_(attributeInterface.connectionString == \ 
     340                     SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR) 
     341         
     342        # Test constructor setting properties 
     343        attributeInterface2 = SQLAlchemyAttributeInterface(**properties) 
     344        self.assert_(attributeInterface2.samlAssertionLifetime.days == 1) 
     345         
     346    def test03SamlAttributeQuery(self): 
     347        if self.skipTests: 
     348            return 
     349         
     350        # Prepare a client query 
     351        attributeQuery = AttributeQuery() 
     352        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) 
     353        attributeQuery.id = str(uuid4()) 
     354        attributeQuery.issueInstant = datetime.utcnow() 
     355         
     356        attributeQuery.issuer = Issuer() 
     357        attributeQuery.issuer.format = Issuer.X509_SUBJECT 
     358        attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway' 
     359                         
     360                         
     361        attributeQuery.subject = Subject()   
     362        attributeQuery.subject.nameID = NameID() 
     363        attributeQuery.subject.nameID.format = "urn:esg:openid" 
     364        attributeQuery.subject.nameID.value = \ 
     365                                    "https://openid.localhost/philip.kershaw" 
     366         
     367        fnAttribute = Attribute() 
     368        fnAttribute.name = "urn:esg:first:name" 
     369        fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string" 
     370        fnAttribute.friendlyName = "FirstName" 
     371 
     372        attributeQuery.attributes.append(fnAttribute) 
     373     
     374        lnAttribute = Attribute() 
     375        lnAttribute.name = "urn:esg:last:name" 
     376        lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string" 
     377        lnAttribute.friendlyName = "LastName" 
     378 
     379        attributeQuery.attributes.append(lnAttribute) 
     380     
     381        emailAddressAttribute = Attribute() 
     382        emailAddressAttribute.name = "urn:esg:email:address" 
     383        emailAddressAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\ 
     384                                    XSStringAttributeValue.TYPE_LOCAL_NAME 
     385        emailAddressAttribute.friendlyName = "emailAddress" 
     386 
     387        attributeQuery.attributes.append(emailAddressAttribute)                                    
     388     
     389        authzAttribute = Attribute() 
     390        authzAttribute.name = \ 
     391            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0] 
     392        authzAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\ 
     393                                    XSStringAttributeValue.TYPE_LOCAL_NAME 
     394        authzAttribute.friendlyName = "authz" 
     395 
     396        attributeQuery.attributes.append(authzAttribute)                                    
     397         
     398        # Add the response - the interface will populate with an assertion as 
     399        # appropraite 
     400        samlResponse = Response() 
     401         
     402        samlResponse.issueInstant = datetime.utcnow() 
     403        samlResponse.id = str(uuid4()) 
     404        samlResponse.issuer = Issuer() 
     405         
     406        # Initialise to success status but reset on error 
     407        samlResponse.status = Status() 
     408        samlResponse.status.statusCode = StatusCode() 
     409        samlResponse.status.statusMessage = StatusMessage() 
     410        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI 
     411         
     412        # Nb. SAML 2.0 spec says issuer format must be omitted 
     413        samlResponse.issuer.value = "CEDA" 
     414         
     415        samlResponse.inResponseTo = attributeQuery.id 
     416         
     417        # Set up the interface object 
     418         
     419        # Define queries for SAML attribute names 
     420        samlAttribute2SqlQuery = { 
     421            'firstName':  
     422                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY, 
     423             
     424            'lastName':  
     425                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY, 
     426         
     427            'emailAddress':  
     428                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY, 
     429         
     430            'attributes':  
     431                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY,                                
     432        } 
     433         
     434        attributeInterface = SQLAlchemyAttributeInterface( 
     435                                samlAttribute2SqlQuery=samlAttribute2SqlQuery) 
     436         
     437        attributeInterface.connectionString = \ 
     438                        SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR 
     439                 
     440        attributeInterface.samlValidRequestorDNs = ( 
     441            '/O=STFC/OU=CEDA/CN=AuthorisationService', 
     442            '/O=ESG/OU=NCAR/CN=Gateway') 
     443         
     444        attributeInterface.setProperties(samlAssertionLifetime=28800.) 
     445         
     446        attributeInterface.samlSubjectSqlQuery = ( 
     447            SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY) 
     448         
     449        # Make the query 
     450        attributeInterface.getAttributes(attributeQuery, samlResponse) 
     451         
     452        self.assert_( 
     453                samlResponse.status.statusCode.value == StatusCode.SUCCESS_URI) 
     454        self.assert_(samlResponse.inResponseTo == attributeQuery.id) 
     455        self.assert_(samlResponse.assertions[0].subject.nameID.value == \ 
     456                     attributeQuery.subject.nameID.value) 
     457 
     458         
    270459if __name__ == "__main__": 
    271460    unittest.main() 
Note: See TracChangeset for help on using the changeset viewer.