Ignore:
Timestamp:
24/08/10 16:39:39 (11 years ago)
Author:
pjkersha
Message:

Incomplete - task 2: XACML-Security Integration

  • simplified credential wallet - now a single class SAMLAssertionWallet for caching assertions containing authorisation decision statements and/or attribute statements
File:
1 edited

Legend:

Unmodified
Added
Removed
  • TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/credentialwallet/test_credentialwallet.py

    r7356 r7359  
    3030from ndg.security.test.unit import BaseTestCase 
    3131from ndg.security.common.utils.etree import prettyPrint 
    32 from ndg.security.common.credentialwallet import (SAMLAttributeWallet, 
    33                                                   SAMLAuthzDecisionWallet) 
     32from ndg.security.common.credentialwallet import SAMLAssertionWallet 
    3433 
    3534 
     
    9493        return AssertionElementTree.fromXML(assertionElem) 
    9594 
    96     def _addCredential(self): 
    97         wallet = SAMLAttributeWallet()    
    98         wallet.addCredential(self.assertion, issuerEndpoint=\ 
    99         self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI) 
     95    def _addCredentials(self): 
     96        wallet = SAMLAssertionWallet()    
     97        wallet.addCredentials(self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI, 
     98                              [self.assertion]) 
    10099        return wallet 
    101100     
    102     def test01AddCredential(self): 
    103         wallet = self._addCredential() 
    104          
    105         self.assert_(len(wallet.credentials) == 1) 
    106         self.assert_(self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI in \ 
    107                      wallet.credentialsKeyedByURI) 
    108         self.assert_(self.__class__.SITEA_SAML_ISSUER_NAME in \ 
    109                      wallet.credentials) 
    110          
    111         assertion = wallet.credentials[ 
    112             self.__class__.SITEA_SAML_ISSUER_NAME 
    113         ].credential 
     101    def test01AddCredentials(self): 
     102        wallet = self._addCredentials() 
     103        k = self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI 
     104        self.assert_(len(wallet.retrieveCredentials(k)) == 1) 
     105        assertions = wallet.retrieveCredentials( 
     106                            self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI) 
     107        self.assert_(assertions) 
    114108         
    115109        print("SAML Assertion:\n%s" %  
    116               prettyPrint(AssertionElementTree.toXML(assertion))) 
     110              prettyPrint(AssertionElementTree.toXML(assertions[0]))) 
    117111     
    118112    def test02VerifyCredential(self): 
    119         wallet = SAMLAttributeWallet() 
     113        wallet = SAMLAssertionWallet() 
    120114        self.assert_(wallet.isValidCredential(self.assertion)) 
    121115         
     
    130124        self.assert_(not wallet.isValidCredential(futureAssertion)) 
    131125         
    132     def test03AuditCredential(self): 
     126    def test03AuditCredentials(self): 
    133127        # Add a short lived credential and ensure it's removed when an audit 
    134128        # is carried to prune expired credentials 
    135129        shortExpiryAssertion = self._createAssertion(validityDuration=1) 
    136         wallet = SAMLAttributeWallet() 
    137         wallet.addCredential(shortExpiryAssertion) 
    138          
    139         self.assert_(len(wallet.credentials) == 1) 
     130        wallet = SAMLAssertionWallet() 
     131        wallet.addCredentials('a', [shortExpiryAssertion]) 
     132         
     133        self.assert_(wallet.retrieveCredentials('a')) 
    140134        sleep(2) 
    141135        wallet.audit() 
    142         self.assert_(len(wallet.credentials) == 0) 
     136        self.assert_(wallet.retrieveCredentials('a') is None) 
    143137 
    144138    def test04ClockSkewTolerance(self): 
     
    146140        # a clock skew of  
    147141        shortExpiryAssertion = self._createAssertion(validityDuration=1) 
    148         wallet = SAMLAttributeWallet() 
     142        wallet = SAMLAssertionWallet() 
    149143         
    150144        # Set a tolerance of five seconds 
    151145        wallet.clockSkewTolerance = 5.*60*60 
    152         wallet.addCredential(shortExpiryAssertion) 
    153          
    154         self.assert_(len(wallet.credentials) == 1) 
     146        wallet.addCredentials('a', [shortExpiryAssertion]) 
     147         
     148        self.assert_(wallet.retrieveCredentials('a')) 
    155149        sleep(2) 
    156150        wallet.audit() 
    157         self.assert_(len(wallet.credentials) == 1) 
     151        self.assert_(wallet.retrieveCredentials('a')) 
    158152         
    159153    def test05ReplaceCredential(self): 
    160154        # Replace an existing credential from a given institution with a more 
    161155        # up to date one 
    162         wallet = self._addCredential() 
    163         self.assert_(len(wallet.credentials) == 1) 
     156        k = self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI 
     157        wallet = self._addCredentials() 
     158        self.assert_(len(wallet.retrieveCredentials(k)) == 1) 
    164159         
    165160        newAssertion = self._createAssertion()   
    166161 
    167         wallet.addCredential(newAssertion) 
    168         self.assert_(len(wallet.credentials) == 1) 
     162        wallet.addCredentials(k, [newAssertion]) 
     163        self.assert_(len(wallet.retrieveCredentials(k)) == 1) 
    169164        self.assert_(newAssertion.conditions.notOnOrAfter == \ 
    170                      wallet.credentials[ 
    171                         self.__class__.SITEA_SAML_ISSUER_NAME 
    172                     ].credential.conditions.notOnOrAfter) 
    173          
    174     def test06CredentialsFromSeparateSites(self): 
    175         wallet = self._addCredential() 
    176         wallet.addCredential(self._createAssertion(issuerName="MySite")) 
    177         self.assert_(len(wallet.credentials) == 2) 
     165                     wallet.retrieveCredentials(k)[0].conditions.notOnOrAfter) 
     166         
     167    def test06CredentialsFromSeparateKeys(self): 
     168        wallet = self._addCredentials() 
     169        wallet.addCredentials("MySite", 
     170                              [self._createAssertion(issuerName="MySite"), 
     171                               self._createAssertion()]) 
     172        self.assert_(len(wallet.retrieveCredentials("MySite")) == 2) 
     173        k = self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI 
     174        self.assert_(len(wallet.retrieveCredentials(k)) == 1) 
    178175 
    179176    def test07Pickle(self): 
    180         wallet = self._addCredential() 
     177        wallet = self._addCredentials() 
    181178        outFile = open(self.__class__.PICKLE_FILEPATH, 'w') 
    182179        pickle.dump(wallet, outFile) 
     
    185182        inFile = open(self.__class__.PICKLE_FILEPATH) 
    186183        unpickledWallet = pickle.load(inFile) 
    187         self.assert_(unpickledWallet.credentialsKeyedByURI.get( 
    188             self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI)) 
    189          
    190         self.assert_(unpickledWallet.credentials.items()[0][1].issuerName == \ 
    191                      BaseTestCase.SITEA_SAML_ISSUER_NAME) 
     184         
     185        assertions = unpickledWallet.retrieveCredentials( 
     186            self.__class__.SITEA_ATTRIBUTEAUTHORITY_SAML_URI) 
     187        self.assert_(assertions) 
     188         
     189        self.assert_(assertions[0].issuer.value == \ 
     190                     self.__class__.SITEA_SAML_ISSUER_NAME) 
    192191 
    193192    def test08CreateFromConfig(self): 
    194         wallet = SAMLAttributeWallet.fromConfig( 
     193        wallet = SAMLAssertionWallet.fromConfig( 
    195194                                self.__class__.CONFIG_FILEPATH) 
    196195        self.assert_(wallet.clockSkewTolerance == timedelta(seconds=0.01)) 
     
    243242        return AssertionElementTree.fromXML(assertionElem) 
    244243                     
    245     def _addCredential(self): 
    246         wallet = SAMLAuthzDecisionWallet()    
    247         wallet.addCredential(self.assertion) 
     244    def _addCredentials(self): 
     245        wallet = SAMLAssertionWallet()    
     246        wallet.addCredentials(self.__class__.RESOURCE_ID, [self.assertion]) 
    248247        return wallet 
    249248     
    250     def test01AddCredential(self): 
    251         wallet = self._addCredential() 
    252          
    253         self.assert_(len(wallet.credentials) == 1) 
    254         self.assert_(self.__class__.RESOURCE_ID in wallet.credentials) 
    255  
    256         assertion = wallet.credentials[self.__class__.RESOURCE_ID].credential 
     249    def test01AddCredentials(self): 
     250        wallet = self._addCredentials() 
     251         
     252        self.assert_( 
     253            len(wallet.retrieveCredentials(self.__class__.RESOURCE_ID)) == 1) 
     254 
     255        assertion = wallet.retrieveCredentials(self.__class__.RESOURCE_ID)[-1] 
    257256         
    258257        print("SAML Assertion:\n%s" %  
     
    260259     
    261260    def test02VerifyCredential(self): 
    262         wallet = SAMLAttributeWallet() 
     261        wallet = SAMLAssertionWallet() 
    263262        self.assert_(wallet.isValidCredential(self.assertion)) 
    264263         
     
    272271 
    273272        self.assert_(not wallet.isValidCredential(futureAssertion)) 
    274          
    275     def test03AuditCredential(self): 
    276         # Add a short lived credential and ensure it's removed when an audit 
    277         # is carried to prune expired credentials 
    278         shortExpiryAssertion = self._createAssertion(validityDuration=1) 
    279         wallet = SAMLAttributeWallet() 
    280         wallet.addCredential(shortExpiryAssertion) 
    281          
    282         self.assert_(len(wallet.credentials) == 1) 
    283         sleep(2) 
    284         wallet.audit() 
    285         self.assert_(len(wallet.credentials) == 0) 
    286  
    287     def test04ClockSkewTolerance(self): 
    288         # Add a short lived credential but with the wallet set to allow for 
    289         # a clock skew of  
    290         shortExpiryAssertion = self._createAssertion(validityDuration=1) 
    291         wallet = SAMLAuthzDecisionWallet() 
    292          
    293         # Set a tolerance of five seconds 
    294         wallet.clockSkewTolerance = 5.*60*60 
    295         wallet.addCredential(shortExpiryAssertion) 
    296          
    297         self.assert_(len(wallet.credentials) == 1) 
    298         sleep(2) 
    299         wallet.audit() 
    300         self.assert_(len(wallet.credentials) == 1) 
    301          
    302     def test05ReplaceCredential(self): 
    303         # Replace an existing credential from a given institution with a more 
    304         # up to date one 
    305         wallet = self._addCredential() 
    306         self.assert_(len(wallet.credentials) == 1) 
    307          
    308         newAssertion = self._createAssertion()   
    309  
    310         wallet.addCredential(newAssertion) 
    311         self.assert_(len(wallet.credentials) == 1) 
    312         self.assert_(newAssertion.conditions.notOnOrAfter == \ 
    313                      wallet.credentials[ 
    314                         self.__class__.RESOURCE_ID 
    315                     ].credential.conditions.notOnOrAfter) 
    316273 
    317274    def test06Pickle(self): 
    318         wallet = self._addCredential() 
     275        wallet = self._addCredentials() 
    319276        outFile = open(self.__class__.PICKLE_FILEPATH, 'w') 
    320277        pickle.dump(wallet, outFile) 
     
    323280        inFile = open(self.__class__.PICKLE_FILEPATH) 
    324281        unpickledWallet = pickle.load(inFile) 
    325         self.assert_(unpickledWallet.credentials.get( 
     282        self.assert_(unpickledWallet.retrieveCredentials( 
    326283                                                    self.__class__.RESOURCE_ID)) 
    327284         
    328285    def test07CreateFromConfig(self): 
    329         wallet = SAMLAttributeWallet.fromConfig( 
     286        wallet = SAMLAssertionWallet.fromConfig( 
    330287                                self.__class__.CONFIG_FILEPATH) 
    331288        self.assert_(wallet.clockSkewTolerance == timedelta(seconds=0.01)) 
Note: See TracChangeset for help on using the changeset viewer.