Ignore:
Timestamp:
16/12/08 16:43:38 (12 years ago)
Author:
pjkersha
Message:
  • Completed Attribute Authority unit test
  • re-issued out of date test certs.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • TI12-security/trunk/python/ndg.security.test/ndg/security/test/attributeauthority/test_attributeauthority.py

    r4654 r4667  
    2121logging.basicConfig() 
    2222 
    23 from ndg.security.common.utils.configfileparsers import \ 
    24     CaseSensitiveConfigParser 
    25 from ndg.security.server.attributeauthority import AttributeAuthority 
    26      
    2723from os.path import expandvars as xpdVars 
    2824from os.path import join as jnPath 
    2925mkPath = lambda file:jnPath(os.environ['NDGSEC_AA_UNITTEST_DIR'], file) 
    3026 
     27from ndg.security.common.utils.configfileparsers import \ 
     28    CaseSensitiveConfigParser 
     29from ndg.security.server.attributeauthority import AttributeAuthority, \ 
     30    AttributeAuthorityNoMatchingRoleInTrustedHosts 
     31 
     32from ndg.security.common.AttCert import AttCert 
     33 
    3134 
    3235class AttributeAuthorityTestCase(unittest.TestCase): 
     
    4851         
    4952        self.cfg = {} 
    50         for section in self.cfgParser.sections(): 
     53        for section in self.cfgParser.sections() + ['DEFAULT']: 
    5154            self.cfg[section] = dict(self.cfgParser.items(section)) 
    5255             
     
    5457                                propFilePath=self.cfg['setUp']['propFilePath'])             
    5558 
     59    _mkSiteBAttributeAuthority = lambda self: AttributeAuthority(\ 
     60                        propFilePath=self.cfg['DEFAULT']['siteBPropFilePath']) 
     61     
    5662    def test01GetHostInfo(self): 
    5763        """test01GetHostInfo: retrieve info for AA host""" 
    58         hostInfo = self.aa.getHostInfo() 
     64        hostInfo = self.aa.hostInfo 
    5965        print("Host Info:\n %s" % hostInfo)      
    6066 
    61 #    def test02GetTrustedHostInfo(self): 
    62 #        """test02GetTrustedHostInfo: retrieve trusted host info matching a 
    63 #        given role""" 
    64 #        trustedHostInfo = self.siteAClnt.getTrustedHostInfo(\ 
    65 #                                 self.cfg['test02GetTrustedHostInfo']['role']) 
    66 #        for hostname, hostInfo in trustedHostInfo.items(): 
    67 #            assert hostname, "Hostname not set" 
    68 #            for k, v in hostInfo.items(): 
    69 #                assert k, "hostInfo value key unset" 
    70 # 
    71 #        print "Trusted Host Info:\n %s" % trustedHostInfo 
    72 # 
    73 #    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self): 
    74 #        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case  
    75 #        where the input role doesn't match any roles in the target AA's map  
    76 #        config file""" 
    77 #        _cfg = self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound'] 
    78 #        try: 
    79 #            trustedHostInfo = self.siteAClnt.getTrustedHostInfo(_cfg['role']) 
    80 #            self.fail("Expecting NoMatchingRoleInTrustedHosts exception") 
    81 #             
    82 #        except NoMatchingRoleInTrustedHosts, e: 
    83 #            print 'As expected - no match for role "%s": %s' % \ 
    84 #                (_cfg['role'], e) 
    85 # 
    86 # 
    87 #    def test04GetTrustedHostInfoWithNoRole(self): 
    88 #        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info  
    89 #        irrespective of role""" 
    90 #        trustedHostInfo = self.siteAClnt.getTrustedHostInfo() 
    91 #        for hostname, hostInfo in trustedHostInfo.items(): 
    92 #            assert hostname, "Hostname not set" 
    93 #            for k, v in hostInfo.items(): 
    94 #                assert k, "hostInfo value key unset" 
    95 #                assert v, ("%s value not set" % k) 
    96 #                    
    97 #        print "Trusted Host Info:\n %s" % trustedHostInfo 
    98 #         
    99 # 
    100 #    def test05GetAllHostsInfo(self): 
    101 #        """test05GetAllHostsInfo: retrieve info for all hosts""" 
    102 #        allHostInfo = self.siteAClnt.getAllHostsInfo() 
    103 #        for hostname, hostInfo in allHostInfo.items(): 
    104 #            assert hostname, "Hostname not set" 
    105 #            for k, v in hostInfo.items(): 
    106 #                assert k, "hostInfo value key unset" 
    107 #                    
    108 #        print "All Hosts Info:\n %s" % allHostInfo 
    109 # 
    110 # 
    111 #    def test06GetAttCert(self):         
    112 #        """test06GetAttCert: Request attribute certificate from NDG Attribute  
    113 #        Authority Web Service.""" 
    114 #        _cfg = self.cfg['test06GetAttCert'] 
    115 #         
    116 #        # Read user Certificate into a string ready for passing via WS 
    117 #        try: 
    118 #            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath')) 
    119 #            userX509CertTxt = open(userX509CertFilePath, 'r').read() 
    120 #         
    121 #        except TypeError: 
    122 #            # No issuing cert set 
    123 #            userX509CertTxt = None 
    124 #                 
    125 #        except IOError, ioErr: 
    126 #            raise Exception("Error reading certificate file \"%s\": %s" % \ 
    127 #                                    (ioErr.filename, ioErr.strerror)) 
    128 # 
    129 #        # Make attribute certificate request 
    130 #        attCert = self.siteAClnt.getAttCert(userX509Cert=userX509CertTxt) 
    131 #         
    132 #        print "Attribute Certificate: \n\n:" + str(attCert) 
    133 #         
    134 #        attCert.filePath = xpdVars(_cfg['attCertFilePath']) 
    135 #        attCert.write() 
    136 #         
    137 #         
    138 #    def test07GetAttCertWithUserIdSet(self):         
    139 #        """test07GetAttCertWithUserIdSet: Request attribute certificate from  
    140 #        NDG Attribute Authority Web Service setting a specific user Id  
    141 #        independent of the signer of the SOAP request.""" 
    142 #        _cfg = self.cfg['test07GetAttCertWithUserIdSet'] 
    143 #         
    144 #        # Read user Certificate into a string ready for passing via WS 
    145 #        try: 
    146 #            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath')) 
    147 #            userX509CertTxt = open(userX509CertFilePath, 'r').read() 
    148 #         
    149 #        except TypeError: 
    150 #            # No issuing cert set 
    151 #            userX509CertTxt = None 
    152 #                 
    153 #        except IOError, ioErr: 
    154 #            raise Exception("Error reading certificate file \"%s\": %s" % \ 
    155 #                                    (ioErr.filename, ioErr.strerror)) 
    156 # 
    157 #        # Make attribute certificate request 
    158 #        userId = _cfg['userId'] 
    159 #        attCert = self.siteAClnt.getAttCert(userId=userId, 
    160 #                                            userX509Cert=userX509CertTxt) 
    161 #         
    162 #        print "Attribute Certificate: \n\n:" + str(attCert) 
    163 #         
    164 #        attCert.filePath = xpdVars(_cfg['attCertFilePath']) 
    165 #        attCert.write() 
    166 # 
    167 # 
    168 #    def test08GetMappedAttCert(self):         
    169 #        """test08GetMappedAttCert: Request mapped attribute certificate from  
    170 #        NDG Attribute Authority Web Service.""" 
    171 #        _cfg = self.cfg['test08GetMappedAttCert'] 
    172 #         
    173 #        # Read user Certificate into a string ready for passing via WS 
    174 #        try: 
    175 #            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath')) 
    176 #            userX509CertTxt = open(userX509CertFilePath, 'r').read() 
    177 #         
    178 #        except TypeError: 
    179 #            # No issuing cert set 
    180 #            userX509CertTxt = None 
    181 #                 
    182 #        except IOError, ioErr: 
    183 #            raise Exception("Error reading certificate file \"%s\": %s" % \ 
    184 #                                    (ioErr.filename, ioErr.strerror)) 
    185 #     
    186 #        # Simlarly for Attribute Certificate  
    187 #        try: 
    188 #            userAttCert = AttCertRead(xpdVars(_cfg['userAttCertFilePath'])) 
    189 #             
    190 #        except IOError, ioErr: 
    191 #            raise Exception("Error reading attribute certificate file \"%s\": " 
    192 #                            "%s" % (ioErr.filename, ioErr.strerror)) 
    193 #         
    194 #        # Make client to site B Attribute Authority 
    195 #        siteBClnt = AttributeAuthority(uri=_cfg['uri'],  
    196 #                                       cfgFileSection='wsse', 
    197 #                                       cfg=self.cfgParser) 
    198 #     
    199 #        # Make attribute certificate request 
    200 #        attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt, 
    201 #                                       userAttCert=userAttCert) 
    202 #        print "Attribute Certificate: \n\n:" + str(attCert) 
    203 #         
    204 #        attCert.filePath = xpdVars(_cfg['mappedAttCertFilePath']) 
    205 #        attCert.write() 
    206 #         
    207 #         
    208 #    def test09GetMappedAttCertStressTest(self):         
    209 #        """test09GetMappedAttCertStressTest: Request mapped attribute  
    210 #        certificate from NDG Attribute Authority Web Service.""" 
    211 #        _cfg = self.cfg['test09GetMappedAttCertStressTest'] 
    212 #         
    213 #        # Read user Certificate into a string ready for passing via WS 
    214 #        try: 
    215 #            userX509CertFilePath = xpdVars(_cfg.get('issuingClntCertFilePath')) 
    216 #            userX509CertTxt = open(userX509CertFilePath, 'r').read() 
    217 #         
    218 #        except TypeError: 
    219 #            # No issuing cert set 
    220 #            userX509CertTxt = None 
    221 #                 
    222 #        except IOError, ioErr: 
    223 #            raise Exception("Error reading certificate file \"%s\": %s" %  
    224 #                                    (ioErr.filename, ioErr.strerror)) 
    225 # 
    226 #        # Make client to site B Attribute Authority 
    227 #        siteBClnt = AttributeAuthority(uri=_cfg['uri'],  
    228 #                                       cfgFileSection='wsse', 
    229 #                                       cfg=self.cfgParser) 
    230 # 
    231 #        acFilePathList = [xpdVars(file) for file in \ 
    232 #                          _cfg['userAttCertFilePathList'].split()] 
    233 # 
    234 #        for acFilePath in acFilePathList: 
    235 #            try: 
    236 #                userAttCert = AttCertRead(acFilePath) 
    237 #                 
    238 #            except IOError, ioErr: 
    239 #                raise Exception("Error reading attribute certificate file " 
    240 #                                '"%s": %s' % (ioErr.filename, ioErr.strerror)) 
    241 #         
    242 #            # Make attribute certificate request 
    243 #            try: 
    244 #                attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt, 
    245 #                                               userAttCert=userAttCert) 
    246 #            except Exception, e: 
    247 #                outFilePfx = 'test09GetMappedAttCertStressTest-%s' % \ 
    248 #                        os.path.basename(acFilePath)     
    249 #                msgFile = open(outFilePfx+".msg", 'w') 
    250 #                msgFile.write('Failed for "%s": %s\n' % (acFilePath, e)) 
    251  
     67    def test02GetTrustedHostInfo(self): 
     68        """test02GetTrustedHostInfo: retrieve trusted host info matching a 
     69        given role""" 
     70        thisSection = self.cfg['test02GetTrustedHostInfo'] 
     71         
     72        trustedHostInfo = self.aa.getTrustedHostInfo(thisSection['role']) 
     73        for hostname, hostInfo in trustedHostInfo.items(): 
     74            self.assert_(hostname, "Hostname not set") 
     75            for k, v in hostInfo.items(): 
     76                self.assert_(k, "hostInfo value key unset") 
     77 
     78        print("Trusted Host Info:\n %s" % trustedHostInfo) 
     79 
     80    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self): 
     81        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case  
     82        where the input role doesn't match any roles in the target AA's map  
     83        config file""" 
     84        thisSection=self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound'] 
     85        try: 
     86            trustedHostInfo = self.aa.getTrustedHostInfo(thisSection['role']) 
     87            self.fail("Expecting NoMatchingRoleInTrustedHosts exception") 
     88             
     89        except AttributeAuthorityNoMatchingRoleInTrustedHosts, e: 
     90            print('PASSED - no match for role "%s": %s' % (thisSection['role'], 
     91                                                           e)) 
     92 
     93 
     94    def test04GetTrustedHostInfoWithNoRole(self): 
     95        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info  
     96        irrespective of role""" 
     97        trustedHostInfo = self.aa.getTrustedHostInfo() 
     98        for hostname, hostInfo in trustedHostInfo.items(): 
     99            self.assert_(hostname, "Hostname not set") 
     100            for k, v in hostInfo.items(): 
     101                self.assert_(k, "hostInfo value key unset") 
     102                self.assert_(v, "%s value not set" % k) 
     103                    
     104        print("Trusted Host Info:\n %s" % trustedHostInfo) 
     105 
     106    def test05GetAttCert(self):         
     107        """test05GetAttCert: Request attribute certificate from NDG Attribute  
     108        Authority Web Service.""" 
     109        thisSection = self.cfg['test05GetAttCert'] 
     110         
     111        # Read user Certificate into a string ready for passing via WS 
     112        try: 
     113            userX509CertFilePath = xpdVars(thisSection.get( 
     114                                                    'issuingClntCertFilePath')) 
     115            userX509CertTxt = open(userX509CertFilePath, 'r').read() 
     116         
     117        except TypeError: 
     118            # No issuing cert set 
     119            userX509CertTxt = None 
     120                 
     121        except IOError, ioErr: 
     122            raise Exception("Error reading certificate file \"%s\": %s" % 
     123                                    (ioErr.filename, ioErr.strerror)) 
     124 
     125        # Make attribute certificate request 
     126        attCert = self.aa.getAttCert(holderX509Cert=userX509CertTxt) 
     127         
     128        print("Attribute Certificate: \n\n:" + str(attCert)) 
     129         
     130        attCert.filePath = xpdVars(thisSection['attCertFilePath']) 
     131        attCert.write() 
     132         
     133         
     134    def test06GetAttCertWithUserIdSet(self):         
     135        """test06GetAttCertWithUserIdSet: Request attribute certificate from  
     136        NDG Attribute Authority Web Service setting a specific user Id  
     137        independent of the signer of the SOAP request.""" 
     138        thisSection = self.cfg['test06GetAttCertWithUserIdSet'] 
     139         
     140        # Make attribute certificate request 
     141        userId = thisSection['userId'] 
     142        attCert = self.aa.getAttCert(userId=userId) 
     143         
     144        print("Attribute Certificate: \n\n:" + str(attCert)) 
     145         
     146        attCert.filePath = xpdVars(thisSection['attCertFilePath']) 
     147        attCert.write() 
     148 
     149 
     150    def test07GetMappedAttCert(self):         
     151        """test07GetMappedAttCert: Request mapped attribute certificate from  
     152        NDG Attribute Authority Web Service.""" 
     153        thisSection = self.cfg['test07GetMappedAttCert'] 
     154         
     155        # Read user Certificate into a string ready for passing via WS 
     156        try: 
     157            userX509CertFilePath = xpdVars(thisSection.get( 
     158                                                    'issuingClntCertFilePath')) 
     159            userX509CertTxt = open(userX509CertFilePath, 'r').read() 
     160         
     161        except TypeError: 
     162            # No issuing cert set 
     163            userX509CertTxt = None 
     164                 
     165        except IOError, ioErr: 
     166            raise Exception("Error reading certificate file \"%s\": %s" %  
     167                                    (ioErr.filename, ioErr.strerror)) 
     168     
     169        # Simlarly for Attribute Certificate  
     170        try: 
     171            userAttCert = AttCert.Read( 
     172                                xpdVars(thisSection['userAttCertFilePath'])) 
     173             
     174        except IOError, ioErr: 
     175            raise Exception("Error reading attribute certificate file \"%s\": " 
     176                            "%s" % (ioErr.filename, ioErr.strerror)) 
     177         
     178        # Make client to site B Attribute Authority 
     179        siteBAA = self._mkSiteBAttributeAuthority() 
     180     
     181        # Make attribute certificate request 
     182        attCert = siteBAA.getAttCert(holderX509Cert=userX509CertTxt, 
     183                                     userAttCert=userAttCert) 
     184        print("Attribute Certificate: \n\n:" + str(attCert)) 
     185         
     186        attCert.filePath = xpdVars(thisSection['mappedAttCertFilePath']) 
     187        attCert.write() 
     188         
     189         
     190    def test08GetMappedAttCertStressTest(self):         
     191        """test08GetMappedAttCertStressTest: Request mapped attribute  
     192        certificate from NDG Attribute Authority Web Service.""" 
     193        thisSection = self.cfg['test08GetMappedAttCertStressTest'] 
     194         
     195        # Read user Certificate into a string ready for passing via WS 
     196        try: 
     197            userX509CertFilePath = xpdVars(thisSection.get( 
     198                                                    'issuingClntCertFilePath')) 
     199            userX509CertTxt = open(userX509CertFilePath, 'r').read() 
     200         
     201        except TypeError: 
     202            # No issuing cert set 
     203            userX509CertTxt = None 
     204                 
     205        except IOError, ioErr: 
     206            raise Exception("Error reading certificate file \"%s\": %s" %  
     207                                    (ioErr.filename, ioErr.strerror)) 
     208 
     209        # Make client to site B Attribute Authority 
     210        siteBAA = self._mkSiteBAttributeAuthority() 
     211 
     212        acFilePathList = [xpdVars(file) for file in \ 
     213                          thisSection['userAttCertFilePathList'].split()] 
     214 
     215        for acFilePath in acFilePathList: 
     216            try: 
     217                userAttCert = AttCert.Read(acFilePath) 
     218                 
     219            except IOError, ioErr: 
     220                raise Exception("Error reading attribute certificate file " 
     221                                '"%s": %s' % (ioErr.filename, ioErr.strerror)) 
     222         
     223            # Make attribute certificate request 
     224            try: 
     225                attCert = siteBAA.getAttCert(holderX509Cert=userX509CertTxt, 
     226                                             userAttCert=userAttCert) 
     227            except Exception, e: 
     228                outFilePfx = 'test08GetMappedAttCertStressTest-%s' % \ 
     229                        os.path.basename(acFilePath)     
     230                msgFile = open(outFilePfx+".msg", 'w') 
     231                msgFile.write('Failed for "%s": %s\n' % (acFilePath, e)) 
    252232                                         
    253233if __name__ == "__main__": 
Note: See TracChangeset for help on using the changeset viewer.