source: TI12-security/trunk/MyProxyClient/myproxy/test/test_myproxyclient.py @ 6918

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/MyProxyClient/myproxy/test/test_myproxyclient.py@6918
Revision 6918, 14.0 KB checked in by pjkersha, 11 years ago (diff)

Incomplete - task 5: MyProxy? Logon HTTPS Interface

  • Updated MyProxyClient with bootstrap capability for get trust roots and logon methods.
  • Property svn:executable set to *
  • Property svn:keywords set to Id
RevLine 
[1858]1#!/usr/bin/env python
[4654]2"""MyProxy Client unit tests
[1858]3
4NERC Data Grid Project
5"""
[2909]6__author__ = "P J Kershaw"
7__date__ = "02/07/07"
[4770]8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
[5048]9__license__ = """BSD- See LICENSE file in top-level directory"""
[4404]10__contact__ = "Philip.Kershaw@stfc.ac.uk"
[2270]11__revision__ = '$Id$'
[6827]12import logging
13logging.basicConfig(level=logging.DEBUG)
[1967]14
[1858]15import unittest
16import os
17import sys
[6918]18from getpass import getpass
[1861]19import traceback
[6827]20from os import path
[1858]21
[6837]22from OpenSSL import crypto
[6827]23
[4644]24from myproxy.client import CaseSensitiveConfigParser, MyProxyClient
[1858]25
[6827]26mkPath = lambda file: path.join(os.environ['MYPROXYCLIENT_UNITTEST_DIR'], file)
[3176]27
[4637]28class _MyProxyClientTestCase(unittest.TestCase):
29    '''Base implements environment settings common to all test case classes'''
30    if 'NDGSEC_INT_DEBUG' in os.environ:
31        import pdb
32        pdb.set_trace()
[1858]33   
[4637]34    if 'MYPROXYCLIENT_UNITTEST_DIR' not in os.environ:
35        os.environ['MYPROXYCLIENT_UNITTEST_DIR'] = \
[6827]36                                        path.abspath(path.dirname(__file__))
[4637]37
38
39class MyProxyClientLiveTestCase(_MyProxyClientTestCase):
[6918]40    '''Tests require a connection to a real MyProxy service running on a host.
41   
42    The server must be set up as a credential repository - i.e. able to receive
43    and store credentials
[4637]44    '''
[6827]45    CONFIG_FILENAME = "myProxyClientTest.cfg"
[4637]46   
[1858]47    def setUp(self):
48       
[4637]49        super(MyProxyClientLiveTestCase, self).setUp()
[3176]50       
[4624]51        configParser = CaseSensitiveConfigParser()
[6827]52        configFilePath = path.join(os.environ['MYPROXYCLIENT_UNITTEST_DIR'],
53                                   MyProxyClientLiveTestCase.CONFIG_FILENAME)
[3176]54        configParser.read(configFilePath)
[1858]55       
56        self.cfg = {}
57        for section in configParser.sections():
58            self.cfg[section] = dict(configParser.items(section))
[1861]59       
[6827]60        configFilePath = path.expandvars(self.cfg['setUp']['cfgFilePath'])
61        self.clnt = MyProxyClient(cfgFilePath=configFilePath)
[6918]62
63    def __del__(self):
64        """Clear up CA certs retrieved in test01GetTrustRoots call ready for
65        next run of these unit tests
66        """
67        self._deleteTrustRootFiles()
[4624]68       
[6918]69    def _deleteTrustRootFiles(self):
70        """Helper method clears up CA certs in trust roots directory set from
71        previous call to test01GetTrustRoots()
72        """       
73        if hasattr(self, 'trustRootFiles'):
74            for fileName in self.trustRootFiles:
75                os.remove(fileName)
76           
77    def test01GetTrustRoots(self):
78        # Test get trust roots command bootstrapping trust
79        trustRoots = self.clnt.getTrustRoots(writeToCACertDir=True, 
80                                             bootstrap=True)
81        self.assert_(trustRoots)
82        self.assert_(isinstance(trustRoots, dict))
83        self.assert_(len(trustRoots) > 0)
84        for fileName, fileContents in trustRoots.items():
85            if fileName.endswith('.0'):
86                # test parsing certificate
87                cert = crypto.load_certificate(crypto.FILETYPE_PEM,
88                                               fileContents)
89                self.assert_(cert)
90                self.assert_(isinstance(cert, crypto.X509))
91                subj = cert.get_subject()
92                self.assert_(subj)
93                print("Trust root certificate retrieved with DN=%s" % subj) 
94               
95        # Keep for __del__ tidy up
96        self.trustRootFiles = []
97       
98        dirContents = os.listdir(self.clnt.caCertDir)
99        for fileName in trustRoots:     
100            self.assert_(fileName in dirContents)
101            self.trustRootFiles.append(os.path.join(self.clnt.caCertDir, 
102                                                    fileName))
103   
104    def test02Store(self):
[6827]105        # upload X509 cert and private key to repository
[6918]106        thisSection = self.cfg['test02Store']
[6827]107       
[4624]108        passphrase = thisSection.get('passphrase')
[2893]109        if passphrase is None:
[6918]110            passphrase = getpass("\ntest02Store credential pass-phrase: ")
[1945]111           
[6840]112        sslKeyFilePassphrase = thisSection.get('sslKeyFilePassphrase')
113        if sslKeyFilePassphrase is None:
[6918]114            sslKeyFilePassphrase = getpass("\ntest02Store credential owner "
115                                           "pass-phrase: ")
[3176]116
[6844]117        certFile = path.expandvars(thisSection['ownerCertFile'])
118        keyFile = path.expandvars(thisSection['ownerKeyFile'])
[6840]119        sslCertFile = path.expandvars(thisSection['sslCertFile'])
120        sslKeyFile = path.expandvars(thisSection['sslKeyFile'])
[1861]121           
[4624]122        self.clnt.store(thisSection['username'],
123                        passphrase,
124                        certFile,
125                        keyFile,
[6840]126                        sslCertFile=sslCertFile,
127                        sslKeyFile=sslKeyFile,
128                        sslKeyFilePassphrase=sslKeyFilePassphrase,
[4624]129                        force=False)
[4637]130        print("Store creds for user %s" % thisSection['username'])
[1858]131   
[6918]132    def test03GetDelegation(self):
[6827]133        # retrieve proxy cert./private key
[6918]134        thisSection = self.cfg['test03GetDelegation']
[4624]135       
136        passphrase = thisSection.get('passphrase')
[2893]137        if passphrase is None:
[6918]138            passphrase = getpass("\ntest03GetDelegation passphrase: ")
[1861]139         
[6827]140        proxyCertFile = path.expandvars(thisSection['proxyCertFileOut'])
141        proxyKeyFile = path.expandvars(thisSection['proxyKeyFileOut'])
[6918]142       
[6829]143        creds = self.clnt.getDelegation(thisSection['username'], passphrase)
[4624]144        print "proxy credentials:" 
145        print ''.join(creds)
[6837]146        open(proxyCertFile, 'w').write(creds[0]+''.join(creds[2:]))
[4624]147        open(proxyKeyFile, 'w').write(creds[1])
[1858]148
[6918]149    def test04Info(self):
[6827]150        # Retrieve information about a given credential
[6918]151        thisSection = self.cfg['test04Info']
[1945]152       
[6840]153        # sslKeyFilePassphrase can be omitted from the congif file in which case
[1945]154        # the get call below would return None
[6840]155        sslKeyFilePassphrase = thisSection.get('sslKeyFilePassphrase')
156        if sslKeyFilePassphrase is None:
[6918]157            sslKeyFilePassphrase = getpass("\ntest04Info owner credentials "
158                                           "passphrase: ")
[1881]159
[4624]160        credExists, errorTxt, fields = self.clnt.info(
[6837]161                                 thisSection['username'],
[6840]162                                 path.expandvars(thisSection['sslCertFile']),
163                                 path.expandvars(thisSection['sslKeyFile']),
164                                 sslKeyFilePassphrase=sslKeyFilePassphrase)
[6918]165        print "test04Info... "
[4624]166        print "credExists: %s" % credExists
167        print "errorTxt: " + errorTxt
168        print "fields: %s" % fields
[1858]169
[6918]170    def test06ChangePassphrase(self):       
[6827]171        # change pass-phrase protecting a given credential
[6918]172        thisSection = self.cfg['test06ChangePassphrase']
[4624]173       
174        passphrase = thisSection.get('passphrase')
175        if passphrase is None:
[6918]176            passphrase = getpass("test06ChangePassphrase - passphrase: ")
[4624]177       
[4637]178        newPassphrase = thisSection.get('newPassphrase')
[4624]179        if newPassphrase is None:
[6918]180            newPassphrase = getpass("test06ChangePassphrase - new passphrase: ")
[1861]181
[6918]182            confirmNewPassphrase = getpass("test06ChangePassphrase - confirm "
183                                           "new passphrase: ")
[1861]184
[4624]185            if newPassphrase != confirmNewPassphrase:
186                self.fail("New and confirmed new password don't match")
187               
[6918]188        sslKeyFilePassphrase = thisSection.get('sslKeyFilePassphrase') or \
189                            passphrase
[4624]190
191        self.clnt.changePassphrase(thisSection['username'],
[6827]192                               passphrase,
193                               newPassphrase, 
[6840]194                               path.expandvars(thisSection['sslCertFile']),
195                               path.expandvars(thisSection['sslKeyFile']),
196                               sslKeyFilePassphrase=sslKeyFilePassphrase)
[4637]197        print("Changed pass-phrase")
[4624]198
[6918]199    def test05GetDelegationWithBootstrappedTrustRoots(self):
200        # Get delegation call whilst simulataneously bootstrapping trust roots
201        thisSection = self.cfg['test05GetDelegationWithBootstrappedTrustRoots']
202       
203        passphrase = thisSection.get('passphrase')
204        if passphrase is None:
205            passphrase = getpass("\n"
206                                 "test05GetDelegationWithBootstrappedTrustRoots"
207                                 "passphrase: ")
208       
209        # Ensure any previously set trust root files are removed
210        self._deleteTrustRootFiles()
211       
212        creds = self.clnt.getDelegation(thisSection['username'], passphrase,
213                                        bootstrap=True)
214        print "proxy credentials:" 
215        print ''.join(creds)
216             
217    def test07Destroy(self):
[6827]218        # destroy credentials for a given user
[6918]219        thisSection = self.cfg['test07Destroy']
[4637]220       
[6840]221        sslKeyFilePassphrase = thisSection.get('sslKeyFilePassphrase')
222        if sslKeyFilePassphrase is None:
[6918]223            sslKeyFilePassphrase = getpass("\ntest07Destroy credential owner "
224                                           "passphrase: ")
[1861]225
[4637]226        self.clnt.destroy(thisSection['username'], 
[6840]227                      sslCertFile=path.expandvars(thisSection['sslCertFile']),
228                      sslKeyFile=path.expandvars(thisSection['sslKeyFile']),
229                      sslKeyFilePassphrase=sslKeyFilePassphrase)
[4637]230        print("Destroy creds for user %s" % thisSection['username'])
231       
232       
[4647]233from myproxy.utils.openssl import OpenSSLConfigError
[4637]234
235class MyProxyClientInterfaceTestCase(_MyProxyClientTestCase):
236    '''Test interface for correct getting/setting of attributes'''
[6847]237    HOSTCERT_FILENAME = 'localhost.crt'
238    HOSTCERT_FILEPATH = mkPath(HOSTCERT_FILENAME)
239    HOSTCERT_DN = '/C=UK/ST=Oxfordshire/O=BADC/OU=Security/CN=localhost'
[4637]240   
241    def test01EnvironmentVarsSet(self):
242
[1861]243        try:
[4637]244            environBackup = os.environ.copy()
245           
246            os.environ['MYPROXY_SERVER'] = 'localhost.domain'
247            os.environ['MYPROXY_SERVER_DN'] = '/O=NDG/OU=Raphael/CN=raphael'
248            os.environ['MYPROXY_SERVER_PORT'] = '20000'
[6840]249            client = MyProxyClient(openSSLConfFilePath=mkPath('openssl.conf'),
[4637]250                                   proxyCertMaxLifetime=60000,
251                                   proxyCertLifetime=30000,           
[6918]252                                   caCertDir=mkPath(''))
[4637]253           
254            self.assert_(client.port == 20000)
255            self.assert_(client.hostname == 'localhost.domain')
256            self.assert_(client.serverDN == '/O=NDG/OU=Raphael/CN=raphael')
257            self.assert_(client.proxyCertMaxLifetime == 60000)
258            self.assert_(client.proxyCertLifetime == 30000)
259            self.assert_(client.openSSLConfFilePath == mkPath('openssl.conf'))
[6918]260            self.assert_(client.caCertDir == mkPath(''))
[4637]261        finally:
[6840]262            os.environ = environBackup       
[4637]263
264    def test02SetProperties(self):
[1858]265       
[4637]266        client = MyProxyClient()
267        try:
268            client.port = None
269            self.fail("Expecting AttributeError raised from port set to "
270                      "invalid type")
[6847]271        except TypeError:
[4637]272            pass
273
274        client.port = 8000
275        client.hostname = '127.0.0.1'
276        client.serverDN = '/O=NDG/OU=BADC/CN=raphael'
277        client.proxyCertMaxLifetime = 80000
278        client.proxyCertLifetime = 70000
279       
280        try:
281            client.openSSLConfFilePath = mkPath('ssl.cnf')
282            self.fail("Expecting OpenSSLConfigError raised for invalid file "
283                      "'ssl.cnf'")
284        except OpenSSLConfigError:
285            pass
286       
287        client.caCertDir = mkPath('/etc/grid-security/certificates') 
288       
289        self.assert_(client.port == 8000)
290        self.assert_(client.hostname == '127.0.0.1')
291        self.assert_(client.serverDN == '/O=NDG/OU=BADC/CN=raphael')
292        self.assert_(client.proxyCertMaxLifetime == 80000)
293        self.assert_(client.proxyCertLifetime == 70000)
294        self.assert_(client.openSSLConfFilePath == mkPath('ssl.cnf'))
295        self.assert_(
296                client.caCertDir == mkPath('/etc/grid-security/certificates')) 
[6847]297 
298    def test03SSLVerification(self):
299        # SSL verification callback
300       
301        # Ensure no relevant environment variables are set which might affect
302        # the result
303        try:
304            serverDN = os.environ.get(
305                                    MyProxyClient.MYPROXY_SERVER_DN_ENVVARNAME)
306            if serverDN is not None:
307                del os.environ[MyProxyClient.MYPROXY_SERVER_DN_ENVVARNAME]
308       
309            client = MyProxyClient()
310           
311            connection = None
312            errorStatus = 1
313            successStatus = 0
314            errorDepth = 0
315            peerCertStr = open(self.__class__.HOSTCERT_FILEPATH).read()
316            peerCert = crypto.load_certificate(crypto.FILETYPE_PEM, peerCertStr)
317           
318            args = (connection, peerCert, errorStatus, errorDepth, 
319                    successStatus)
320           
321            # This would normally called implicitly during the SSL handshake
322            status = client.serverSSLCertVerify(*args)
323            self.assert_(status == errorStatus)
324           
325            # Won't match because the default 'host/' CN prefix is set
326            client.hostname = 'localhost'
327            status = client.serverSSLCertVerify(*args)
328            self.assert_(status == errorStatus)
329           
330            # Should now match
331            client.serverCNPrefix = ''
332            status = client.serverSSLCertVerify(*args)
333            self.assert_(status == successStatus)
334           
335            # Match based on full DN instead - this takes precedence over
336            # hostname match
337            client.serverDN = self.__class__.HOSTCERT_DN
338            status = client.serverSSLCertVerify(*args)
339            self.assert_(status == successStatus)
340           
341        finally:
342            if serverDN:
343                os.environ[MyProxyClient.MYPROXY_SERVER_DN_ENVVARNAME
344                           ] = serverDN
[1858]345                                       
346if __name__ == "__main__":
347    unittest.main()
Note: See TracBrowser for help on using the repository browser.