source: TI12-security/trunk/MyProxyClient/myproxy/test/test_myproxyclient.py @ 6847

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/MyProxyClient/myproxy/test/test_myproxyclient.py@6847
Revision 6847, 12.7 KB checked in by pjkersha, 11 years ago (diff)

Fix for SSL verification DN and hostname settings. Re-release as 1.0

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""MyProxy Client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "02/07/07"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = """BSD- See LICENSE file in top-level directory"""
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14
15import unittest
16import os
17import sys
18import getpass
19import traceback
20from os import path
21
22from OpenSSL import crypto
23
24from myproxy.client import CaseSensitiveConfigParser, MyProxyClient
25
26mkPath = lambda file: path.join(os.environ['MYPROXYCLIENT_UNITTEST_DIR'], file)
27
28class _MyProxyClientTestCase(unittest.TestCase):
29    '''Base implements environment settings common to all test case classes'''
30    if 'NDGSEC_INT_DEBUG' in os.environ:
31        import pdb
32        pdb.set_trace()
33   
34    if 'MYPROXYCLIENT_UNITTEST_DIR' not in os.environ:
35        os.environ['MYPROXYCLIENT_UNITTEST_DIR'] = \
36                                        path.abspath(path.dirname(__file__))
37
38
39class MyProxyClientLiveTestCase(_MyProxyClientTestCase):
40    '''Tests require a connection to a real MyProxy service running on a host
41    '''
42    CONFIG_FILENAME = "myProxyClientTest.cfg"
43   
44    def setUp(self):
45       
46        super(MyProxyClientLiveTestCase, self).setUp()
47       
48        configParser = CaseSensitiveConfigParser()
49        configFilePath = path.join(os.environ['MYPROXYCLIENT_UNITTEST_DIR'],
50                                   MyProxyClientLiveTestCase.CONFIG_FILENAME)
51        configParser.read(configFilePath)
52       
53        self.cfg = {}
54        for section in configParser.sections():
55            self.cfg[section] = dict(configParser.items(section))
56       
57        configFilePath = path.expandvars(self.cfg['setUp']['cfgFilePath'])
58        self.clnt = MyProxyClient(cfgFilePath=configFilePath)
59       
60    def test01Store(self):
61        # upload X509 cert and private key to repository
62        thisSection = self.cfg['test01Store']
63       
64        passphrase = thisSection.get('passphrase')
65        if passphrase is None:
66            passphrase = getpass.getpass(prompt="\ntest1Store credential "
67                                                "pass-phrase: ")
68           
69        sslKeyFilePassphrase = thisSection.get('sslKeyFilePassphrase')
70        if sslKeyFilePassphrase is None:
71            sslKeyFilePassphrase = getpass.getpass(
72                        prompt="\ntest1Store credential owner pass-phrase: ")
73
74        certFile = path.expandvars(thisSection['ownerCertFile'])
75        keyFile = path.expandvars(thisSection['ownerKeyFile'])
76        sslCertFile = path.expandvars(thisSection['sslCertFile'])
77        sslKeyFile = path.expandvars(thisSection['sslKeyFile'])
78           
79        self.clnt.store(thisSection['username'],
80                        passphrase,
81                        certFile,
82                        keyFile,
83                        sslCertFile=sslCertFile,
84                        sslKeyFile=sslKeyFile,
85                        sslKeyFilePassphrase=sslKeyFilePassphrase,
86                        force=False)
87        print("Store creds for user %s" % thisSection['username'])
88   
89    def test02GetDelegation(self):
90        # retrieve proxy cert./private key
91        thisSection = self.cfg['test02GetDelegation']
92       
93        passphrase = thisSection.get('passphrase')
94        if passphrase is None:
95            passphrase = getpass.getpass(prompt="\ntest2GetDelegation "
96                                                "passphrase: ")
97         
98        proxyCertFile = path.expandvars(thisSection['proxyCertFileOut'])
99        proxyKeyFile = path.expandvars(thisSection['proxyKeyFileOut'])
100
101        creds = self.clnt.getDelegation(thisSection['username'], passphrase)
102        print "proxy credentials:" 
103        print ''.join(creds)
104        open(proxyCertFile, 'w').write(creds[0]+''.join(creds[2:]))
105        open(proxyKeyFile, 'w').write(creds[1])
106
107    def test03Info(self):
108        # Retrieve information about a given credential
109        thisSection = self.cfg['test03Info']
110       
111        # sslKeyFilePassphrase can be omitted from the congif file in which case
112        # the get call below would return None
113        sslKeyFilePassphrase = thisSection.get('sslKeyFilePassphrase')
114        if sslKeyFilePassphrase is None:
115            sslKeyFilePassphrase = getpass.getpass(prompt="\ntest3Info owner "
116                                              "credentials passphrase: ")
117
118        credExists, errorTxt, fields = self.clnt.info(
119                                 thisSection['username'],
120                                 path.expandvars(thisSection['sslCertFile']),
121                                 path.expandvars(thisSection['sslKeyFile']),
122                                 sslKeyFilePassphrase=sslKeyFilePassphrase)
123        print "test3Info... "
124        print "credExists: %s" % credExists
125        print "errorTxt: " + errorTxt
126        print "fields: %s" % fields
127
128    def test04ChangePassphrase(self):       
129        # change pass-phrase protecting a given credential
130        thisSection = self.cfg['test04ChangePassphrase']
131       
132        passphrase = thisSection.get('passphrase')
133        if passphrase is None:
134            passphrase = getpass.getpass(prompt="test4ChangePassphrase - "
135                                                "passphrase: ")
136       
137        newPassphrase = thisSection.get('newPassphrase')
138        if newPassphrase is None:
139            newPassphrase = getpass.getpass(prompt="test4ChangePassphrase "
140                                                   "- new passphrase: ")
141
142            confirmNewPassphrase = getpass.getpass(prompt=\
143                                                   "test4ChangePassphrase "
144                                                   "- confirm new "
145                                                   "passphrase: ")
146
147            if newPassphrase != confirmNewPassphrase:
148                self.fail("New and confirmed new password don't match")
149               
150        sslKeyFilePassphrase = thisSection.get('sslKeyFilePassphrase') or passphrase
151
152        self.clnt.changePassphrase(thisSection['username'],
153                               passphrase,
154                               newPassphrase, 
155                               path.expandvars(thisSection['sslCertFile']),
156                               path.expandvars(thisSection['sslKeyFile']),
157                               sslKeyFilePassphrase=sslKeyFilePassphrase)
158        print("Changed pass-phrase")
159
160    def test05Destroy(self):
161        # destroy credentials for a given user
162        thisSection = self.cfg['test05Destroy']
163       
164        sslKeyFilePassphrase = thisSection.get('sslKeyFilePassphrase')
165        if sslKeyFilePassphrase is None:
166            sslKeyFilePassphrase = getpass.getpass(prompt="\ntest5Destroy "
167                                              "credential owner passphrase: ")
168
169        self.clnt.destroy(thisSection['username'], 
170                      sslCertFile=path.expandvars(thisSection['sslCertFile']),
171                      sslKeyFile=path.expandvars(thisSection['sslKeyFile']),
172                      sslKeyFilePassphrase=sslKeyFilePassphrase)
173        print("Destroy creds for user %s" % thisSection['username'])
174       
175    def test06GetTrustRoots(self):
176        # Test get trust roots command
177        trustRoots = self.clnt.getTrustRoots()
178        self.assert_(trustRoots)
179        self.assert_(isinstance(trustRoots, dict))
180        self.assert_(len(trustRoots) > 0)
181        for fileName, fileContents in trustRoots.items():
182            if fileName.endswith('.0'):
183                # test parsing certificate
184                cert = crypto.load_certificate(crypto.FILETYPE_PEM,
185                                               fileContents)
186                self.assert_(cert)
187                self.assert_(isinstance(cert, crypto.X509))
188                subj = cert.get_subject()
189                self.assert_(subj)
190                print("Trust root certificate retrieved with DN=%s" % subj)
191       
192       
193from myproxy.utils.openssl import OpenSSLConfigError
194
195class MyProxyClientInterfaceTestCase(_MyProxyClientTestCase):
196    '''Test interface for correct getting/setting of attributes'''
197    HOSTCERT_FILENAME = 'localhost.crt'
198    HOSTCERT_FILEPATH = mkPath(HOSTCERT_FILENAME)
199    HOSTCERT_DN = '/C=UK/ST=Oxfordshire/O=BADC/OU=Security/CN=localhost'
200   
201    def test01EnvironmentVarsSet(self):
202
203        try:
204            environBackup = os.environ.copy()
205           
206            os.environ['MYPROXY_SERVER'] = 'localhost.domain'
207            os.environ['MYPROXY_SERVER_DN'] = '/O=NDG/OU=Raphael/CN=raphael'
208            os.environ['MYPROXY_SERVER_PORT'] = '20000'
209            client = MyProxyClient(openSSLConfFilePath=mkPath('openssl.conf'),
210                                   proxyCertMaxLifetime=60000,
211                                   proxyCertLifetime=30000,           
212                                   caCertFilePath=mkPath('ndg-test-ca.crt'))
213           
214            self.assert_(client.port == 20000)
215            self.assert_(client.hostname == 'localhost.domain')
216            self.assert_(client.serverDN == '/O=NDG/OU=Raphael/CN=raphael')
217            self.assert_(client.proxyCertMaxLifetime == 60000)
218            self.assert_(client.proxyCertLifetime == 30000)
219            self.assert_(client.openSSLConfFilePath == mkPath('openssl.conf'))
220            self.assert_(client.caCertFilePath == mkPath('ndg-test-ca.crt'))
221        finally:
222            os.environ = environBackup       
223
224    def test02SetProperties(self):
225       
226        client = MyProxyClient()
227        try:
228            client.port = None
229            self.fail("Expecting AttributeError raised from port set to "
230                      "invalid type")
231        except TypeError:
232            pass
233
234        client.port = 8000
235        client.hostname = '127.0.0.1'
236        client.serverDN = '/O=NDG/OU=BADC/CN=raphael'
237        client.proxyCertMaxLifetime = 80000
238        client.proxyCertLifetime = 70000
239       
240        try:
241            client.openSSLConfFilePath = mkPath('ssl.cnf')
242            self.fail("Expecting OpenSSLConfigError raised for invalid file "
243                      "'ssl.cnf'")
244        except OpenSSLConfigError:
245            pass
246       
247        client.caCertFilePath = mkPath('ca.pem') 
248        client.caCertDir = mkPath('/etc/grid-security/certificates') 
249       
250        self.assert_(client.port == 8000)
251        self.assert_(client.hostname == '127.0.0.1')
252        self.assert_(client.serverDN == '/O=NDG/OU=BADC/CN=raphael')
253        self.assert_(client.proxyCertMaxLifetime == 80000)
254        self.assert_(client.proxyCertLifetime == 70000)
255        self.assert_(client.openSSLConfFilePath == mkPath('ssl.cnf'))
256        self.assert_(client.caCertFilePath == mkPath('ca.pem')) 
257        self.assert_(
258                client.caCertDir == mkPath('/etc/grid-security/certificates')) 
259 
260    def test03SSLVerification(self):
261        # SSL verification callback
262       
263        # Ensure no relevant environment variables are set which might affect
264        # the result
265        try:
266            serverDN = os.environ.get(
267                                    MyProxyClient.MYPROXY_SERVER_DN_ENVVARNAME)
268            if serverDN is not None:
269                del os.environ[MyProxyClient.MYPROXY_SERVER_DN_ENVVARNAME]
270       
271            client = MyProxyClient()
272           
273            connection = None
274            errorStatus = 1
275            successStatus = 0
276            errorDepth = 0
277            peerCertStr = open(self.__class__.HOSTCERT_FILEPATH).read()
278            peerCert = crypto.load_certificate(crypto.FILETYPE_PEM, peerCertStr)
279           
280            args = (connection, peerCert, errorStatus, errorDepth, 
281                    successStatus)
282           
283            # This would normally called implicitly during the SSL handshake
284            status = client.serverSSLCertVerify(*args)
285            self.assert_(status == errorStatus)
286           
287            # Won't match because the default 'host/' CN prefix is set
288            client.hostname = 'localhost'
289            status = client.serverSSLCertVerify(*args)
290            self.assert_(status == errorStatus)
291           
292            # Should now match
293            client.serverCNPrefix = ''
294            status = client.serverSSLCertVerify(*args)
295            self.assert_(status == successStatus)
296           
297            # Match based on full DN instead - this takes precedence over
298            # hostname match
299            client.serverDN = self.__class__.HOSTCERT_DN
300            status = client.serverSSLCertVerify(*args)
301            self.assert_(status == successStatus)
302           
303        finally:
304            if serverDN:
305                os.environ[MyProxyClient.MYPROXY_SERVER_DN_ENVVARNAME
306                           ] = serverDN
307                                       
308if __name__ == "__main__":
309    unittest.main()
Note: See TracBrowser for help on using the repository browser.