source: TI12-security/branches/MyProxyClient-pyopenssl/myproxy/test/test_myproxyclient.py @ 6835

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/branches/MyProxyClient-pyopenssl/myproxy/test/test_myproxyclient.py@6835
Revision 6835, 10.7 KB checked in by pjkersha, 11 years ago (diff)

Completed port to PyOpenSSL from M2Crypto with all unit tests passed. Merge back into the main trunk as next step.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
Line 
1#!/usr/bin/env python
2"""MyProxy Client unit tests
3
4NERC Data Grid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "02/07/07"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = """BSD- See LICENSE file in top-level directory"""
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id$'
12import logging
13logging.basicConfig(level=logging.DEBUG)
14
15import unittest
16import os
17import sys
18import getpass
19import traceback
20from os import path
21
22from OpenSSL import crypto
23
24from myproxy.client import CaseSensitiveConfigParser, MyProxyClient
25
26mkPath = lambda file: path.join(os.environ['MYPROXYCLIENT_UNITTEST_DIR'], file)
27
28class _MyProxyClientTestCase(unittest.TestCase):
29    '''Base implements environment settings common to all test case classes'''
30    if 'NDGSEC_INT_DEBUG' in os.environ:
31        import pdb
32        pdb.set_trace()
33   
34    if 'MYPROXYCLIENT_UNITTEST_DIR' not in os.environ:
35        os.environ['MYPROXYCLIENT_UNITTEST_DIR'] = \
36                                        path.abspath(path.dirname(__file__))
37
38
39class MyProxyClientLiveTestCase(_MyProxyClientTestCase):
40    '''Tests require a connection to a real MyProxy service running on a host
41    '''
42    CONFIG_FILENAME = "myProxyClientTest.cfg"
43   
44    def setUp(self):
45       
46        super(MyProxyClientLiveTestCase, self).setUp()
47       
48        configParser = CaseSensitiveConfigParser()
49        configFilePath = path.join(os.environ['MYPROXYCLIENT_UNITTEST_DIR'],
50                                   MyProxyClientLiveTestCase.CONFIG_FILENAME)
51        configParser.read(configFilePath)
52       
53        self.cfg = {}
54        for section in configParser.sections():
55            self.cfg[section] = dict(configParser.items(section))
56       
57        configFilePath = path.expandvars(self.cfg['setUp']['cfgFilePath'])
58        self.clnt = MyProxyClient(cfgFilePath=configFilePath)
59       
60    def test01Store(self):
61        # upload X509 cert and private key to repository
62        thisSection = self.cfg['test01Store']
63       
64        passphrase = thisSection.get('passphrase')
65        if passphrase is None:
66            passphrase = getpass.getpass(prompt="\ntest1Store credential "
67                                                "pass-phrase: ")
68           
69        ownerPassphrase = thisSection.get('ownerPassphrase')
70        if ownerPassphrase is None:
71            ownerPassphrase = getpass.getpass(prompt="\ntest1Store credential "
72                                                     " owner pass-phrase: ")
73
74        certFile = path.expandvars(thisSection['signingCertFilePath'])
75        keyFile = path.expandvars(thisSection['signingPriKeyFilePath'])
76        ownerCertFile = path.expandvars(thisSection['ownerCertFile'])
77        ownerKeyFile = path.expandvars(thisSection['ownerKeyFile'])
78           
79        self.clnt.store(thisSection['username'],
80                        passphrase,
81                        certFile,
82                        keyFile,
83                        ownerCertFile=ownerCertFile,
84                        ownerKeyFile=ownerKeyFile,
85                        ownerPassphrase=ownerPassphrase,
86                        force=False)
87        print("Store creds for user %s" % thisSection['username'])
88   
89    def test02GetDelegation(self):
90        # retrieve proxy cert./private key
91        thisSection = self.cfg['test02GetDelegation']
92       
93        passphrase = thisSection.get('passphrase')
94        if passphrase is None:
95            passphrase = getpass.getpass(prompt="\ntest2GetDelegation "
96                                                "passphrase: ")
97         
98        proxyCertFile = path.expandvars(thisSection['proxyCertFileOut'])
99        proxyKeyFile = path.expandvars(thisSection['proxyKeyFileOut'])
100
101        creds = self.clnt.getDelegation(thisSection['username'], passphrase)
102        print "proxy credentials:" 
103        print ''.join(creds)
104        open(proxyCertFile, 'w').write(creds[0]+''.join(creds[2:]))
105        open(proxyKeyFile, 'w').write(creds[1])
106
107    def test03Info(self):
108        # Retrieve information about a given credential
109        thisSection = self.cfg['test03Info']
110       
111        # ownerPassphrase can be omitted from the congif file in which case
112        # the get call below would return None
113        ownerPassphrase = thisSection.get('ownerPassphrase')
114        if ownerPassphrase is None:
115            ownerPassphrase = getpass.getpass(prompt="\ntest3Info owner "
116                                              "credentials passphrase: ")
117
118        credExists, errorTxt, fields = self.clnt.info(
119                                 thisSection['username'],
120                                 path.expandvars(thisSection['ownerCertFile']),
121                                 path.expandvars(thisSection['ownerKeyFile']),
122                                 ownerPassphrase=ownerPassphrase)
123        print "test3Info... "
124        print "credExists: %s" % credExists
125        print "errorTxt: " + errorTxt
126        print "fields: %s" % fields
127
128    def test04ChangePassphrase(self):       
129        # change pass-phrase protecting a given credential
130        thisSection = self.cfg['test04ChangePassphrase']
131       
132        passphrase = thisSection.get('passphrase')
133        if passphrase is None:
134            passphrase = getpass.getpass(prompt="test4ChangePassphrase - "
135                                                "passphrase: ")
136       
137        newPassphrase = thisSection.get('newPassphrase')
138        if newPassphrase is None:
139            newPassphrase = getpass.getpass(prompt="test4ChangePassphrase "
140                                                   "- new passphrase: ")
141
142            confirmNewPassphrase = getpass.getpass(prompt=\
143                                                   "test4ChangePassphrase "
144                                                   "- confirm new "
145                                                   "passphrase: ")
146
147            if newPassphrase != confirmNewPassphrase:
148                self.fail("New and confirmed new password don't match")
149               
150        ownerPassphrase = thisSection.get('ownerPassphrase') or passphrase
151
152        self.clnt.changePassphrase(thisSection['username'],
153                               passphrase,
154                               newPassphrase, 
155                               path.expandvars(thisSection['ownerCertFile']),
156                               path.expandvars(thisSection['ownerKeyFile']),
157                               ownerPassphrase=ownerPassphrase)
158        print("Changed pass-phrase")
159
160    def test05Destroy(self):
161        # destroy credentials for a given user
162        thisSection = self.cfg['test05Destroy']
163       
164        ownerPassphrase = thisSection.get('ownerPassphrase')
165        if ownerPassphrase is None:
166            ownerPassphrase = getpass.getpass(prompt="\ntest5Destroy "
167                                              "credential owner passphrase: ")
168
169        self.clnt.destroy(thisSection['username'], 
170                  ownerCertFile=path.expandvars(thisSection['ownerCertFile']),
171                  ownerKeyFile=path.expandvars(thisSection['ownerKeyFile']),
172                  ownerPassphrase=ownerPassphrase)
173        print("Destroy creds for user %s" % thisSection['username'])
174       
175    def test06GetTrustRoots(self):
176        # Test get trust roots command
177        trustRoots = self.clnt.getTrustRoots()
178        self.assert_(trustRoots)
179        self.assert_(isinstance(trustRoots, dict))
180        self.assert_(len(trustRoots) > 0)
181        for fileName, fileContents in trustRoots.items():
182            if fileName.endswith('.0'):
183                # test parsing certificate
184                cert = crypto.load_certificate(crypto.FILETYPE_PEM,
185                                               fileContents)
186                self.assert_(cert)
187                self.assert_(isinstance(cert, crypto.X509))
188                subj = cert.get_subject()
189                self.assert_(subj)
190                print("Trust root certificate retrieved with DN=%s" % subj)
191               
192       
193from myproxy.utils.openssl import OpenSSLConfigError
194
195class MyProxyClientInterfaceTestCase(_MyProxyClientTestCase):
196    '''Test interface for correct getting/setting of attributes'''
197   
198    def test01EnvironmentVarsSet(self):
199
200        try:
201            environBackup = os.environ.copy()
202           
203            os.environ['MYPROXY_SERVER'] = 'localhost.domain'
204            os.environ['MYPROXY_SERVER_DN'] = '/O=NDG/OU=Raphael/CN=raphael'
205            os.environ['MYPROXY_SERVER_PORT'] = '20000'
206            client = MyProxyClient(serverCNPrefix='',
207                                   openSSLConfFilePath=mkPath('openssl.conf'),
208                                   proxyCertMaxLifetime=60000,
209                                   proxyCertLifetime=30000,           
210                                   caCertFilePath=mkPath('ndg-test-ca.crt'))
211           
212            self.assert_(client.port == 20000)
213            self.assert_(client.hostname == 'localhost.domain')
214            self.assert_(client.serverDN == '/O=NDG/OU=Raphael/CN=raphael')
215            self.assert_(client.proxyCertMaxLifetime == 60000)
216            self.assert_(client.proxyCertLifetime == 30000)
217            self.assert_(client.openSSLConfFilePath == mkPath('openssl.conf'))
218            self.assert_(client.caCertFilePath == mkPath('ndg-test-ca.crt'))
219        finally:
220            os.environ = environBackup
221           
222
223    def test02SetProperties(self):
224       
225        client = MyProxyClient()
226        try:
227            client.port = None
228            self.fail("Expecting AttributeError raised from port set to "
229                      "invalid type")
230        except AttributeError:
231            pass
232
233        client.port = 8000
234        client.hostname = '127.0.0.1'
235        client.serverDN = '/O=NDG/OU=BADC/CN=raphael'
236        client.proxyCertMaxLifetime = 80000
237        client.proxyCertLifetime = 70000
238       
239        try:
240            client.openSSLConfFilePath = mkPath('ssl.cnf')
241            self.fail("Expecting OpenSSLConfigError raised for invalid file "
242                      "'ssl.cnf'")
243        except OpenSSLConfigError:
244            pass
245       
246        client.caCertFilePath = mkPath('ca.pem') 
247        client.caCertDir = mkPath('/etc/grid-security/certificates') 
248       
249        self.assert_(client.port == 8000)
250        self.assert_(client.hostname == '127.0.0.1')
251        self.assert_(client.serverDN == '/O=NDG/OU=BADC/CN=raphael')
252        self.assert_(client.proxyCertMaxLifetime == 80000)
253        self.assert_(client.proxyCertLifetime == 70000)
254        self.assert_(client.openSSLConfFilePath == mkPath('ssl.cnf'))
255        self.assert_(client.caCertFilePath == mkPath('ca.pem')) 
256        self.assert_(
257                client.caCertDir == mkPath('/etc/grid-security/certificates')) 
258                                       
259if __name__ == "__main__":
260    unittest.main()
Note: See TracBrowser for help on using the repository browser.