source: TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py @ 7517

Subversion URL: http://proj.badc.rl.ac.uk/svn/ndg-security/TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/__init__.py@7698
Revision 7517, 11.9 KB checked in by pjkersha, 10 years ago (diff)

2.0.0 release for NDG Security

  • Fixed bug with incorrect SAML X.509 Subject Name urn in test ini files.
  • All unit tests and integration tests pass
  • Property svn:keywords set to Id
Line 
1"""NDG Security unit test package
2
3NERC DataGrid Project
4"""
5__author__ = "P J Kershaw"
6__date__ = "14/05/09"
7__copyright__ = "(C) 2009 Science and Technology Facilities Council"
8__license__ = "BSD - see LICENSE file in top-level directory"
9__contact__ = "Philip.Kershaw@stfc.ac.uk"
10__revision__ = '$Id$'
11
12import unittest
13import logging
14import socket
15
16logging.basicConfig()
17log = logging.getLogger(__name__)
18
19import os
20from os.path import expandvars, join, dirname, abspath
21
22try:
23    from hashlib import md5
24except ImportError:
25    # Allow for < Python 2.5
26    from md5 import md5
27
28
29TEST_CONFIG_DIR = join(abspath(dirname(dirname(__file__))), 'config')
30
31mkDataDirPath = lambda file:join(TEST_CONFIG_DIR, file)
32
33from ndg.security.common.X509 import X500DN
34from ndg.security.test.unit.wsgi import PasteDeployAppServer
35
36try:
37    from sqlalchemy import (create_engine, MetaData, Table, Column, Integer, 
38                            String)
39    from sqlalchemy.ext.declarative import declarative_base
40    from sqlalchemy.orm import sessionmaker
41   
42    sqlAlchemyInstalled = True
43except ImportError:
44    sqlAlchemyInstalled = False
45   
46   
47class BaseTestCase(unittest.TestCase):
48    '''Convenience base class from which other unit tests can extend.  Its
49    sets the generic data directory path'''
50    configDirEnvVarName = 'NDGSEC_TEST_CONFIG_DIR'
51   
52    AUTHORISATION_SERVICE_PORTNUM = 9443
53    AUTHORISATION_SERVICE_URI = 'https://localhost:%s/authorisation-service' % \
54                                AUTHORISATION_SERVICE_PORTNUM
55    AUTHORISATION_SERVICE_INI_FILEPATH = mkDataDirPath(
56            os.path.join('authorisationservice', 'authorisation-service.ini'))
57                         
58    SITEA_ATTRIBUTEAUTHORITY_PORTNUM = 5000
59    SITEB_ATTRIBUTEAUTHORITY_PORTNUM = 5100
60   
61    SITEA_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
62                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
63                                   
64    SITEB_ATTRIBUTEAUTHORITY_URI = 'http://localhost:%s/AttributeAuthority' % \
65                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
66                                   
67    SITEA_ATTRIBUTEAUTHORITY_SAML_URI = \
68        'http://localhost:%s/AttributeAuthority/saml' % \
69                                    SITEA_ATTRIBUTEAUTHORITY_PORTNUM
70                                   
71    SITEB_ATTRIBUTEAUTHORITY_SAML_URI = \
72        'http://localhost:%s/AttributeAuthority/saml' % \
73                                    SITEB_ATTRIBUTEAUTHORITY_PORTNUM
74                                   
75    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM = 5443
76    SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI = \
77        'https://localhost:%d/AttributeAuthority' % \
78                                    SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM
79    SSL_CERT_DN = "/O=NDG/OU=Security/CN=localhost"
80                                   
81    SITEA_SAML_ISSUER_NAME = "/O=Site A/CN=Attribute Authority"
82   
83    NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR = \
84        'NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES'
85   
86    _disableServiceStartup = lambda self: bool(os.environ.get(
87        BaseTestCase.NDGSEC_UNITTESTS_DISABLE_THREAD_SERVICES_ENVVAR))
88   
89    disableServiceStartup = property(fget=_disableServiceStartup,
90                                     doc="Stop automated start-up of services "
91                                         "for unit tests")
92   
93    NDGSEC_TEST_CONFIG_DIR = os.environ.get(configDirEnvVarName, 
94                                            TEST_CONFIG_DIR)
95   
96    CACERT_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'ca')
97    PKI_DIR = os.path.join(NDGSEC_TEST_CONFIG_DIR, 'pki')
98    SSL_CERT_FILEPATH = os.path.join(PKI_DIR, 'localhost.crt')
99    SSL_PRIKEY_FILEPATH = os.path.join(PKI_DIR, 'localhost.key')
100   
101    # Test database set-up
102    DB_FILENAME = 'user.db'
103    DB_FILEPATH = join(NDGSEC_TEST_CONFIG_DIR, DB_FILENAME)
104    DB_CONNECTION_STR = 'sqlite:///%s' % DB_FILEPATH
105   
106    USERNAME = 'pjk'
107    PASSWORD = 'testpassword'
108    MD5_PASSWORD = md5(PASSWORD).hexdigest()
109   
110    OPENID_URI_STEM = 'https://localhost:7443/openid/'
111    OPENID_IDENTIFIER = 'philip.kershaw'
112    OPENID_URI = OPENID_URI_STEM + OPENID_IDENTIFIER
113   
114    FIRSTNAME = 'Philip'
115    LASTNAME = 'Kershaw'
116    EMAILADDRESS = 'pjk@somewhere.ac.uk'
117   
118    ATTRIBUTE_NAMES = (
119        "urn:siteA:security:authz:1.0:attr",
120    )
121
122    ATTRIBUTE_VALUES = (
123        'postdoc',
124        'staff', 
125        'undergrad', 
126        'coapec',
127        'rapid',
128        'admin'
129    )
130    N_ATTRIBUTE_VALUES = len(ATTRIBUTE_VALUES)
131   
132    VALID_REQUESTOR_IDS = (
133        X500DN.fromString("/O=Site A/CN=Authorisation Service"), 
134        X500DN.fromString("/O=Site B/CN=Authorisation Service"),
135        X500DN.fromString('/CN=test/O=NDG/OU=BADC'),
136        X500DN.fromString('/O=NDG/OU=Security/CN=localhost')
137    )
138   
139    SSL_PEM_FILENAME = 'localhost.pem'
140    SSL_PEM_FILEPATH = mkDataDirPath(os.path.join('pki', SSL_PEM_FILENAME))
141   
142    def __init__(self, *arg, **kw):
143        if BaseTestCase.configDirEnvVarName not in os.environ:
144            os.environ[BaseTestCase.configDirEnvVarName] = TEST_CONFIG_DIR
145               
146        unittest.TestCase.__init__(self, *arg, **kw)
147        self.services = []
148       
149    def addService(self, *arg, **kw):
150        """Utility for setting up threads to run Paste HTTP based services with
151        unit tests
152       
153        @param cfgFilePath: ini file containing configuration for the service
154        @type cfgFilePath: basestring
155        @param port: port number to run the service from
156        @type port: int
157        """
158        if self.disableServiceStartup:
159            return
160       
161        withSSL = kw.pop('withSSL', False)
162        if withSSL:
163            from OpenSSL import SSL
164           
165            certFilePath = mkDataDirPath(os.path.join('pki', 'localhost.crt'))
166            priKeyFilePath = mkDataDirPath(os.path.join('pki', 'localhost.key'))
167           
168            kw['ssl_context'] = SSL.Context(SSL.SSLv23_METHOD)
169            kw['ssl_context'].set_options(SSL.OP_NO_SSLv2)
170       
171            kw['ssl_context'].use_privatekey_file(priKeyFilePath)
172            kw['ssl_context'].use_certificate_file(certFilePath)
173           
174        try:
175            self.services.append(PasteDeployAppServer(*arg, **kw))
176            self.services[-1].startThread()
177           
178        except socket.error:
179            pass
180
181    def startAttributeAuthorities(self, withSSL=False, port=None):
182        """Serve test Attribute Authorities to test against"""
183        self.startSiteAAttributeAuthority(withSSL=withSSL, port=port)
184        self.startSiteBAttributeAuthority(withSSL=withSSL, port=port)
185       
186    def startSiteAAttributeAuthority(self, withSSL=False, port=None):
187        siteACfgFilePath = mkDataDirPath(join('attributeauthority', 
188                                              'sitea', 
189                                              'site-a.ini'))
190        self.addService(cfgFilePath=siteACfgFilePath, 
191                        port=(port or 
192                              BaseTestCase.SITEA_ATTRIBUTEAUTHORITY_PORTNUM),
193                        withSSL=withSSL)
194       
195    def startSiteBAttributeAuthority(self, withSSL=False, port=None):
196        siteBCfgFilePath = mkDataDirPath(join('attributeauthority',
197                                              'siteb', 
198                                              'site-b.ini'))
199        self.addService(cfgFilePath=siteBCfgFilePath, 
200                        port=(port or 
201                              BaseTestCase.SITEB_ATTRIBUTEAUTHORITY_PORTNUM),
202                        withSSL=withSSL)
203       
204    def startAuthorisationService(self, 
205                                  withSSL=True, 
206                                  port=AUTHORISATION_SERVICE_PORTNUM):
207        self.addService(
208            cfgFilePath=self.__class__.AUTHORISATION_SERVICE_INI_FILEPATH, 
209            port=port,
210            withSSL=withSSL)
211       
212    def __del__(self):
213        self.stopAllServices()
214       
215    def stopAllServices(self):
216        """Stop any services started with the addService method"""
217        if hasattr(self, 'services'):
218            for service in self.services:
219                service.terminateThread()
220 
221    @classmethod
222    def initDb(cls):
223        """Wrapper to _createDb - Create database only if it doesn't already
224        exist"""
225        if not os.path.isfile(cls.DB_FILEPATH):
226            cls._createDb()
227       
228    @classmethod 
229    def _createDb(cls):
230        """Create a test SQLite database with SQLAlchemy for use with unit
231        tests
232        """
233        log.debug("Creating database for %r ..." % cls.__name__)
234       
235        if not sqlAlchemyInstalled:
236            raise NotImplementedError("SQLAlchemy must be installed in order "
237                                      "for this method to be implemented")
238           
239        db = create_engine(cls.DB_CONNECTION_STR)
240       
241        metadata = MetaData()
242        usersTable = Table('users', metadata,
243                           Column('id', Integer, primary_key=True),
244                           Column('username', String),
245                           Column('md5password', String),
246                           Column('openid', String),
247                           Column('openid_identifier', String),
248                           Column('firstname', String),
249                           Column('lastname', String),
250                           Column('emailaddress', String))
251       
252        attributesTable = Table('attributes', metadata,
253                                Column('id', Integer, primary_key=True),
254                                Column('openid', String),
255                                Column('attributename', String))
256        metadata.create_all(db)
257       
258        class User(declarative_base()):
259            __tablename__ = 'users'
260       
261            id = Column(Integer, primary_key=True)
262            username = Column('username', String(40))
263            md5password = Column('md5password', String(64))
264            openid = Column('openid', String(128))
265            openid_identifier = Column('openid_identifier', String(40))
266            firstname = Column('firstname', String(40))
267            lastname = Column('lastname', String(40))
268            emailAddress = Column('emailaddress', String(40))
269       
270            def __init__(self, username, md5password, openid, openid_identifier, 
271                         firstname, lastname, emailaddress):
272                self.username = username
273                self.md5password = md5password
274                self.openid = openid
275                self.openid_identifier = openid_identifier
276                self.firstname = firstname
277                self.lastname = lastname
278                self.emailAddress = emailaddress
279       
280        class Attribute(declarative_base()):
281            __tablename__ = 'attributes'
282       
283            id = Column(Integer, primary_key=True)
284            openid = Column('openid', String(128))
285            attributename = Column('attributename', String(40))
286       
287            def __init__(self, openid, attributename):
288                self.openid = openid
289                self.attributename = attributename
290
291        Session = sessionmaker(bind=db)
292        session = Session()
293       
294        attributes = [Attribute(cls.OPENID_URI, attrVal)
295                      for attrVal in cls.ATTRIBUTE_VALUES]
296        session.add_all(attributes)
297           
298        user = User(cls.USERNAME, 
299                    cls.MD5_PASSWORD,
300                    cls.OPENID_URI,
301                    cls.OPENID_IDENTIFIER,
302                    cls.FIRSTNAME,
303                    cls.LASTNAME,
304                    cls.EMAILADDRESS)
305       
306        session.add(user)
307        session.commit() 
308
309
310def _getParentDir(depth=0, path=dirname(__file__)):
311    """
312    @type path: basestring
313    @param path: directory path from which to get parent directory, defaults
314    to dir of this module
315    @rtype: basestring
316    @return: parent directory at depth levels up from the current path
317    """
318    for i in range(depth):
319        path = dirname(path)
320    return path
321
322
Note: See TracBrowser for help on using the repository browser.