source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_attributeauthorityclient.py @ 6572

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_attributeauthorityclient.py@6572
Revision 6572, 10.9 KB checked in by pjkersha, 11 years ago (diff)

Working refactored Attribute Authority Client unit tests.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
#!/usr/bin/env python
"""NDG Attribute Authority WSDL SOAP client unit tests

NERC DataGrid Project
"""
__author__ = "P J Kershaw"
__date__ = "05/05/05, major update 16/01/07"
__copyright__ = "(C) 2009 Science and Technology Facilities Council"
__license__ = "BSD - see LICENSE file in top-level directory"
__contact__ = "Philip.Kershaw@stfc.ac.uk"
__revision__ = '$Id:test_attributeauthorityclient.py 4372 2008-10-29 09:45:39Z pjkersha $'
import logging
# Root logger is set to DEBUG at import time, before the ndg.security imports
# below are executed.
# NOTE(review): DEBUG output from a WS-Security client may include message or
# certificate content - confirm where the test logs end up.
logging.basicConfig(level=logging.DEBUG)

import unittest
import os, re

# Used to expand environment variables in file paths read from the test config
from os.path import expandvars as xpdVars

from ndg.security.common.attributeauthority import (AttributeAuthorityClient, 
                                                NoMatchingRoleInTrustedHosts)
from ndg.security.common.AttCert import AttCertRead
from ndg.security.common.X509 import X509CertParse

from ndg.security.test.unit.attributeauthorityclient import \
                                        AttributeAuthorityClientBaseTestCase


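class AttributeAuthorityClientTestCase(AttributeAuthorityClientBaseTestCase):
    '''NDG Attribute Authority WSDL SOAP client unit tests

    Each test reads its settings from the section of self.cfg named after the
    test method and calls the Attribute Authority through an
    AttributeAuthorityClient.  self.cfg, self.cfgParser and self.sslCACertList
    are expected to be provided by the base class.
    '''
    clntPriKeyPwd = None

    # Matches one PEM encoded certificate block.  Raw string so that the
    # backslash reaches the regular expression engine unchanged; the pattern
    # value is the same as before.
    pemPat = r"-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----"

    def __init__(self, *arg, **kw):
        super(AttributeAuthorityClientTestCase, self).__init__(*arg, **kw)

        # NOTE(review): unittest creates one TestCase instance per test
        # method, so this call is made once per test - confirm the inherited
        # startAttributeAuthorities tolerates being called repeatedly.
        self.startAttributeAuthorities()

    def _getCertChainFromProxyCertFile(self, proxyCertFilePath):
        '''Read proxy cert and user cert from a single PEM file and put in
        a list ready for input into SignatureHandler

        @param proxyCertFilePath: path to a PEM file holding one or more
        certificates
        @return: list of parsed certificates in the reverse of file order
        '''
        # Close the file explicitly rather than relying on garbage collection
        proxyCertFile = open(proxyCertFilePath)
        try:
            proxyCertFileTxt = proxyCertFile.read()
        finally:
            proxyCertFile.close()

        pemPatRE = re.compile(self.__class__.pemPat, re.S)
        x509CertList = pemPatRE.findall(proxyCertFileTxt)

        signingCertChain = [X509CertParse(x509Cert)
                            for x509Cert in x509CertList]

        # Expecting proxy cert first - move this to the end.  This will
        # be the cert used to verify the message signature
        signingCertChain.reverse()

        return signingCertChain

    def _readIssuingClntCert(self, cfgSection):
        '''Read the optional issuing client certificate named by the
        'issuingClntCertFilePath' option of a test's config section.

        @param cfgSection: config section for the calling test
        @return: content of the certificate file, or None if the option is
        not set
        @raise IOError: the file could not be read
        '''
        filePath = cfgSection.get('issuingClntCertFilePath')
        if filePath is None:
            # No issuing cert set.  Tested explicitly instead of catching the
            # TypeError from expandvars(None), which would also have hidden
            # an unrelated TypeError raised while opening the file.
            return None

        try:
            certFile = open(xpdVars(filePath), 'r')
            try:
                return certFile.read()
            finally:
                certFile.close()

        except IOError, ioErr:
            raise IOError("Error reading certificate file \"%s\": %s" %
                          (ioErr.filename, ioErr.strerror))

    def _assertHostInfoSet(self, hostInfoMap):
        '''Check that every hostname and every key of its info dictionary in
        a host info response is set.

        @param hostInfoMap: dictionary of host info dictionaries keyed by
        hostname
        '''
        for hostname, hostInfo in hostInfoMap.items():
            self.assertTrue(hostname, "Hostname not set")
            for k in hostInfo.keys():
                self.assertTrue(k, "hostInfo value key unset")

    def setUp(self):
        '''Create the client for the site A Attribute Authority from the
        'setUp' config section'''
        super(AttributeAuthorityClientTestCase, self).setUp()

        # Debugging aid: drops into the debugger when NDGSEC_INT_DEBUG is set,
        # which blocks an unattended test run
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        thisSection = self.cfg['setUp']

        # Instantiate WS proxy
        self.siteAClnt = AttributeAuthorityClient(uri=thisSection['uri'],
                                sslPeerCertCN=thisSection.get('sslPeerCertCN'),
                                sslCACertList=self.sslCACertList,
                                cfgFileSection='wsse',
                                cfg=self.cfgParser)

    def test01GetHostInfo(self):
        """test01GetHostInfo: retrieve info for AA host"""
        hostInfo = self.siteAClnt.getHostInfo()
        print "Host Info:\n %s" % hostInfo

    def test02GetTrustedHostInfo(self):
        """test02GetTrustedHostInfo: retrieve trusted host info matching a
        given role"""
        trustedHostInfo = self.siteAClnt.getTrustedHostInfo(
                                 self.cfg['test02GetTrustedHostInfo']['role'])
        self._assertHostInfoSet(trustedHostInfo)

        print "Trusted Host Info:\n %s" % trustedHostInfo

    def test03GetTrustedHostInfoWithNoMatchingRoleFound(self):
        """test03GetTrustedHostInfoWithNoMatchingRoleFound: test the case
        where the input role doesn't match any roles in the target AA's map
        config file"""
        _cfg = self.cfg['test03GetTrustedHostInfoWithNoMatchingRoleFound']
        try:
            self.siteAClnt.getTrustedHostInfo(_cfg['role'])

            # Only reached if the call above did not raise
            self.fail("Expecting NoMatchingRoleInTrustedHosts exception")

        except NoMatchingRoleInTrustedHosts, e:
            print('As expected - no match for role "%s": %s' %
                  (_cfg['role'], e))

    def test04GetTrustedHostInfoWithNoRole(self):
        """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info
        irrespective of role"""
        trustedHostInfo = self.siteAClnt.getTrustedHostInfo()
        self._assertHostInfoSet(trustedHostInfo)

        print "Trusted Host Info:\n %s" % trustedHostInfo

    def test05GetAllHostsInfo(self):
        """test05GetAllHostsInfo: retrieve info for all hosts"""
        allHostInfo = self.siteAClnt.getAllHostsInfo()
        self._assertHostInfoSet(allHostInfo)

        print "All Hosts Info:\n %s" % allHostInfo

    def test06GetAttCert(self):
        """test06GetAttCert: Request attribute certificate from NDG Attribute
        Authority Web Service."""
        _cfg = self.cfg['test06GetAttCert']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCert(_cfg)

        # Make attribute certificate request
        attCert = self.siteAClnt.getAttCert(userX509Cert=userX509CertTxt)

        print "Attribute Certificate: \n\n:" + str(attCert)

        # Save the certificate to the configured path
        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
        attCert.write()

    def test07GetAttCertWithUserIdSet(self):
        """test07GetAttCertWithUserIdSet: Request attribute certificate from
        NDG Attribute Authority Web Service setting a specific user Id
        independent of the signer of the SOAP request."""
        _cfg = self.cfg['test07GetAttCertWithUserIdSet']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCert(_cfg)

        # Make attribute certificate request
        userId = _cfg['userId']
        attCert = self.siteAClnt.getAttCert(userId=userId,
                                            userX509Cert=userX509CertTxt)

        print "Attribute Certificate: \n\n:" + str(attCert)

        # Save the certificate to the configured path
        attCert.filePath = xpdVars(_cfg['attCertFilePath'])
        attCert.write()

    def test08GetMappedAttCert(self):
        """test08GetMappedAttCert: Request mapped attribute certificate from
        NDG Attribute Authority Web Service."""
        _cfg = self.cfg['test08GetMappedAttCert']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCert(_cfg)

        # Similarly for Attribute Certificate
        try:
            userAttCert = AttCertRead(xpdVars(_cfg['userAttCertFilePath']))

        except IOError, ioErr:
            raise Exception("Error reading attribute certificate file \"%s\": "
                            "%s" % (ioErr.filename, ioErr.strerror))

        # Make client to site B Attribute Authority
        # NOTE(review): unlike the site A client made in setUp, no
        # sslPeerCertCN or sslCACertList is passed here - if _cfg['uri'] is an
        # https address, confirm how AttributeAuthorityClient checks the peer
        # when these are omitted.
        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'],
                                       cfgFileSection='wsse',
                                       cfg=self.cfgParser)

        # Make attribute certificate request
        attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
                                       userAttCert=userAttCert)
        print "Attribute Certificate: \n\n:" + str(attCert)

        # Save the mapped certificate to the configured path
        attCert.filePath = xpdVars(_cfg['mappedAttCertFilePath'])
        attCert.write()

    def test09GetMappedAttCertStressTest(self):
        """test09GetMappedAttCertStressTest: Request a mapped attribute
        certificate from NDG Attribute Authority Web Service for each of a
        list of user attribute certificates."""
        _cfg = self.cfg['test09GetMappedAttCertStressTest']

        # Read user Certificate into a string ready for passing via WS
        userX509CertTxt = self._readIssuingClntCert(_cfg)

        # Make client to site B Attribute Authority
        # NOTE(review): unlike the site A client made in setUp, no
        # sslPeerCertCN or sslCACertList is passed here - if _cfg['uri'] is an
        # https address, confirm how AttributeAuthorityClient checks the peer
        # when these are omitted.
        siteBClnt = AttributeAuthorityClient(uri=_cfg['uri'],
                                       cfgFileSection='wsse',
                                       cfg=self.cfgParser)

        acFilePathList = [xpdVars(acFile)
                          for acFile in _cfg['userAttCertFilePathList'].split()]

        for acFilePath in acFilePathList:
            try:
                userAttCert = AttCertRead(acFilePath)

            except IOError, ioErr:
                raise Exception("Error reading attribute certificate file "
                                '"%s": %s' % (ioErr.filename, ioErr.strerror))

            # Reset for every file: otherwise a failed request would leave the
            # previous iteration's certificate in place and the check below
            # would pass (or raise NameError on the first iteration)
            attCert = None

            # Make attribute certificate request
            try:
                attCert = siteBClnt.getAttCert(userX509Cert=userX509CertTxt,
                                               userAttCert=userAttCert)
            except Exception, e:
                # Record the failure in a message file in the current
                # directory; the check below then fails the test
                outFilePfx = 'test09GetMappedAttCertStressTest-%s' % \
                        os.path.basename(acFilePath)
                msgFile = open(outFilePfx+".msg", 'w')
                try:
                    msgFile.write('Failed for "%s": %s\n' % (acFilePath, e))
                finally:
                    msgFile.close()

            self.assertTrue(attCert,
                            'No attribute certificate returned for "%s"' %
                            acFilePath)

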
# Run all the tests in this module when the file is executed as a script
if __name__ == "__main__":
    unittest.main()