Ignore:
Timestamp:
05/03/10 16:56:35 (11 years ago)
Author:
pjkersha
Message:

Refactoring Attribute Authority to remove NDG Attribute Certificate and role mapping code.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py

    r6615 r6686  
    1919logging.basicConfig(level=logging.DEBUG) 
    2020 
    21 from os.path import expandvars as xpdVars 
    22 from os.path import join as jnPath 
    23 mkPath = lambda file:jnPath(os.environ['NDGSEC_AA_UNITTEST_DIR'], file) 
     21from warnings import warn 
     22from uuid import uuid4 
     23from datetime import datetime 
     24from os import path 
    2425 
    2526from ndg.security.test.unit import BaseTestCase 
     
    2829    CaseSensitiveConfigParser) 
    2930from ndg.security.server.attributeauthority import (AttributeAuthority,  
    30     AttributeAuthorityNoMatchingRoleInTrustedHosts,  
    3131    SQLAlchemyAttributeInterface, InvalidAttributeFormat) 
    3232 
    33 from ndg.security.common.AttCert import AttCert 
    34  
    35  
    36 class AttributeAuthorityTestCase(BaseTestCase): 
    37     clntPriKeyPwd = None 
    38  
    39     def setUp(self): 
    40         super(AttributeAuthorityTestCase, self).setUp() 
    41          
    42         if 'NDGSEC_INT_DEBUG' in os.environ: 
    43             import pdb 
    44             pdb.set_trace() 
    45          
    46         if 'NDGSEC_AA_UNITTEST_DIR' not in os.environ: 
    47             os.environ['NDGSEC_AA_UNITTEST_DIR'] = \ 
    48                 os.path.abspath(os.path.dirname(__file__)) 
    49  
    50         self.cfgParser = CaseSensitiveConfigParser() 
    51         cfgFilePath = mkPath('test_attributeauthority.cfg') 
    52         self.cfgParser.read(cfgFilePath) 
    53          
    54         self.cfg = {} 
    55         for section in self.cfgParser.sections() + ['DEFAULT']: 
    56             self.cfg[section] = dict(self.cfgParser.items(section)) 
    57              
    58         self.aa = AttributeAuthority.fromPropertyFile( 
    59                                             self.cfg['setUp']['propFilePath']) 
    60  
    61     _mkSiteBAttributeAuthority = lambda self: \ 
    62         AttributeAuthority.fromPropertyFile( 
    63                         propFilePath=self.cfg['DEFAULT']['siteBPropFilePath']) 
    64      
    65     def test01GetHostInfo(self): 
    66         """test01GetHostInfo: retrieve info for AA host""" 
    67         hostInfo = self.aa.hostInfo 
    68         print("Host Info:\n %s" % hostInfo)      
    69  
    70     def test02GetTrustedHostInfo(self): 
    71         """test02GetTrustedHostInfo: retrieve trusted host info matching a 
    72         given role""" 
    73         thisSection = self.cfg['test02GetTrustedHostInfo'] 
    74          
    75         trustedHostInfo = self.aa.getTrustedHostInfo(thisSection['role']) 
    76         for hostname, hostInfo in trustedHostInfo.items(): 
    77             self.assert_(hostname, "Hostname not set") 
    78             for k, v in hostInfo.items(): 
    79                 self.assert_(k, "hostInfo value key unset") 
    80  
    81         print("Trusted Host Info:\n %s" % trustedHostInfo) 
    82  
    83     def test03GetTrustedHostInfoWithNoMatchingRoleFound(self): 
    84         """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case  
    85         where the input role doesn't match any roles in the target AA's map  
    86         config file""" 
    87         thisSection = self.cfg[ 
    88                             'test03GetTrustedHostInfoWithNoMatchingRoleFound'] 
    89         try: 
    90             trustedHostInfo = self.aa.getTrustedHostInfo(thisSection['role']) 
    91             self.fail("Expecting NoMatchingRoleInTrustedHosts exception") 
    92              
    93         except AttributeAuthorityNoMatchingRoleInTrustedHosts, e: 
    94             print('PASSED - no match for role "%s": %s' % (thisSection['role'], 
    95                                                            e)) 
    96  
    97  
    98     def test04GetTrustedHostInfoWithNoRole(self): 
    99         """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info  
    100         irrespective of role""" 
    101         trustedHostInfo = self.aa.getTrustedHostInfo() 
    102         for hostname, hostInfo in trustedHostInfo.items(): 
    103             self.assert_(hostname, "Hostname not set") 
    104             for k, v in hostInfo.items(): 
    105                 self.assert_(k, "hostInfo value key unset") 
    106                     
    107         print("Trusted Host Info:\n %s" % trustedHostInfo) 
    108  
    109     def test05GetAttCert(self):         
    110         """test05GetAttCert: Request attribute certificate from NDG Attribute  
    111         Authority Web Service.""" 
    112         thisSection = self.cfg['test05GetAttCert'] 
    113          
    114         # Read user Certificate into a string ready for passing via WS 
    115         try: 
    116             userX509CertFilePath = xpdVars(thisSection.get( 
    117                                                     'issuingClntCertFilePath')) 
    118             userX509CertTxt = open(userX509CertFilePath, 'r').read() 
    119          
    120         except TypeError: 
    121             # No issuing cert set 
    122             userX509CertTxt = None 
    123                  
    124         except IOError, ioErr: 
    125             raise Exception("Error reading certificate file \"%s\": %s" % 
    126                                     (ioErr.filename, ioErr.strerror)) 
    127  
    128         # Make attribute certificate request 
    129         attCert = self.aa.getAttCert(holderX509Cert=userX509CertTxt) 
    130          
    131         print("Attribute Certificate: \n\n:" + str(attCert)) 
    132          
    133         attCert.filePath = xpdVars(thisSection['attCertFilePath']) 
    134         attCert.write() 
    135          
    136          
    137     def test06GetAttCertWithUserIdSet(self):         
    138         """test06GetAttCertWithUserIdSet: Request attribute certificate from  
    139         NDG Attribute Authority Web Service setting a specific user Id  
    140         independent of the signer of the SOAP request.""" 
    141         thisSection = self.cfg['test06GetAttCertWithUserIdSet'] 
    142          
    143         # Make attribute certificate request 
    144         userId = thisSection['userId'] 
    145         attCert = self.aa.getAttCert(userId=userId) 
    146          
    147         print("Attribute Certificate: \n\n:" + str(attCert)) 
    148          
    149         attCert.filePath = xpdVars(thisSection['attCertFilePath']) 
    150         attCert.write() 
    151  
    152  
    153     def test07GetMappedAttCert(self):         
    154         """test07GetMappedAttCert: Request mapped attribute certificate from  
    155         NDG Attribute Authority Web Service.""" 
    156         thisSection = self.cfg['test07GetMappedAttCert'] 
    157          
    158         # Read user Certificate into a string ready for passing via WS 
    159         try: 
    160             userX509CertFilePath = xpdVars(thisSection.get( 
    161                                                     'issuingClntCertFilePath')) 
    162             userX509CertTxt = open(userX509CertFilePath, 'r').read() 
    163          
    164         except TypeError: 
    165             # No issuing cert set 
    166             userX509CertTxt = None 
    167                  
    168         except IOError, ioErr: 
    169             raise Exception("Error reading certificate file \"%s\": %s" %  
    170                                     (ioErr.filename, ioErr.strerror)) 
    171      
    172         # Simlarly for Attribute Certificate  
    173         try: 
    174             userAttCert = AttCert.Read( 
    175                                 xpdVars(thisSection['userAttCertFilePath'])) 
    176              
    177         except IOError, ioErr: 
    178             raise Exception("Error reading attribute certificate file \"%s\": " 
    179                             "%s" % (ioErr.filename, ioErr.strerror)) 
    180          
    181         # Make client to site B Attribute Authority 
    182         siteBAA = self._mkSiteBAttributeAuthority() 
    183      
    184         # Make attribute certificate request 
    185         attCert = siteBAA.getAttCert(holderX509Cert=userX509CertTxt, 
    186                                      userAttCert=userAttCert) 
    187         print("Attribute Certificate: \n\n:" + str(attCert)) 
    188          
    189         attCert.filePath = xpdVars(thisSection['mappedAttCertFilePath']) 
    190         attCert.write() 
    191          
    192          
    193     def test08GetMappedAttCertStressTest(self):         
    194         """test08GetMappedAttCertStressTest: Request mapped attribute  
    195         certificate from NDG Attribute Authority Web Service.""" 
    196         thisSection = self.cfg['test08GetMappedAttCertStressTest'] 
    197          
    198         # Read user Certificate into a string ready for passing via WS 
    199         try: 
    200             userX509CertFilePath = xpdVars(thisSection.get( 
    201                                                     'issuingClntCertFilePath')) 
    202             userX509CertTxt = open(userX509CertFilePath, 'r').read() 
    203          
    204         except TypeError: 
    205             # No issuing cert set 
    206             userX509CertTxt = None 
    207                  
    208         except IOError, ioErr: 
    209             raise Exception("Error reading certificate file \"%s\": %s" %  
    210                                     (ioErr.filename, ioErr.strerror)) 
    211  
    212         # Make client to site B Attribute Authority 
    213         siteBAA = self._mkSiteBAttributeAuthority() 
    214  
    215         acFilePathList = [xpdVars(file) for file in \ 
    216                           thisSection['userAttCertFilePathList'].split()] 
    217  
    218         passed = True 
    219         for acFilePath in acFilePathList: 
    220             try: 
    221                 userAttCert = AttCert.Read(acFilePath) 
    222                  
    223             except IOError, ioErr: 
    224                 raise Exception("Error reading attribute certificate file " 
    225                                 '"%s": %s' % (ioErr.filename, ioErr.strerror)) 
    226          
    227             # Make attribute certificate request 
    228             try: 
    229                 attCert = siteBAA.getAttCert(holderX509Cert=userX509CertTxt, 
    230                                              userAttCert=userAttCert) 
    231             except Exception, e: 
    232                 passed = True 
    233                 outFilePfx = 'test08GetMappedAttCertStressTest-%s' % \ 
    234                         os.path.basename(acFilePath)     
    235                 msgFile = open(outFilePfx+".msg", 'w') 
    236                 msgFile.write('Failed for "%s": %s\n' % (acFilePath, e)) 
    237                  
    238         self.assert_(passed,  
    239                      "At least one Attribute Certificate request failed.  " 
    240                      "Check the .msg files in this directory") 
    241  
    242  
    243 from warnings import warn 
    244 from uuid import uuid4 
    245 from datetime import datetime 
    246 from ndg.saml.saml2.core import (Response, Attribute, SAMLVersion, Subject, NameID, 
    247                              Issuer, AttributeQuery, XSStringAttributeValue,  
    248                              Status, StatusMessage, StatusCode) 
     33from ndg.saml.saml2.core import (Response, Attribute, SAMLVersion, Subject,  
     34                                 NameID, Issuer, AttributeQuery,  
     35                                 XSStringAttributeValue, Status, StatusMessage,  
     36                                 StatusCode) 
    24937from ndg.saml.xml import XMLConstants 
    25038from ndg.security.common.saml_utils.esg import EsgSamlNamespaces 
    25139 
     40THIS_DIR = path.dirname(__file__) 
     41 
     42 
     43class AttributeAuthorityTestCase(BaseTestCase): 
     44    THIS_DIR = THIS_DIR 
     45    PROPERTIES_FILENAME = 'test_attributeauthority.cfg' 
     46    PROPERTIES_FILEPATH = path.join(THIS_DIR, PROPERTIES_FILENAME) 
     47    ISSUER_NAME = '/O=My Organisation/OU=Centre/CN=Attribute Authority' 
     48     
     49    def test01ParsePropertiesFile(self): 
     50        cls = AttributeAuthorityTestCase 
     51        aa = AttributeAuthority.fromPropertyFile(cls.PROPERTIES_FILEPATH) 
     52        self.assert_(aa) 
     53        self.assert_(aa.assertionLifetime == 3600) 
     54        self.assert_(aa.issuerName == cls.ISSUER_NAME) 
     55         
     56    def test02FromPropertis(self): 
     57         
     58        # Casts from string to float 
     59        assertionLifetime = "86400" 
     60        issuerName = 'My issuer' 
     61        aa = AttributeAuthority.fromProperties(issuerName=issuerName, 
     62                                            assertionLifetime=assertionLifetime) 
     63        self.assert_(aa) 
     64        self.assert_(aa.assertionLifetime == float(assertionLifetime)) 
     65        self.assert_(aa.issuerName == issuerName) 
     66 
    25267 
    25368class SQLAlchemyAttributeInterfaceTestCase(BaseTestCase): 
     69    THIS_DIR = THIS_DIR 
     70    PROPERTIES_FILENAME = 'test_sqlalchemyattributeinterface.cfg' 
     71    PROPERTIES_FILEPATH = path.join(THIS_DIR, PROPERTIES_FILENAME) 
     72     
    25473    SAML_SUBJECT_SQLQUERY = ("select count(*) from users where openid = " 
    25574                             "'${userId}'") 
     
    364183            return 
    365184        cfgParser = CaseSensitiveConfigParser() 
    366         cfgFilePath = mkPath('test_sqlalchemyattributeinterface.cfg') 
     185        cls = SQLAlchemyAttributeInterfaceTestCase 
     186        cfgFilePath = cls.PROPERTIES_FILEPATH 
    367187        cfgParser.read(cfgFilePath) 
    368188         
Note: See TracChangeset for help on using the changeset viewer.