Ignore:
Timestamp:
27/11/09 11:34:30 (11 years ago)
Author:
pjkersha
Message:
  • Working SamlPIPMiddleware - the Policy Information Point with an interface to the SAML Attribute Authority. The PIP retrieves user credential information for the PDP.
  • Fixed ndg.security.test.unit.wsgi.authz.test_authz unit tests for the above.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py

    r6034 r6062  
    2929from ndg.security.server.attributeauthority import (AttributeAuthority,  
    3030    AttributeAuthorityNoMatchingRoleInTrustedHosts,  
    31     SQLAlchemyAttributeInterface) 
     31    SQLAlchemyAttributeInterface, InvalidAttributeFormat) 
    3232 
    3333from ndg.security.common.AttCert import AttCert 
     
    252252 
    253253class SQLAlchemyAttributeInterfaceTestCase(BaseTestCase): 
    254     SAML_SUBJECT_SQLQUERY = ("select count(*) from users where '%s' || " 
    255                              "openid_identifier = '${userId}'" % 
    256                              BaseTestCase.OPENID_URI_STEM) 
    257      
    258     SAML_FIRSTNAME_SQLQUERY = ("select firstname from users where '%s' || " 
    259                                "openid_identifier = '${userId}'" % 
    260                                BaseTestCase.OPENID_URI_STEM) 
    261              
    262     SAML_LASTNAME_SQLQUERY = ("select lastname from users where '%s' || " 
    263                               "openid_identifier = '${userId}'" % 
    264                               BaseTestCase.OPENID_URI_STEM) 
    265          
    266     SAML_EMAILADDRESS_SQLQUERY = ("select emailaddress from users where '%s' ||" 
    267                                   " openid_identifier = '${userId}'" % 
    268                                   BaseTestCase.OPENID_URI_STEM) 
     254    SAML_SUBJECT_SQLQUERY = ("select count(*) from users where openid = " 
     255                             "'${userId}'") 
     256     
     257    SAML_FIRSTNAME_SQLQUERY = ("select firstname from users where openid = " 
     258                               "'${userId}'") 
     259             
     260    SAML_LASTNAME_SQLQUERY = ("select lastname from users where openid = " 
     261                              "'${userId}'") 
     262         
     263    SAML_EMAILADDRESS_SQLQUERY = ("select emailaddress from users where " 
     264                                  "openid = '${userId}'") 
    269265         
    270266    SAML_ATTRIBUTES_SQLQUERY = ("select attributename from attributes, users " 
    271                                 "where '%s' || users.openid_identifier = " 
    272                                 "'${userId}' and attributes.username = " 
    273                                 "users.username" % BaseTestCase.OPENID_URI_STEM) 
    274 #         
    275 #    SAML_ATTRIBUTES_SQLQUERY = ("select attributename from attributes " 
    276 #                                "where username = 'pjk'") 
     267                                "where users.openid = '${userId}' and " 
     268                                "attributes.username = users.username") 
    277269                                 
    278270    def __init__(self, *arg, **kw): 
     
    403395        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT 
    404396        attributeQuery.subject.nameID.value = \ 
    405                                     "https://openid.localhost/philip.kershaw" 
     397                                SQLAlchemyAttributeInterfaceTestCase.OPENID_URI 
    406398         
    407399        fnAttribute = Attribute() 
    408400        fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME 
    409         fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string" 
     401        fnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT 
    410402        fnAttribute.friendlyName = "FirstName" 
    411403 
     
    414406        lnAttribute = Attribute() 
    415407        lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME 
    416         lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string" 
     408        lnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT 
    417409        lnAttribute.friendlyName = "LastName" 
    418410 
     
    421413        emailAddressAttribute = Attribute() 
    422414        emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME 
    423         emailAddressAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\ 
    424                                     XSStringAttributeValue.TYPE_LOCAL_NAME 
    425         emailAddressAttribute.friendlyName = "emailAddress" 
     415        emailAddressAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT 
     416        emailAddressAttribute.friendlyName = "EmailAddress" 
    426417 
    427418        attributeQuery.attributes.append(emailAddressAttribute)                                    
     
    430421        authzAttribute.name = \ 
    431422            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0] 
    432         authzAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\ 
    433                                     XSStringAttributeValue.TYPE_LOCAL_NAME 
     423        authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT 
    434424        authzAttribute.friendlyName = "authz" 
    435425 
     
    482472            '/O=ESG/OU=NCAR/CN=Gateway') 
    483473         
    484         attributeInterface.setProperties(samlAssertionLifetime=28800.) 
     474        attributeInterface.setProperties(samlAssertionLifetime=28800., 
     475                                issuerName='/CN=Attribute Authority/O=Site A') 
    485476         
    486477        attributeInterface.samlSubjectSqlQuery = ( 
     
    504495                    SQLAlchemyAttributeInterfaceTestCase.N_ATTRIBUTE_VALUES) 
    505496 
     497    def test04SamlAttributeQuery(self): 
     498        if self.skipTests: 
     499            return 
     500         
     501        # Prepare a client query 
     502        attributeQuery = AttributeQuery() 
     503        attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) 
     504        attributeQuery.id = str(uuid4()) 
     505        attributeQuery.issueInstant = datetime.utcnow() 
     506         
     507        attributeQuery.issuer = Issuer() 
     508        attributeQuery.issuer.format = Issuer.X509_SUBJECT 
     509        attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway' 
     510                         
     511                         
     512        attributeQuery.subject = Subject()   
     513        attributeQuery.subject.nameID = NameID() 
     514        attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT 
     515        attributeQuery.subject.nameID.value = \ 
     516                                SQLAlchemyAttributeInterfaceTestCase.OPENID_URI 
     517     
     518        emailAddressAttribute = Attribute() 
     519        emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME 
     520        emailAddressAttribute.nameFormat = "InvalidFormat" 
     521        emailAddressAttribute.friendlyName = "EmailAddress" 
     522 
     523        attributeQuery.attributes.append(emailAddressAttribute)                                    
     524     
     525        authzAttribute = Attribute() 
     526        authzAttribute.name = \ 
     527            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0] 
     528        authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT 
     529        authzAttribute.friendlyName = "authz" 
     530 
     531        attributeQuery.attributes.append(authzAttribute)                                    
     532         
     533        # Add the response - the interface will populate with an assertion as 
     534        # appropriate 
     535        samlResponse = Response() 
     536         
     537        samlResponse.issueInstant = datetime.utcnow() 
     538        samlResponse.id = str(uuid4()) 
     539        samlResponse.issuer = Issuer() 
     540         
     541        # Initialise to success status but reset on error 
     542        samlResponse.status = Status() 
     543        samlResponse.status.statusCode = StatusCode() 
     544        samlResponse.status.statusMessage = StatusMessage() 
     545        samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI 
     546         
     547        # Nb. SAML 2.0 spec says issuer format must be omitted 
     548        samlResponse.issuer.value = "CEDA" 
     549         
     550        samlResponse.inResponseTo = attributeQuery.id 
     551         
     552        # Set up the interface object 
     553         
     554        # Define queries for SAML attribute names 
     555        samlAttribute2SqlQuery = { 
     556            EsgSamlNamespaces.FIRSTNAME_ATTRNAME:  
     557                SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY, 
     558             
     559            EsgSamlNamespaces.LASTNAME_ATTRNAME:  
     560                SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY, 
     561         
     562            EsgSamlNamespaces.EMAILADDRESS_ATTRNAME:  
     563                SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY, 
     564         
     565            SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]:  
     566                SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY                     
     567        } 
     568         
     569        attributeInterface = SQLAlchemyAttributeInterface( 
     570                                samlAttribute2SqlQuery=samlAttribute2SqlQuery) 
     571         
     572        attributeInterface.connectionString = \ 
     573                        SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR 
     574                 
     575        attributeInterface.samlValidRequestorDNs = ( 
     576            '/O=STFC/OU=CEDA/CN=AuthorisationService', 
     577            '/O=ESG/OU=NCAR/CN=Gateway') 
     578         
     579        attributeInterface.setProperties(samlAssertionLifetime=28800., 
     580                                issuerName='/CN=Attribute Authority/O=Site A') 
     581         
     582        attributeInterface.samlSubjectSqlQuery = ( 
     583            SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY) 
     584         
     585        # Make the query 
     586        try: 
     587            attributeInterface.getAttributes(attributeQuery, samlResponse) 
     588        except InvalidAttributeFormat: 
     589            print("PASSED: caught InvalidAttributeFormat exception") 
     590        else: 
     591            self.fail("Expecting InvalidAttributeFormat exception") 
    506592         
    507593if __name__ == "__main__": 
Note: See TracChangeset for help on using the changeset viewer.