source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py @ 7327

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py@7327
Revision 7327, 12.0 KB checked in by pjkersha, 11 years ago (diff)

Incomplete - task 2: XACML-Security Integration

  • added unit tests for XACML Context handler
  • Property svn:keywords set to Id
Line 
1"""NDG Security unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "14/05/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11
12import unittest
13import logging
14import socket
15logging.basicConfig()
16log = logging.getLogger(__name__)
17
18import os
19from os.path import expandvars, join, dirname, abspath
20
21try:
22    from hashlib import md5
23except ImportError:
24    # Allow for < Python 2.5
25    from md5 import md5
26
27
28TEST_CONFIG_DIR = join(abspath(dirname(dirname(__file__))), 'config')
29
30mkDataDirPath = lambda file:join(TEST_CONFIG_DIR, file)
31
32from ndg.security.common.X509 import X500DN
33from ndg.security.test.unit.wsgi import PasteDeployAppServer
34
35try:
36    from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, 
37                            String)
38    from sqlalchemy.ext.declarative import declarative_base
39    from sqlalchemy.orm import sessionmaker
40   
41    sqlAlchemyInstalled = True
42except ImportError:
43    sqlAlchemyInstalled = False
44   
45   
46class BaseTestCase(unittest.TestCase):
47    '''Convenience base class from which other unit tests can extend.  Its
48    sets the generic data directory path'''
49    configDirEnvVarName = 'NDGSEC_TEST_CONFIG_DIR'
50   
51    AUTHORISATION_SERVICE_PORTNUM = 9443
52    AUTHORISATION_SERVICE_URI = 'https://localhost:%s/authorisation-service' % \
53                                AUTHORISATION_SERVICE_PORTNUM
54    AUTHORISATION_SERVICE_INI_FILEPATH = mkDataDirPath(
55            os.path.join('authorisationservice', 'authorisation-service.ini'))
56                         
57    SITEA_ATTRIBUTEAUTHORITY_PORTNUM = 5000
58    SITEB_ATTRIBUTEAUTHORITY_PORTNUM = 5100
59   
60    SITEA_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
61                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
62                                   
63    SITEB_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
64                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
65                                   
66    SITEA_ATTRIBUTEAUTHORITY_SAML_URI = \
67        'http://localhost:%s/AttributeAuthority/saml' % \
68                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
69                                   
70    SITEB_ATTRIBUTEAUTHORITY_SAML_URI = \
71        'http://localhost:%s/AttributeAuthority/saml' % \
72                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
73                                   
74    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM = 5443
75    SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI = \
76        'https://localhost:%d/AttributeAuthority' % \
77                                    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM
78    SSL_CERT_DN = "/C=UK/ST=Oxfordshire/O=BADC/OU=Security/CN=localhost"
79                                   
80    SITEA_SAML_ISSUER_NAME = "/O=Site A/CN=Attribute Authority"
81   
82    NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR = \
83        'NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES'
84   
85    _disableServiceStartup = lambda self: bool(os.environ.get(
86        BaseTestCase.NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR))
87   
88    disableServiceStartup = property(fget=_disableServiceStartup,
89                                     doc="Stop automated start-up of services "
90                                         "for unit tests")
91   
92    NDGSEC_TEST_CONFIG_DIR = os.environ.get(configDirEnvVarName, 
93                                            TEST_CONFIG_DIR)
94   
95    CACERT_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'ca')
96    PKI_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'pki')
97    SSL_CERT_FILEPATH = os.path.join(PKI_DIR, 'localhost.crt')
98    SSL_PRIKEY_FILEPATH = os.path.join(PKI_DIR, 'localhost.key')
99   
100    # Test database set-up
101    DB_FILENAME = 'user.db'
102    DB_FILEPATH = join(NDGSEC_TEST_CONFIG_DIR, DB_FILENAME)
103    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILEPATH
104   
105    USERNAME = 'pjk'
106    PASSWORD = 'testpassword'
107    MD5_PASSWORD = md5(PASSWORD).hexdigest()
108   
109    OPENID_URI_STEM = 'https://localhost:7443/openid/'
110    OPENID_IDENTIFIER = 'philip.kershaw'
111    OPENID_URI = OPENID_URI_STEM + OPENID_IDENTIFIER
112   
113    FIRSTNAME = 'Philip'
114    LASTNAME = 'Kershaw'
115    EMAILADDRESS = 'pjk@somewhere.ac.uk'
116   
117    ATTRIBUTE_NAMES = (
118        "urn:siteA:security:authz:1.0:attr",
119    )
120
121    ATTRIBUTE_VALUES = (
122        'urn:siteA:security:authz:1.0:attr:postdoc',
123        'urn:siteA:security:authz:1.0:attr:staff', 
124        'urn:siteA:security:authz:1.0:attr:undergrad', 
125        'urn:siteA:security:authz:1.0:attr:coapec',
126        'urn:siteA:security:authz:1.0:attr:rapid'
127    )
128    N_ATTRIBUTE_VALUES = len(ATTRIBUTE_VALUES)
129   
130    VALID_REQUESTOR_IDS = (
131        X500DN.fromString("/O=Site A/CN=Authorisation Service"), 
132        X500DN.fromString("/O=Site B/CN=Authorisation Service"),
133        X500DN.fromString('/CN=test/O=NDG/OU=BADC'),
134        X500DN.fromString('/O=NDG/OU=Security/CN=localhost')
135    )
136   
137    SSL_PEM_FILENAME = 'localhost.pem'
138    SSL_PEM_FILEPATH = mkDataDirPath(os.path.join('pki', SSL_PEM_FILENAME))
139   
140    def __init__(self, *arg, **kw):
141        if BaseTestCase.configDirEnvVarName not in os.environ:
142            os.environ[BaseTestCase.configDirEnvVarName] = TEST_CONFIG_DIR
143               
144        unittest.TestCase.__init__(self, *arg, **kw)
145        self.services = []
146       
147    def addService(self, *arg, **kw):
148        """Utility for setting up threads to run Paste HTTP based services with
149        unit tests
150       
151        @param cfgFilePath: ini file containing configuration for the service
152        @type cfgFilePath: basestring
153        @param port: port number to run the service from
154        @type port: int
155        """
156        if self.disableServiceStartup:
157            return
158       
159        withSSL = kw.pop('withSSL', False)
160        if withSSL:
161            from OpenSSL import SSL
162           
163            certFilePath = mkDataDirPath(os.path.join('pki', 'localhost.crt'))
164            priKeyFilePath = mkDataDirPath(os.path.join('pki', 'localhost.key'))
165           
166            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
167            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)
168       
169            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
170            kw['ssl_context'].use_certificate_file(certFilePath)
171           
172        try:
173            self.services.append(PasteDeployAppServer(*arg, **kw))
174            self.services[-1].startThread()
175           
176        except socket.error:
177            pass
178
179    def startAttributeAuthorities(self, withSSL=False, port=None):
180        """Serve test Attribute Authorities to test against"""
181        self.startSiteAAttributeAuthority(withSSL=withSSL, port=port)
182        self.startSiteBAttributeAuthority(withSSL=withSSL, port=port)
183       
184    def startSiteAAttributeAuthority(self, withSSL=False, port=None):
185        siteACfgFilePath = mkDataDirPath(join('attributeauthority', 
186                                              'sitea', 
187                                              'site-a.ini'))
188        self.addService(cfgFilePath=siteACfgFilePath, 
189                        port=(port or 
190                              BaseTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM),
191                        withSSL=withSSL)
192       
193    def startSiteBAttributeAuthority(self, withSSL=False, port=None):
194        siteBCfgFilePath = mkDataDirPath(join('attributeauthority',
195                                              'siteb', 
196                                              'site-b.ini'))
197        self.addService(cfgFilePath=siteBCfgFilePath, 
198                        port=(port or 
199                              BaseTestCase.SITEB_ATTRIBUTEAUTHORITY_PORTNUM),
200                        withSSL=withSSL)
201       
202    def startAuthorisationService(self, 
203                                  withSSL=True, 
204                                  port=AUTHORISATION_SERVICE_PORTNUM):
205        self.addService(
206            cfgFilePath=self.__class__.AUTHORISATION_SERVICE_INI_FILEPATH, 
207            port=port,
208            withSSL=withSSL)
209       
210    def __del__(self):
211        """Stop any services started with the addService method"""
212        if hasattr(self, 'services'):
213            for service in self.services:
214                service.terminateThread()
215 
216    @classmethod
217    def initDb(cls):
218        """Wrapper to _createDb - Create database only if it doesn't already
219        exist"""
220        if not os.path.isfile(cls.DB_FILEPATH):
221            cls._createDb()
222       
223    @classmethod 
224    def _createDb(cls):
225        """Create a test SQLite database with SQLAlchemy for use with unit
226        tests
227        """
228        log.debug("Creating database for %r ..." % cls.__name__)
229       
230        if not sqlAlchemyInstalled:
231            raise NotImplementedError("SQLAlchemy must be installed in order "
232                                      "for this method to be implemented")
233           
234        db = create_engine(cls.DB_CONNECTION_STR)
235       
236        metadata = MetaData()
237        usersTable = Table('users', metadata,
238                           Column('id', Integer, primary_key=True),
239                           Column('username', String),
240                           Column('md5password', String),
241                           Column('openid', String),
242                           Column('openid_identifier', String),
243                           Column('firstname', String),
244                           Column('lastname', String),
245                           Column('emailaddress', String))
246       
247        attributesTable = Table('attributes', metadata,
248                                Column('id', Integer, primary_key=True),
249                                Column('openid', String),
250                                Column('attributename', String))
251        metadata.create_all(db)
252       
253        class User(declarative_base()):
254            __tablename__ = 'users'
255       
256            id = Column(Integer, primary_key=True)
257            username = Column('username', String(40))
258            md5password = Column('md5password', String(64))
259            openid = Column('openid', String(128))
260            openid_identifier = Column('openid_identifier', String(40))
261            firstname = Column('firstname', String(40))
262            lastname = Column('lastname', String(40))
263            emailAddress = Column('emailaddress', String(40))
264       
265            def __init__(self, username, md5password, openid, openid_identifier, 
266                         firstname, lastname, emailaddress):
267                self.username = username
268                self.md5password = md5password
269                self.openid = openid
270                self.openid_identifier = openid_identifier
271                self.firstname = firstname
272                self.lastname = lastname
273                self.emailAddress = emailaddress
274       
275        class Attribute(declarative_base()):
276            __tablename__ = 'attributes'
277       
278            id = Column(Integer, primary_key=True)
279            openid = Column('openid', String(128))
280            attributename = Column('attributename', String(40))
281       
282            def __init__(self, openid, attributename):
283                self.openid = openid
284                self.attributename = attributename
285
286        Session = sessionmaker(bind=db)
287        session = Session()
288       
289        attributes = [Attribute(cls.OPENID_URI, attrVal)
290                      for attrVal in cls.ATTRIBUTE_VALUES]
291        session.add_all(attributes)
292           
293        user = User(cls.USERNAME, 
294                    cls.MD5_PASSWORD,
295                    cls.OPENID_URI,
296                    cls.OPENID_IDENTIFIER,
297                    cls.FIRSTNAME,
298                    cls.LASTNAME,
299                    cls.EMAILADDRESS)
300       
301        session.add(user)
302        session.commit() 
303
304
305def _getParentDir(depth=0, path=dirname(__file__)):
306    """
307    @type path: basestring
308    @param path: directory path from which to get parent directory, defaults
309    to dir of this module
310    @rtype: basestring
311    @return: parent directory at depth levels up from the current path
312    """
313    for i in range(depth):
314        path = dirname(path)
315    return path
316
317
Note: See TracBrowser for help on using the repository browser.