source: TI12-security/branches/ndg-security-1.5.x/ndg_security_test/ndg/security/test/unit/credentialwallet/test_credentialwallet.py @ 6672

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/branches/ndg-security-1.5.x/ndg_security_test/ndg/security/test/unit/credentialwallet/test_credentialwallet.py@6672
Revision 6672, 15.8 KB checked in by pjkersha, 11 years ago (diff)

Patched ndg.security.common.AttCert so that it uses a proxy to ndg.security.common.XMLSec.XMLSecDoc for Python versions >= 2.5.5. This is to allow for PyXML incompatibility with later versions of Python. Disabling XMLSecDoc means that Attribute Certificates are not signed; however, the NDG Attribute Certificates are no longer used, as SAML assertions take their place. NDG AC functionality will be deleted from the trunk.

Line 
1#!/usr/bin/env python
2"""Unit tests for Credential Wallet classes
3
4NERC DataGrid Project
5"""
6__author__ = "P J Kershaw"
7__date__ = "03/10/08"
8__copyright__ = "(C) 2009 Science and Technology Facilities Council"
9__license__ = "BSD - see LICENSE file in top-level directory"
10__contact__ = "Philip.Kershaw@stfc.ac.uk"
11__revision__ = '$Id: $'
12
13import unittest
14import os, sys, getpass, re
15import traceback
16
17from string import Template
18from cStringIO import StringIO
19import cPickle as pickle
20
21from elementtree import ElementTree
22
23from time import sleep
24from datetime import datetime, timedelta
25from saml.utils import SAMLDateTime
26from saml.xml.etree import AssertionElementTree
27
28from ndg.security.test.unit import BaseTestCase
29
30from ndg.security.common.utils.configfileparsers import (
31                                                    CaseSensitiveConfigParser)
32from ndg.security.common.utils.etree import prettyPrint
33from ndg.security.common.X509 import X509CertParse
34from ndg.security.common.credentialwallet import (NDGCredentialWallet, 
35    CredentialWalletAttributeRequestDenied, SAMLCredentialWallet)
36from ndg.security.server.attributeauthority import AttributeAuthority
37
38from os.path import expandvars as xpdVars
39from os.path import join as jnPath
def mkPath(file):
    """Return ``file`` joined onto the credential wallet unit test directory.

    The directory is read from the ``NDGSEC_CREDWALLET_UNITTEST_DIR``
    environment variable at call time, so a ``KeyError`` is raised if the
    variable has not been set.

    @type file: basestring
    @param file: file name or relative path to be located in the test
    directory.  The name shadows the Python 2 ``file`` builtin but is kept so
    that existing keyword callers (``mkPath(file=...)``) continue to work.
    @rtype: basestring
    @return: the joined path
    """
    return jnPath(os.environ['NDGSEC_CREDWALLET_UNITTEST_DIR'], file)
41
import logging

# Emit DEBUG level (and above) messages on the root logger so that output from
# the code under test is visible while the tests run.
logging.basicConfig(level=logging.DEBUG)
44
45
class NDGCredentialWalletTestCase(BaseTestCase):
    """Unit test case for
    ndg.security.common.credentialwallet.NDGCredentialWallet class.

    Settings are read from ``credWalletTest.cfg`` in the directory named by
    the ``NDGSEC_CREDWALLET_UNITTEST_DIR`` environment variable (defaulting to
    the directory containing this module).
    """
    THIS_DIR = os.path.dirname(__file__)
    PICKLE_FILENAME = 'NDGCredentialWalletPickle.dat'
    PICKLE_FILEPATH = os.path.join(THIS_DIR, PICKLE_FILENAME)

    def __init__(self, *arg, **kw):
        """Initialise the test case and start the Attribute Authority
        services via startAttributeAuthorities() (not defined in this class -
        presumably inherited from BaseTestCase).
        """
        super(NDGCredentialWalletTestCase, self).__init__(*arg, **kw)
        self.startAttributeAuthorities()

    @staticmethod
    def _readFile(filePath):
        """Return the content of filePath, expanding any environment
        variables in the path first.  The file is always closed again rather
        than being left for the garbage collector.
        """
        fileObj = open(xpdVars(filePath))
        try:
            return fileObj.read()
        finally:
            fileObj.close()

    def setUp(self):
        """Read the test configuration file and cache the user X.509
        certificate and private key file paths from its 'setUp' section.
        """
        super(NDGCredentialWalletTestCase, self).setUp()

        # Drop into the debugger on request
        if 'NDGSEC_INT_DEBUG' in os.environ:
            import pdb
            pdb.set_trace()

        # Default the test directory to the location of this module
        if 'NDGSEC_CREDWALLET_UNITTEST_DIR' not in os.environ:
            os.environ['NDGSEC_CREDWALLET_UNITTEST_DIR'] = \
                os.path.abspath(os.path.dirname(__file__))

        self.cfg = CaseSensitiveConfigParser()
        configFilePath = jnPath(os.environ['NDGSEC_CREDWALLET_UNITTEST_DIR'],
                                "credWalletTest.cfg")
        self.cfg.read(configFilePath)

        self.userX509CertFilePath = self.cfg.get('setUp',
                                                 'userX509CertFilePath')
        self.userPriKeyFilePath = self.cfg.get('setUp', 'userPriKeyFilePath')

    def test01ReadOnlyClassVariables(self):
        """Check that the accessDenied and accessGranted class variables
        can't be overwritten and keep their expected truth values.
        """
        # self.fail() must be called from the else clause: it raises
        # AssertionError, which a surrounding "except Exception" would
        # swallow and so mask the failure
        try:
            NDGCredentialWallet.accessDenied = 'yes'
        except Exception:
            print("PASS - accessDenied class variable is read-only")
        else:
            self.fail("accessDenied class variable should be read-only")

        try:
            NDGCredentialWallet.accessGranted = False
        except Exception:
            print("PASS - accessGranted class variable is read-only")
        else:
            self.fail("accessGranted class variable should be read-only")

        # unittest assertions are used instead of assert statements as the
        # latter are stripped when running with python -O
        self.assertFalse(NDGCredentialWallet.accessDenied)
        self.assertTrue(NDGCredentialWallet.accessGranted)

    def test02SetAttributes(self):
        """Set the wallet's attributes and check that an attribute that is
        not in __slots__ is rejected with AttributeError.
        """
        credWallet = NDGCredentialWallet()
        credWallet.userX509Cert = self._readFile(self.userX509CertFilePath)
        print("userX509Cert=%s" % credWallet.userX509Cert)
        credWallet.userId = 'ndg-user'
        print("userId=%s" % credWallet.userId)

        try:
            credWallet.blah = 'blah blah'
        except AttributeError:
            print("PASS - expected AttributeError when setting attribute "
                  "not in __slots__ class variable")
        else:
            self.fail("Attempting to set attribute not in __slots__ class "
                      "variable should fail")

        credWallet.caCertFilePathList = None
        credWallet.attributeAuthorityURI = 'http://localhost/AttributeAuthority'

        credWallet.attributeAuthority = None
        credWallet._credentialRepository = None
        credWallet.mapFromTrustedHosts = False
        credWallet.rtnExtAttCertList = True
        credWallet.attCertRefreshElapse = 7200

    def test03GetAttCertWithUserId(self):
        """Get an Attribute Certificate for a wallet configured from file
        with no user X.509 certificate set.
        """
        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'))
        attCert = credWallet.getAttCert()

        # No user X.509 cert is set so the resulting Attribute Certificate
        # user ID should be the same as that set for the wallet
        self.assertEqual(attCert.userId, credWallet.userId)
        print("Attribute Certificate:\n%s" % attCert)

    def test04GetAttCertWithUserX509Cert(self):
        """Get an Attribute Certificate for a wallet which has a user X.509
        certificate and private key set.
        """
        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'))

        # Set a test individual user certificate to override the client
        # cert. and private key in WS-Security settings in the config file
        credWallet.userX509Cert = self._readFile(self.userX509CertFilePath)
        credWallet.userPriKey = self._readFile(self.userPriKeyFilePath)
        attCert = credWallet.getAttCert()

        # A user X.509 cert. was set so this cert's DN should be set in the
        # userId field of the resulting Attribute Certificate
        self.assertEqual(attCert.userId, str(credWallet.userX509Cert.dn))
        print("Attribute Certificate:\n%s" % attCert)

    def test05GetAttCertRefusedWithUserX509Cert(self):
        """Check that a request to an Attribute Authority where the user is
        not registered is refused when role mapping is disabled.
        """
        # Keyword mapFromTrustedHosts overrides any setting in the config file
        # This flag prevents role mapping from a trusted AA and so in this case
        # forces refusal of the request
        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'),
                                         mapFromTrustedHosts=False)
        credWallet.userX509CertFilePath = self.userX509CertFilePath
        credWallet.userPriKeyFilePath = self.userPriKeyFilePath

        # Set AA URI AFTER user PKI settings so that these are picked in the
        # implicit call to create a new AA Client when the URI is set
        credWallet.attributeAuthorityURI = self.cfg.get('setUp',
                                                        'attributeAuthorityURI')
        try:
            credWallet.getAttCert()
        except CredentialWalletAttributeRequestDenied:
            # sys.exc_info() retrieves the exception without relying on
            # either the "except X, e" or "except X as e" syntax, each of
            # which is only accepted by some interpreter versions
            print("ok - obtained expected result: %s" % sys.exc_info()[1])
            return

        self.fail("Request allowed from Attribute Authority where user is NOT "
                  "registered!")

    def test06GetMappedAttCertWithUserId(self):
        """Get an Attribute Certificate from Site A and use it to obtain a
        mapped Attribute Certificate from Site B.
        """
        # Call Site A Attribute Authority where user is registered.  The
        # result isn't needed here - the call is made so that the certificate
        # is cached in the wallet
        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'))
        credWallet.getAttCert()

        # Use Attribute Certificate cached in wallet to get a mapped
        # Attribute Certificate from Site B's Attribute Authority
        siteBURI = self.cfg.get('setUp', 'attributeAuthorityURI')
        attCert = credWallet.getAttCert(attributeAuthorityURI=siteBURI)

        print("Mapped Attribute Certificate from Site B Attribute "
              "Authority:\n%s" % attCert)

    def test07GetAttCertFromLocalAAInstance(self):
        """Get an Attribute Certificate from an AttributeAuthority instance
        created in this process from a properties file.
        """
        thisSection = 'test07GetAttCertFromLocalAAInstance'
        aaPropFilePath = self.cfg.get(thisSection,
                                      'attributeAuthorityPropFilePath')

        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'))
        credWallet.attributeAuthority = AttributeAuthority.fromPropertyFile(
                                            propFilePath=aaPropFilePath)
        attCert = credWallet.getAttCert()

        # No user X.509 cert is set so the resulting Attribute Certificate
        # user ID should be the same as that set for the wallet
        self.assertEqual(attCert.userId, credWallet.userId)

    def test08Pickle(self):
        """Pickle a wallet to PICKLE_FILEPATH, load it back and check that
        the user ID is preserved.
        """
        credWallet = NDGCredentialWallet(cfg=self.cfg.get('setUp',
                                                          'cfgFilePath'))

        # Binary mode avoids newline translation of the pickle stream, and
        # try/finally makes sure both files are closed even on error
        outFile = open(NDGCredentialWalletTestCase.PICKLE_FILEPATH, 'wb')
        try:
            pickle.dump(credWallet, outFile)
        finally:
            outFile.close()

        inFile = open(NDGCredentialWalletTestCase.PICKLE_FILEPATH, 'rb')
        try:
            unpickledCredWallet = pickle.load(inFile)
        finally:
            inFile.close()

        self.assertEqual(unpickledCredWallet.userId, credWallet.userId)
214       
215
class SAMLCredentialWalletTestCase(BaseTestCase):
    """Unit test case for
    ndg.security.common.credentialwallet.SAMLCredentialWallet class.

    Each test works with SAML assertions generated from the ASSERTION_STR
    template.
    """
    THIS_DIR = os.path.dirname(__file__)
    CONFIG_FILENAME = 'test_samlcredentialwallet.cfg'
    CONFIG_FILEPATH = os.path.join(THIS_DIR, CONFIG_FILENAME)
    PICKLE_FILENAME = 'SAMLCredentialWalletPickle.dat'
    PICKLE_FILEPATH = os.path.join(THIS_DIR, PICKLE_FILENAME)

    # string.Template for a test assertion: $timeNow, $timeExpires and
    # $issuerName are substituted by _createAssertion()
    ASSERTION_STR = (
"""<saml:Assertion ID="192c67d9-f9cd-457a-9242-999e7b943166" IssueInstant="$timeNow" Version="2.0" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">
   <saml:Issuer Format="urn:esg:issuer">$issuerName</saml:Issuer>
   <saml:Subject>
      <saml:NameID Format="urn:esg:openid">https://esg.prototype.ucar.edu/myopenid/testUser</saml:NameID>
   </saml:Subject>
   <saml:Conditions NotBefore="$timeNow" NotOnOrAfter="$timeExpires" />
   <saml:AttributeStatement>
      <saml:Attribute FriendlyName="FirstName" Name="urn:esg:first:name" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">Test</saml:AttributeValue>
      </saml:Attribute>
      <saml:Attribute FriendlyName="LastName" Name="urn:esg:last:name" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">User</saml:AttributeValue>
      </saml:Attribute>
      <saml:Attribute FriendlyName="EmailAddress" Name="urn:esg:first:email:address" NameFormat="http://www.w3.org/2001/XMLSchema#string">
         <saml:AttributeValue xsi:type="xs:string" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">test@sitea.ac.uk</saml:AttributeValue>
      </saml:Attribute>
   </saml:AttributeStatement>
</saml:Assertion>
"""
    )

    def __init__(self, *arg, **kw):
        """Pass all arguments straight through to the base class"""
        super(SAMLCredentialWalletTestCase, self).__init__(*arg, **kw)

    def setUp(self):
        """Create a fresh assertion with default settings for each test"""
        # NOTE(review): unlike NDGCredentialWalletTestCase.setUp, the base
        # class setUp is not called here - confirm that this is intentional
        self.assertion = self._createAssertion()

    def _createAssertion(self, timeNow=None, validityDuration=60*60*8,
                         issuerName=BaseTestCase.SITEA_SAML_ISSUER_NAME):
        """Make a SAML assertion from the ASSERTION_STR template

        @type timeNow: datetime.datetime / None
        @param timeNow: time used for the assertion's IssueInstant and
        NotBefore values.  Defaults to the current UTC time
        @type validityDuration: int
        @param validityDuration: number of seconds after timeNow at which the
        assertion's NotOnOrAfter time is set
        @type issuerName: basestring
        @param issuerName: content of the assertion's Issuer element
        @return: the object returned by AssertionElementTree.fromXML for the
        parsed assertion
        """
        if timeNow is None:
            timeNow = datetime.utcnow()

        timeExpires = timeNow + timedelta(seconds=validityDuration)
        assertionStr = Template(
            SAMLCredentialWalletTestCase.ASSERTION_STR).substitute(
                dict(
                 issuerName=issuerName,
                 timeNow=SAMLDateTime.toString(timeNow),
                 timeExpires=SAMLDateTime.toString(timeExpires)
                )
            )

        # Wrap the text in a file-like object for ElementTree.parse
        assertionStream = StringIO()
        assertionStream.write(assertionStr)
        assertionStream.seek(0)
        assertionElem = ElementTree.parse(assertionStream).getroot()
        return AssertionElementTree.fromXML(assertionElem)

    def _addCredential(self):
        """Make a new wallet holding self.assertion, registered against the
        SITEA_ATTRIBUTEAUTHORITY_SAML_URI class variable (not defined in this
        class - presumably inherited from BaseTestCase)

        @return: the new wallet
        """
        wallet = SAMLCredentialWallet()
        wallet.addCredential(
            self.assertion,
            attributeAuthorityURI=\
                SAMLCredentialWalletTestCase.SITEA_ATTRIBUTEAUTHORITY_SAML_URI)
        return wallet

    def test01AddCredential(self):
        """Check that an added credential can be looked up both by issuer
        name and by Attribute Authority URI
        """
        wallet = self._addCredential()

        self.assertEqual(len(wallet.credentials), 1)
        self.assertTrue(
            SAMLCredentialWalletTestCase.SITEA_ATTRIBUTEAUTHORITY_SAML_URI in
            wallet.credentialsKeyedByURI)
        self.assertTrue(SAMLCredentialWalletTestCase.SITEA_SAML_ISSUER_NAME in
                        wallet.credentials)

        assertion = wallet.credentials[
            SAMLCredentialWalletTestCase.SITEA_SAML_ISSUER_NAME
        ].credential

        print("SAML Assertion:\n%s" %
              prettyPrint(AssertionElementTree.toXML(assertion)))

    def test02VerifyCredential(self):
        """Check that a current assertion is valid and that ones issued 24
        hours in the past or in the future are not
        """
        wallet = SAMLCredentialWallet()
        self.assertTrue(wallet.isValidCredential(self.assertion))

        # Issued 24 hours ago with the default 8 hour validity
        expiredAssertion = self._createAssertion(
                                timeNow=datetime.utcnow() - timedelta(hours=24))

        self.assertFalse(wallet.isValidCredential(expiredAssertion))

        # NotBefore time is 24 hours from now
        futureAssertion = self._createAssertion(
                                timeNow=datetime.utcnow() + timedelta(hours=24))

        self.assertFalse(wallet.isValidCredential(futureAssertion))

    def test03AuditCredential(self):
        """Check that audit() removes an expired credential"""
        # Add a short lived credential and ensure it's removed when an audit
        # is carried out to prune expired credentials
        shortExpiryAssertion = self._createAssertion(validityDuration=1)
        wallet = SAMLCredentialWallet()
        wallet.addCredential(shortExpiryAssertion)

        self.assertEqual(len(wallet.credentials), 1)

        # Wait until the one second validity period has passed
        sleep(2)
        wallet.audit()
        self.assertEqual(len(wallet.credentials), 0)

    def test04ClockSkewTolerance(self):
        """Check that audit() keeps an expired credential when the expiry is
        within the wallet's clock skew tolerance
        """
        # Add a short lived credential but with the wallet set to allow for
        # a clock skew much longer than the credential's lifetime
        shortExpiryAssertion = self._createAssertion(validityDuration=1)
        wallet = SAMLCredentialWallet()

        # 5.*60*60 = 18000 - five hours, assuming the tolerance is given in
        # seconds as test08CreateFromConfig suggests.
        # NOTE(review): the comment here used to say five seconds - confirm
        # which value was intended.  Either one exceeds the two second wait
        # below
        wallet.clockSkewTolerance = 5.*60*60
        wallet.addCredential(shortExpiryAssertion)

        self.assertEqual(len(wallet.credentials), 1)
        sleep(2)
        wallet.audit()
        self.assertEqual(len(wallet.credentials), 1)

    def test05ReplaceCredential(self):
        """Check that a new credential from the same issuer replaces the
        existing one instead of being added alongside it
        """
        # Replace an existing credential from a given institution with a more
        # up to date one
        wallet = self._addCredential()
        self.assertEqual(len(wallet.credentials), 1)

        newAssertion = self._createAssertion()

        wallet.addCredential(newAssertion)
        self.assertEqual(len(wallet.credentials), 1)
        self.assertEqual(
            newAssertion.conditions.notOnOrAfter,
            wallet.credentials[
                SAMLCredentialWalletTestCase.SITEA_SAML_ISSUER_NAME
            ].credential.conditions.notOnOrAfter)

    def test06CredentialsFromSeparateSites(self):
        """Check that credentials from different issuers are held
        separately
        """
        wallet = self._addCredential()
        wallet.addCredential(self._createAssertion(issuerName="MySite"))
        self.assertEqual(len(wallet.credentials), 2)

    def test07Pickle(self):
        """Pickle a wallet to PICKLE_FILEPATH, load it back and check that
        its credential is preserved
        """
        wallet = self._addCredential()

        # Binary mode avoids newline translation of the pickle stream, and
        # try/finally makes sure both files are closed even on error
        outFile = open(SAMLCredentialWalletTestCase.PICKLE_FILEPATH, 'wb')
        try:
            pickle.dump(wallet, outFile)
        finally:
            outFile.close()

        inFile = open(SAMLCredentialWalletTestCase.PICKLE_FILEPATH, 'rb')
        try:
            unpickledWallet = pickle.load(inFile)
        finally:
            inFile.close()

        self.assertTrue(unpickledWallet.credentialsKeyedByURI.get(
            SAMLCredentialWalletTestCase.SITEA_ATTRIBUTEAUTHORITY_SAML_URI))

        # list() so that indexing also works where items() returns a view
        # rather than a list
        firstCredential = list(unpickledWallet.credentials.items())[0][1]
        self.assertEqual(firstCredential.issuerName,
                         BaseTestCase.SITEA_SAML_ISSUER_NAME)

    def test08CreateFromConfig(self):
        """Check the settings of a wallet created from the CONFIG_FILEPATH
        configuration file
        """
        wallet = SAMLCredentialWallet.fromConfig(
                                SAMLCredentialWalletTestCase.CONFIG_FILEPATH)
        self.assertEqual(wallet.clockSkewTolerance, timedelta(seconds=0.01))
        self.assertEqual(wallet.userId,
                         'https://openid.localhost/philip.kershaw')
377       
if __name__ == "__main__":
    # Run all the test cases in this module when executed as a script
    unittest.main()
Note: See TracBrowser for help on using the repository browser.