source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/authz/xacml/test_saml_pip.py @ 7698

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/authz/xacml/test_saml_pip.py@7698
Revision 7698, 7.0 KB checked in by pjkersha, 10 years ago (diff)

Integrated SAML ESGF Group/Role? attribute value type into SAML Attribute Authority client unit tests.

Line 
1"""Unit tests for XACML Policy Information Point with SAML interface to
2Attribute Authority
3
4"""
5__author__ = "P J Kershaw"
6__date__ = "11/08/10"
7__copyright__ = "(C) 2010 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id:$'
11import logging
12logging.basicConfig(level=logging.DEBUG)
13log = logging.getLogger(__name__)
14
15from os import path
16import unittest
17
18from urllib2 import URLError
19
20from ndg.xacml.core.attributedesignator import SubjectAttributeDesignator
21from ndg.xacml.core.attribute import Attribute
22from ndg.xacml.core.attributevalue import AttributeValueClassFactory
23from ndg.xacml.core.context.request import Request
24from ndg.xacml.core.context.subject import Subject
25
26from ndg.saml.saml2.core import Issuer as SamlIssuer
27
28from ndg.security.test.unit import BaseTestCase
29from ndg.security.server.xacml.pip.saml_pip import PIP
30
31
32class SamlPipTestCase(BaseTestCase):
33    """Test XACML Policy Information Point.  This PIP has a SAML interface to
34    query a remote attribute authority for attributes
35    """
36    THIS_DIR = path.abspath(path.dirname(__file__))
37    MAPPING_FILENAME = "pip-mapping.txt"
38    MAPPING_FILEPATH = path.join(THIS_DIR, MAPPING_FILENAME)
39    CONFIG_FILENAME = 'saml_pip.cfg'
40    CONFIG_FILEPATH = path.join(THIS_DIR, CONFIG_FILENAME)
41   
42    NDGS_ATTR_ID = BaseTestCase.ATTRIBUTE_NAMES[0]
43    OPENID_ATTR_ID = 'urn:esg:openid'
44   
45    CLNT_CERT_FILEPATH = path.join(BaseTestCase.PKI_DIR, 'localhost.crt')
46    CLNT_PRIKEY_FILEPATH = path.join(BaseTestCase.PKI_DIR, 'localhost.key')
47                                   
48    attributeValueClassFactory = AttributeValueClassFactory()
49           
50    def test01CreateAndCheckAttributes(self):
51        pip = PIP()
52        self.assert_(pip)
53        self.assert_(pip.mappingFilePath is None)
54        try:
55            pip.attribute2AttributeAuthorityMap = {}
56            self.fail("pip.attribute2AttributeAuthorityMap should be read-only")
57        except AttributeError:
58            pass
59       
60        setattr(pip, 'sessionCacheDataDir', 'My data dir')
61        self.assert_(pip.sessionCacheDataDir == 'My data dir')
62        self.assert_(pip.sessionCacheTimeout is None)
63       
64        try:
65            pip.sessionCacheTimeout = {}
66            self.fail("pip.sessionCacheTimeout accepts only float/int/long/"
67                      "string or None type value")
68        except TypeError:
69            pass
70       
71        pip.sessionCacheTimeout = 86400L
72        self.assert_(pip.sessionCacheTimeout == 86400L)
73
74    def test02ReadMappingFile(self):
75        pip = PIP()
76        pip.mappingFilePath = self.__class__.MAPPING_FILEPATH
77        pip.readMappingFile()
78        self.assert_(len(pip.attribute2AttributeAuthorityMap.keys()) > 0)
79        self.assert_(self.__class__.NDGS_ATTR_ID in
80                     pip.attribute2AttributeAuthorityMap)
81        print(pip.attribute2AttributeAuthorityMap)
82       
83    @classmethod
84    def _createXacmlRequestCtx(cls):
85        """Helper to create a XACML request context"""
86        ctx = Request()
87       
88        ctx.subjects.append(Subject())
89        openidAttr = Attribute()
90        ctx.subjects[-1].attributes.append(openidAttr)
91        openidAttr.attributeId = cls.OPENID_ATTR_ID
92        openidAttr.dataType = 'http://www.w3.org/2001/XMLSchema#anyURI'
93       
94        anyUriAttrValue = cls.attributeValueClassFactory(openidAttr.dataType)
95       
96        openidAttrVal = anyUriAttrValue(cls.OPENID_URI)
97        openidAttr.attributeValues.append(openidAttrVal) 
98       
99        return ctx
100   
101    @classmethod
102    def _createPIP(cls):   
103        """Create PIP from test attribute settings"""             
104        pip = PIP()
105        pip.mappingFilePath = cls.MAPPING_FILEPATH
106        pip.readMappingFile()
107        pip.subjectAttributeId = cls.OPENID_ATTR_ID
108       
109        pip.attributeQueryBinding.issuerName = \
110                                            'O=NDG, OU=Security, CN=localhost'
111        pip.attributeQueryBinding.issuerFormat = SamlIssuer.X509_SUBJECT
112        pip.attributeQueryBinding.sslCertFilePath = cls.CLNT_CERT_FILEPATH
113        pip.attributeQueryBinding.sslPriKeyFilePath = cls.CLNT_PRIKEY_FILEPATH
114           
115        pip.attributeQueryBinding.sslCACertDir = cls.CACERT_DIR
116       
117        return pip
118
119    @classmethod
120    def _createSubjectAttributeDesignator(cls):
121        '''Make attribute designator - in practice this would be passed back
122        from the PDP via the context handler
123        '''
124        designator = SubjectAttributeDesignator()
125        designator.attributeId = cls.NDGS_ATTR_ID
126        designator.dataType = 'http://www.w3.org/2001/XMLSchema#string'
127       
128        stringAttrValue = cls.attributeValueClassFactory(
129                                    'http://www.w3.org/2001/XMLSchema#string')
130       
131        return designator
132   
133    @classmethod
134    def _initQuery(cls):
135        '''Convenience method to set-up the parameters needed for a query'''
136        pip = cls._createPIP()
137        designator = cls._createSubjectAttributeDesignator()
138        ctx = cls._createXacmlRequestCtx()
139        return pip, designator, ctx
140   
141    def test03Query(self):
142        self.startSiteAAttributeAuthority(withSSL=True, 
143                    port=self.__class__.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)
144       
145        pip, designator, ctx = self.__class__._initQuery()
146       
147        # Avoid caching to avoid impacting other tests in this class
148        pip.cacheSessions = False
149       
150        attributeValues = pip.attributeQuery(ctx, designator)
151        self.assert_(len(attributeValues) > 0)
152        print("PIP retrieved attribute values %r" % attributeValues)
153       
154        self.stopAllServices()
155       
156    def test04InitFromConfigFile(self):
157        # Initialise from settings in a config file
158        pip = PIP.fromConfig(self.__class__.CONFIG_FILEPATH)
159        self.assert_(pip.mappingFilePath)
160        self.assert_(pip.sessionCacheTimeout == 1800)
161       
162# TODO: fix test - left out for now because can't get threading to correctly
163# close down the Attribute Authority thread.
164#    def test05SessionCaching(self):
165#        self.startSiteAAttributeAuthority(withSSL=True,
166#                    port=self.__class__.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)
167#       
168#        pipA, designator, ctx = self._initQuery()
169#        attributeValuesA = pipA.attributeQuery(ctx, designator)
170#       
171#        pipB = self._createPIP()
172#        pipB.cacheSessions = False
173#       
174#        attributeValuesB = pipB.attributeQuery(ctx, designator)
175#       
176#        self.stopAllServices()
177#       
178#        attributeValuesA2 = pipA.attributeQuery(ctx, designator)
179#        self.assert_(len(attributeValuesA2) > 0)
180#       
181#        try:
182#            attributeValuesB2 = pipB.attributeQuery(ctx, designator)
183#            self.fail("Expected URLError exception for call with no-caching "
184#                      "set")
185#        except URLError, e:
186#            print("Pass: expected %r error for call with no-caching set" % e)
187       
188       
189       
190if __name__ == "__main__":
191    unittest.main()
Note: See TracBrowser for help on using the repository browser.