source: TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py @ 4120

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/python/ndg.security.test/ndg/security/test/sessionMgr/test.py@4120
Revision 4120, 11.2 KB checked in by pjkersha, 13 years ago (diff)

Fix for ndg.security.test.sessionMgr.test.SessionMgrTestCase?.test1Connect: use asPEM method to convert self.userCert to a string for output to file.

  • Property svn:executable set to *
Line 
1#!/usr/bin/env python
2"""Test harness for NDG Session Manager - makes requests for
3authentication and authorisation.  An Attribute Authority and Simple CA
4services must be running for the reqAuthorisation and addUser tests
5
6NERC Data Grid Project
7"""
8__author__ = "P J Kershaw"
9__date__ = "20/11/07"
10__copyright__ = "(C) 2007 STFC & NERC"
11__license__ = \
12"""This software may be distributed under the terms of the Q Public
13License, version 1.0 or later."""
14__contact__ = "P.J.Kershaw@rl.ac.uk"
15__revision__ = '$Id$'
16
17import unittest
18import os, sys, getpass, re
19from ConfigParser import SafeConfigParser
20import traceback
21
22from ndg.security.common.X509 import X509CertParse
23from ndg.security.server.SessionMgr import *
24from ndg.security.server.MyProxy import MyProxyClient
25
26from os.path import expandvars as xpdVars
27from os.path import join as jnPath
28mkPath = lambda file: jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'], file)
29
30import logging
31logging.basicConfig(level=logging.ERROR)
32
33
34class SessionMgrTestCase(unittest.TestCase):
35    """Unit test case for ndg.security.server.SessionMgr.SessionMgr class.
36   
37    This class manages server side sessions"""
38   
39    test1Passphrase = None
40    test3Passphrase = None
41   
42    def setUp(self):
43       
44        if 'NDGSEC_INT_DEBUG' in os.environ:
45            import pdb
46            pdb.set_trace()
47       
48        if 'NDGSEC_SM_UNITTEST_DIR' not in os.environ:
49            os.environ['NDGSEC_SM_UNITTEST_DIR'] = \
50                os.path.abspath(os.path.dirname(__file__))
51       
52        self.cfg = SafeConfigParser()
53        configFilePath = jnPath(os.environ['NDGSEC_SM_UNITTEST_DIR'],
54                                "sessionMgrTest.cfg")
55        self.cfg.read(configFilePath)
56                   
57        # Initialise the Session Manager client connection
58        # Omit traceFile keyword to leave out SOAP debug info
59        propFilePath = xpdVars(self.cfg.get('setUp', 'propFilePath'))
60        self.sm = SessionMgr(propFilePath=propFilePath)
61       
62                                 
63    def test1Connect(self):
64        """test1Connect: make a new session"""
65       
66        print "\n\t" + self.test1Connect.__doc__
67       
68        username = self.cfg.get('test1Connect', 'username')
69       
70        if SessionMgrTestCase.test1Passphrase is None and \
71           self.cfg.has_option('test1Connect', 'passphrase'):
72            SessionMgrTestCase.test1Passphrase = \
73                                    self.cfg.get('test1Connect', 'passphrase')
74       
75        if not SessionMgrTestCase.test1Passphrase:
76            SessionMgrTestCase.test1Passphrase = getpass.getpass(\
77                prompt="\ntest1Connect pass-phrase for user %s: " % username)
78
79        userCert, self.userPriKey, self.issuingCert, self.sessID = \
80            self.sm.connect(username=username, 
81                            passphrase=SessionMgrTestCase.test1Passphrase)
82        self.userCert = X509CertParse(userCert)
83       
84        print "User '%s' connected to Session Manager:\n%s" % \
85                                                        (username, self.sessID)
86        creds='\n'.join((self.issuingCert or '',
87                         self.userCert.asPEM().strip(),
88                         self.userPriKey))
89        open(mkPath("user.creds"), "w").write(creds)
90   
91           
92    def test2GetSessionStatus(self):
93        """test2GetSessionStatus: check a session is alive"""
94        print "\n\t" + self.test2GetSessionStatus.__doc__
95       
96        self.test1Connect()
97        assert self.sm.getSessionStatus(sessID=self.sessID), "Session is dead"
98        print "User connected to Session Manager with sessID=%s" % self.sessID
99
100        assert not self.sm.getSessionStatus(sessID='abc'), \
101            "sessID=abc shouldn't exist!"
102           
103        print "CORRECT: sessID=abc doesn't exist"
104       
105    def test3ConnectNoCreateServerSess(self):
106        """test3ConnectNoCreateServerSess: Connect as a non browser client -
107        sessID should be None"""
108
109        print "\n\t" + self.test3ConnectNoCreateServerSess.__doc__
110       
111        username = self.cfg.get('test3ConnectNoCreateServerSess', 'username')
112
113        if SessionMgrTestCase.test3Passphrase is None and \
114           self.cfg.has_option('test3ConnectNoCreateServerSess', 
115                               'passphrase'):
116            SessionMgrTestCase.test3Passphrase = \
117                self.cfg.get('test3ConnectNoCreateServerSess', 'passphrase')
118       
119        if not SessionMgrTestCase.test3Passphrase:
120            SessionMgrTestCase.test3Passphrase = getpass.getpass(\
121        prompt="\ntest3ConnectNoCreateServerSess pass-phrase for user %s: " % \
122            username)
123
124        self.userCert, self.userPriKey, self.issuingCert, sessID = \
125            self.sm.connect(username=username, 
126                            passphrase=SessionMgrTestCase.test3Passphrase,
127                            createServerSess=False)
128       
129        # Expect null session ID
130        assert not sessID, "Expecting a null session ID!"
131         
132        print "User '%s' retrieved creds. from Session Manager:\n%s" % \
133                                                    (username, self.userCert)
134           
135
136    def test4DisconnectWithSessID(self):
137        """test4DisconnectWithSessID: disconnect as if acting as a browser client
138        """
139       
140        print "\n\t" + self.test4DisconnectWithSessID.__doc__
141        self.test1Connect()       
142        self.sm.deleteUserSession(sessID=self.sessID)
143       
144        print "User disconnected from Session Manager:\n%s" % self.sessID
145           
146
147    def test5DisconnectWithUserCert(self):
148        """test5DisconnectWithUserCert: Disconnect as a command line client
149        """
150       
151        print "\n\t" + self.test5DisconnectWithUserCert.__doc__
152        self.test1Connect()
153       
154        # Proxy cert in signature determines ID of session to
155        # delete
156        self.sm.deleteUserSession(userCert=self.userCert)
157        print "User disconnected from Session Manager:\n%s" % self.userCert
158
159
160    def test6GetAttCertWithSessID(self):
161        """test6GetAttCertWithSessID: make an attribute request using
162        a session ID as authentication credential"""
163
164        print "\n\t" + self.test6GetAttCertWithSessID.__doc__       
165        self.test1Connect()
166       
167        attCert, errMsg, extAttCertList = self.sm.getAttCert(\
168            sessID=self.sessID, 
169            aaURI=self.cfg.get('test6GetAttCertWithSessID', 'aauri'))
170        if errMsg:
171            self.fail(errMsg)
172           
173        print "Attribute Certificate:\n%s" % attCert
174        attCert.filePath = \
175            xpdVars(self.cfg.get('test6GetAttCertWithSessID', 'acoutfilepath')) 
176        attCert.write()
177       
178        return self.sm
179
180
181    def test6aGetAttCertRefusedWithSessID(self):
182        """test6aGetAttCertRefusedWithSessID: make an attribute request using
183        a sessID as authentication credential requesting an AC from an
184        Attribute Authority where the user is NOT registered"""
185
186        print "\n\t" + self.test6aGetAttCertRefusedWithSessID.__doc__       
187        self.test1Connect()
188       
189        aaURI = self.cfg.get('test6aGetAttCertRefusedWithSessID', 'aauri')
190       
191        attCert, errMsg, extAttCertList = self.sm.getAttCert(sessID=self.sessID, 
192                                         aaURI=aaURI,
193                                         mapFromTrustedHosts=False)
194        if errMsg:
195            print "SUCCESS - obtained expected result: %s" % errMsg
196            return
197       
198        self.fail("Request allowed from AA where user is NOT registered!")
199
200
201    def test6bGetMappedAttCertWithSessID(self):
202        """test6bGetMappedAttCertWithSessID: make an attribute request using
203        a session ID as authentication credential"""
204
205        print "\n\t" + self.test6bGetMappedAttCertWithSessID.__doc__       
206        self.test1Connect()
207       
208        # Attribute Certificate cached in test 6 can be used to get a mapped
209        # AC for this test ...
210        self.sm = self.test6GetAttCertWithSessID()
211
212        aaURI = self.cfg.get('test6bGetMappedAttCertWithSessID', 'aauri')
213       
214        attCert, errMsg, extAttCertList=self.sm.getAttCert(sessID=self.sessID,
215                                                   aaURI=aaURI,
216                                                   mapFromTrustedHosts=True)
217        if errMsg:
218            self.fail(errMsg)
219           
220        print "Attribute Certificate:\n%s" % attCert 
221
222
223    def test6cGetAttCertWithExtAttCertListWithSessID(self):
224        """test6cGetAttCertWithSessID: make an attribute request using
225        a session ID as authentication credential"""
226       
227        print "\n\t" + \
228            self.test6cGetAttCertWithExtAttCertListWithSessID.__doc__       
229        self.test1Connect()
230       
231        aaURI = \
232            self.cfg.get('test6cGetAttCertWithExtAttCertListWithSessID', 'aauri')
233       
234        # Use output from test6GetAttCertWithSessID!
235        extACFilePath = \
236        xpdVars(self.cfg.get('test6cGetAttCertWithExtAttCertListWithSessID', 
237                             'extacfilepath'))   
238        extAttCert = open(extACFilePath).read()
239       
240        attCert, errMsg, extAttCertList = self.sm.getAttCert(
241                                                   sessID=self.sessID, 
242                                                   aaURI=aaURI,
243                                                   extAttCertList=[extAttCert])
244        if errMsg:
245            self.fail(errMsg)
246         
247        print "Attribute Certificate:\n%s" % attCert 
248
249
250    def test7GetAttCertWithUserCert(self):
251        """test7GetAttCertWithUserCert: make an attribute request using
252        a user cert as authentication credential"""
253        print "\n\t" + self.test7GetAttCertWithUserCert.__doc__
254        self.test1Connect()
255
256        # Request an attribute certificate from an Attribute Authority
257        # using the userCert returned from connect()
258       
259        aaURI = self.cfg.get('test7GetAttCertWithUserCert', 'aauri')
260        attCert, errMsg, extAttCertList = self.sm.getAttCert(\
261                                     userCert=self.userCert, aaURI=aaURI)
262        if errMsg:
263            self.fail(errMsg)
264         
265        print "Attribute Certificate:\n%s" % attCert 
266
267
268#_____________________________________________________________________________       
269class SessionMgrTestSuite(unittest.TestSuite):
270   
271    def __init__(self):
272        print "SessionMgrTestSuite ..."
273        smTestCaseMap = map(SessionMgrTestCase,
274                          (
275                            "test1Connect",
276                            "test2GetSessionStatus",
277                            "test3ConnectNoCreateServerSess",
278                            "test4DisconnectWithSessID",
279                            "test5DisconnectWithUserCert",
280                            "test6GetAttCertWithSessID",
281                            "test6bGetMappedAttCertWithSessID",
282                            "test6cGetAttCertWithExtAttCertListWithSessID",
283                            "test7GetAttCertWithUserCert",
284                          ))
285        unittest.TestSuite.__init__(self, smTestCaseMap)
286           
287                                                   
288if __name__ == "__main__":
289#    suite = SessionMgrTestSuite()
290#    unittest.TextTestRunner(verbosity=2).run(suite)
291    unittest.main()       
Note: See TracBrowser for help on using the repository browser.