source: TI12-security/trunk/MyProxyClient/myproxy/test/test_myproxyclient.py @ 6919

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/MyProxyClient/myproxy/test/test_myproxyclient.py@6919
Revision 6919, 14.0 KB checked in by pjkersha, 11 years ago (diff)

New MyProxyClient release 1.1.0

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""MyProxy Client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "02/07/07"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = """BSD- See LICENSE file in top-level directory"""
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14
15import unittest
16import os
17import sys
18from getpass import getpass
19import traceback
20from os import path
21
22from OpenSSL import crypto
23
24from myproxy.client import CaseSensitiveConfigParser, MyProxyClient
25
def mkPath(file):
    """Return the path of a file located in the unit test directory.

    The directory is read from the MYPROXYCLIENT_UNITTEST_DIR environment
    variable at call time, so the variable must already be set.

    @param file: file name (or relative path) to join on to the unit test
    directory path
    @return: the joined path
    @raise KeyError: if MYPROXYCLIENT_UNITTEST_DIR is not set
    """
    return path.join(os.environ['MYPROXYCLIENT_UNITTEST_DIR'], file)
27
class _MyProxyClientTestCase(unittest.TestCase):
    '''Common base class for the test cases in this module.

    The class body runs once, when the class is defined, and prepares the
    process environment shared by all derived test case classes.
    '''
    # Setting NDGSEC_INT_DEBUG (to any value) drops into the debugger as soon
    # as this class definition is executed
    if os.environ.get('NDGSEC_INT_DEBUG') is not None:
        import pdb
        pdb.set_trace()

    # Unless the caller has already chosen a test directory, use the
    # directory containing this module
    os.environ.setdefault('MYPROXYCLIENT_UNITTEST_DIR',
                          path.abspath(path.dirname(__file__)))
37
38
class MyProxyClientLiveTestCase(_MyProxyClientTestCase):
    '''Tests require a connection to a real MyProxy service running on a host.

    The server must be set up as a credential repository - i.e. able to receive
    and store credentials

    Each test reads its settings from the section of CONFIG_FILENAME which
    carries the same name as the test method.  Pass-phrases omitted from the
    configuration file are prompted for on the terminal.
    '''
    CONFIG_FILENAME = "myProxyClientTest.cfg"

    # Paths of the CA certificate files written by test01GetTrustRoots.  This
    # is held on the class rather than on the instance because unittest
    # creates a separate instance for every test method: an instance
    # attribute set in test01GetTrustRoots would never be visible to the tidy
    # up call made from test05GetDelegationWithBootstrappedTrustRoots.
    trustRootFiles = []

    def setUp(self):
        """Parse the test configuration file into self.cfg, a dictionary of
        dictionaries keyed by section name, and create the MyProxyClient
        under test (self.clnt) from the client config file named in the
        'setUp' section
        """
        super(MyProxyClientLiveTestCase, self).setUp()

        configParser = CaseSensitiveConfigParser()
        configFilePath = path.join(os.environ['MYPROXYCLIENT_UNITTEST_DIR'],
                                   MyProxyClientLiveTestCase.CONFIG_FILENAME)
        configParser.read(configFilePath)

        self.cfg = {}
        for section in configParser.sections():
            self.cfg[section] = dict(configParser.items(section))

        configFilePath = path.expandvars(self.cfg['setUp']['cfgFilePath'])
        self.clnt = MyProxyClient(cfgFilePath=configFilePath)

    def __del__(self):
        """Clear up CA certs retrieved in test01GetTrustRoots call ready for
        next run of these unit tests
        """
        self._deleteTrustRootFiles()

    def _deleteTrustRootFiles(self):
        """Helper method clears up CA certs in trust roots directory set from
        previous call to test01GetTrustRoots()

        Files which have already been removed are skipped so that the method
        can safely be called more than once.
        """
        cls = MyProxyClientLiveTestCase

        # Reset the record before deleting so that a repeat call has nothing
        # left to do
        fileNames, cls.trustRootFiles = cls.trustRootFiles, []
        for fileName in fileNames:
            if path.exists(fileName):
                os.remove(fileName)

    def test01GetTrustRoots(self):
        # Test get trust roots command bootstrapping trust
        trustRoots = self.clnt.getTrustRoots(writeToCACertDir=True,
                                             bootstrap=True)
        self.assertTrue(trustRoots)
        self.assertTrue(isinstance(trustRoots, dict))
        self.assertTrue(len(trustRoots) > 0)
        for fileName, fileContents in trustRoots.items():
            if fileName.endswith('.0'):
                # test parsing certificate
                cert = crypto.load_certificate(crypto.FILETYPE_PEM,
                                               fileContents)
                self.assertTrue(cert)
                self.assertTrue(isinstance(cert, crypto.X509))
                subj = cert.get_subject()
                self.assertTrue(subj)
                print("Trust root certificate retrieved with DN=%s" % subj)

        # Check that every trust root returned was also written to the CA
        # certificate directory and keep a record of the files for tidy up
        trustRootFiles = []
        dirContents = os.listdir(self.clnt.caCertDir)
        for fileName in trustRoots:
            self.assertTrue(fileName in dirContents)
            trustRootFiles.append(os.path.join(self.clnt.caCertDir, fileName))

        MyProxyClientLiveTestCase.trustRootFiles = trustRootFiles

    def test02Store(self):
        # upload X509 cert and private key to repository
        thisSection = self.cfg['test02Store']

        passphrase = thisSection.get('passphrase')
        if passphrase is None:
            passphrase = getpass("\ntest02Store credential pass-phrase: ")

        sslKeyFilePassphrase = thisSection.get('sslKeyFilePassphrase')
        if sslKeyFilePassphrase is None:
            sslKeyFilePassphrase = getpass("\ntest02Store credential owner "
                                           "pass-phrase: ")

        certFile = path.expandvars(thisSection['ownerCertFile'])
        keyFile = path.expandvars(thisSection['ownerKeyFile'])
        sslCertFile = path.expandvars(thisSection['sslCertFile'])
        sslKeyFile = path.expandvars(thisSection['sslKeyFile'])

        self.clnt.store(thisSection['username'],
                        passphrase,
                        certFile,
                        keyFile,
                        sslCertFile=sslCertFile,
                        sslKeyFile=sslKeyFile,
                        sslKeyFilePassphrase=sslKeyFilePassphrase,
                        force=False)
        print("Store creds for user %s" % thisSection['username'])

    def test03GetDelegation(self):
        # retrieve proxy cert./private key
        thisSection = self.cfg['test03GetDelegation']

        passphrase = thisSection.get('passphrase')
        if passphrase is None:
            passphrase = getpass("\ntest03GetDelegation passphrase: ")

        proxyCertFile = path.expandvars(thisSection['proxyCertFileOut'])
        proxyKeyFile = path.expandvars(thisSection['proxyKeyFileOut'])

        creds = self.clnt.getDelegation(thisSection['username'], passphrase)
        print("proxy credentials:")
        print(''.join(creds))

        # The certificate file receives the first item followed by every item
        # from the third onwards; the second item goes to the key file.  The
        # files are closed explicitly so the content is flushed to disk
        # before the test returns.
        with open(proxyCertFile, 'w') as certFileObj:
            certFileObj.write(creds[0] + ''.join(creds[2:]))

        with open(proxyKeyFile, 'w') as keyFileObj:
            keyFileObj.write(creds[1])

    def test04Info(self):
        # Retrieve information about a given credential
        thisSection = self.cfg['test04Info']

        # sslKeyFilePassphrase can be omitted from the config file in which
        # case the get call below would return None
        sslKeyFilePassphrase = thisSection.get('sslKeyFilePassphrase')
        if sslKeyFilePassphrase is None:
            sslKeyFilePassphrase = getpass("\ntest04Info owner credentials "
                                           "passphrase: ")

        credExists, errorTxt, fields = self.clnt.info(
                                 thisSection['username'],
                                 path.expandvars(thisSection['sslCertFile']),
                                 path.expandvars(thisSection['sslKeyFile']),
                                 sslKeyFilePassphrase=sslKeyFilePassphrase)
        print("test04Info... ")
        print("credExists: %s" % credExists)
        print("errorTxt: " + errorTxt)
        print("fields: %s" % fields)

    def test06ChangePassphrase(self):
        # change pass-phrase protecting a given credential
        thisSection = self.cfg['test06ChangePassphrase']

        passphrase = thisSection.get('passphrase')
        if passphrase is None:
            passphrase = getpass("test06ChangePassphrase - passphrase: ")

        newPassphrase = thisSection.get('newPassphrase')
        if newPassphrase is None:
            # Only a pass-phrase typed at the prompt needs confirming
            newPassphrase = getpass("test06ChangePassphrase - new passphrase: ")

            confirmNewPassphrase = getpass("test06ChangePassphrase - confirm "
                                           "new passphrase: ")

            if newPassphrase != confirmNewPassphrase:
                self.fail("New and confirmed new password don't match")

        # Fall back to the credential pass-phrase if no separate pass-phrase
        # is configured for the SSL private key file
        sslKeyFilePassphrase = thisSection.get('sslKeyFilePassphrase') or \
                            passphrase

        self.clnt.changePassphrase(thisSection['username'],
                               passphrase,
                               newPassphrase,
                               path.expandvars(thisSection['sslCertFile']),
                               path.expandvars(thisSection['sslKeyFile']),
                               sslKeyFilePassphrase=sslKeyFilePassphrase)
        print("Changed pass-phrase")

    def test05GetDelegationWithBootstrappedTrustRoots(self):
        # Get delegation call whilst simultaneously bootstrapping trust roots
        thisSection = self.cfg['test05GetDelegationWithBootstrappedTrustRoots']

        passphrase = thisSection.get('passphrase')
        if passphrase is None:
            passphrase = getpass("\n"
                                 "test05GetDelegationWithBootstrappedTrustRoots"
                                 " passphrase: ")

        # Ensure any previously set trust root files are removed
        self._deleteTrustRootFiles()

        creds = self.clnt.getDelegation(thisSection['username'], passphrase,
                                        bootstrap=True)
        print("proxy credentials:")
        print(''.join(creds))

    def test07Destroy(self):
        # destroy credentials for a given user
        thisSection = self.cfg['test07Destroy']

        sslKeyFilePassphrase = thisSection.get('sslKeyFilePassphrase')
        if sslKeyFilePassphrase is None:
            sslKeyFilePassphrase = getpass("\ntest07Destroy credential owner "
                                           "passphrase: ")

        self.clnt.destroy(thisSection['username'],
                      sslCertFile=path.expandvars(thisSection['sslCertFile']),
                      sslKeyFile=path.expandvars(thisSection['sslKeyFile']),
                      sslKeyFilePassphrase=sslKeyFilePassphrase)
        print("Destroy creds for user %s" % thisSection['username'])
231       
232       
233from myproxy.utils.openssl import OpenSSLConfigError
234
class MyProxyClientInterfaceTestCase(_MyProxyClientTestCase):
    '''Test interface for correct getting/setting of attributes

    These tests create MyProxyClient objects and call their attributes and
    SSL verification callback directly.
    '''
    HOSTCERT_FILENAME = 'localhost.crt'
    HOSTCERT_FILEPATH = mkPath(HOSTCERT_FILENAME)
    HOSTCERT_DN = '/C=UK/ST=Oxfordshire/O=BADC/OU=Security/CN=localhost'

    def test01EnvironmentVarsSet(self):
        # Check that the client takes its settings from the MYPROXY_*
        # environment variables and from keywords to the constructor

        # Take the backup before entering the try block so that the finally
        # clause can always rely on it being defined
        environBackup = os.environ.copy()
        try:
            os.environ['MYPROXY_SERVER'] = 'localhost.domain'
            os.environ['MYPROXY_SERVER_DN'] = '/O=NDG/OU=Raphael/CN=raphael'
            os.environ['MYPROXY_SERVER_PORT'] = '20000'
            client = MyProxyClient(openSSLConfFilePath=mkPath('openssl.conf'),
                                   proxyCertMaxLifetime=60000,
                                   proxyCertLifetime=30000,
                                   caCertDir=mkPath(''))

            self.assertTrue(client.port == 20000)
            self.assertTrue(client.hostname == 'localhost.domain')
            self.assertTrue(client.serverDN == '/O=NDG/OU=Raphael/CN=raphael')
            self.assertTrue(client.proxyCertMaxLifetime == 60000)
            self.assertTrue(client.proxyCertLifetime == 30000)
            self.assertTrue(
                        client.openSSLConfFilePath == mkPath('openssl.conf'))
            self.assertTrue(client.caCertDir == mkPath(''))
        finally:
            # Restore the content of the existing os.environ mapping instead
            # of rebinding the name to the backup copy: the variables set
            # above are then removed again and os.environ stays the same
            # object for any other code which holds a reference to it
            os.environ.clear()
            os.environ.update(environBackup)

    def test02SetProperties(self):
        # Check validation and read back of values assigned to the client's
        # attributes
        client = MyProxyClient()
        try:
            client.port = None
        except TypeError:
            pass
        else:
            self.fail("Expecting TypeError raised from port set to "
                      "invalid type")

        client.port = 8000
        client.hostname = '127.0.0.1'
        client.serverDN = '/O=NDG/OU=BADC/CN=raphael'
        client.proxyCertMaxLifetime = 80000
        client.proxyCertLifetime = 70000

        try:
            client.openSSLConfFilePath = mkPath('ssl.cnf')
        except OpenSSLConfigError:
            pass
        else:
            self.fail("Expecting OpenSSLConfigError raised for invalid file "
                      "'ssl.cnf'")

        # The argument is an absolute path so mkPath's join returns it as is
        client.caCertDir = mkPath('/etc/grid-security/certificates')

        self.assertTrue(client.port == 8000)
        self.assertTrue(client.hostname == '127.0.0.1')
        self.assertTrue(client.serverDN == '/O=NDG/OU=BADC/CN=raphael')
        self.assertTrue(client.proxyCertMaxLifetime == 80000)
        self.assertTrue(client.proxyCertLifetime == 70000)
        self.assertTrue(client.openSSLConfFilePath == mkPath('ssl.cnf'))
        self.assertTrue(
                client.caCertDir == mkPath('/etc/grid-security/certificates'))

    def test03SSLVerification(self):
        # SSL verification callback

        # Ensure no relevant environment variables are set which might affect
        # the result.  The original value (if any) is put back on exit.
        envVarName = MyProxyClient.MYPROXY_SERVER_DN_ENVVARNAME
        serverDN = os.environ.pop(envVarName, None)
        try:
            client = MyProxyClient()

            connection = None
            errorStatus = False
            successStatus = True
            errorDepth = 0

            certFile = open(self.__class__.HOSTCERT_FILEPATH)
            try:
                peerCertStr = certFile.read()
            finally:
                certFile.close()

            peerCert = crypto.load_certificate(crypto.FILETYPE_PEM, peerCertStr)

            args = (connection, peerCert, errorStatus, errorDepth,
                    successStatus)

            # This would normally be called implicitly during the SSL
            # handshake
            status = client.serverSSLCertVerify(*args)
            self.assertTrue(status == errorStatus)

            # Won't match because the default 'host/' CN prefix is set
            client.hostname = 'localhost'
            status = client.serverSSLCertVerify(*args)
            self.assertTrue(status == errorStatus)

            # Should now match
            client.serverCNPrefix = ''
            status = client.serverSSLCertVerify(*args)
            self.assertTrue(status == successStatus)

            # Match based on full DN instead - this takes precedence over
            # hostname match
            client.serverDN = self.__class__.HOSTCERT_DN
            status = client.serverSSLCertVerify(*args)
            self.assertTrue(status == successStatus)

        finally:
            # Compare against None so that a variable which was set to an
            # empty string is restored too
            if serverDN is not None:
                os.environ[envVarName] = serverDN
345                                       
# Run every test case class defined in this module when the file is executed
# as a script
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.