1 | #!/usr/bin/env python |
---|
2 | """Unit tests for NDG Security MyProxy Extensions callout for adding SAML |
---|
3 | Attribute Assertions to issued X.509 Certificates |
---|
4 | |
---|
5 | NERC DataGrid Project |
---|
6 | """ |
---|
7 | __author__ = "P J Kershaw" |
---|
8 | __date__ = "29/10/09" |
---|
9 | __copyright__ = "(C) 2009 Science and Technology Facilities Council" |
---|
10 | __license__ = "BSD - see LICENSE file in top-level directory" |
---|
11 | __contact__ = "Philip.Kershaw@stfc.ac.uk" |
---|
12 | __revision__ = '$Id$' |
---|
13 | import logging |
---|
# Root logging is configured here, ahead of the ndg.security imports below, so
# DEBUG records emitted by the code under test are shown while the suite runs.
# NOTE(review): DEBUG output from security components may include assertion or
# query contents; that suits a test module but is not a level to carry over
# into a deployed MyProxy callout without checking what gets logged.
logging.basicConfig(
    level=logging.DEBUG,
)
15 | |
---|
16 | import os |
---|
17 | import sys |
---|
18 | from cStringIO import StringIO |
---|
19 | import unittest |
---|
20 | |
---|
21 | from ndg.security.common.saml_utils.esgf import ESGFSamlNamespaces |
---|
22 | from ndg.security.test.unit import BaseTestCase |
---|
23 | from ndg.security.server.myproxy.certificate_extapp.saml_attribute_assertion \ |
---|
24 | import CertExtApp, CertExtConsoleApp |
---|
25 | |
---|
26 | |
---|
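class CertExtAppTestCase(BaseTestCase):
    """Test the SAML Assertion certificate extension plugin for MyProxy.

    The tests drive ``CertExtApp`` (and its console wrapper
    ``CertExtConsoleApp``) against a Site A Attribute Authority started over
    SSL and a test user database, both set up in ``__init__``.  Connection
    strings, usernames, PKI paths and service URIs are read as class
    attributes that are not defined here, so they resolve through
    ``BaseTestCase``.
    """
    THIS_DIR = os.path.dirname(os.path.abspath(__file__))

    # Query template handed to CertExtApp.openIdSqlQuery.  The ${username}
    # placeholder sits inside a quoted SQL literal.
    # NOTE(review): this looks like textual substitution rather than a bound
    # parameter.  The username reaches the callout from a MyProxy client, so
    # confirm that CertExtApp.queryOpenId validates or escapes it (or binds it
    # as a parameter) before the statement is executed.
    OPENID_SQL_QUERY = ("select openid from users where username = "
                        "'${username}'")
    INI_FILEPATH = os.path.join(THIS_DIR, 'config.ini')

    def __init__(self, *arg, **kw):
        """Start the Site A Attribute Authority over SSL and initialise the
        test database.

        unittest creates one instance of the class per test method, so this
        set-up is executed for every test rather than once for the suite.
        """
        super(CertExtAppTestCase, self).__init__(*arg, **kw)
        self.startSiteAAttributeAuthority(withSSL=True,
            port=CertExtAppTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM)
        self.initDb()

    def test01DbQuery(self):
        """Look up the OpenID for the test username via the SQL template."""
        myProxyCertExtApp = CertExtApp()
        myProxyCertExtApp.connectionString = \
            CertExtAppTestCase.DB_CONNECTION_STR

        myProxyCertExtApp.openIdSqlQuery = CertExtAppTestCase.OPENID_SQL_QUERY

        openid = myProxyCertExtApp.queryOpenId(CertExtAppTestCase.USERNAME)

        # assertEqual reports both values on failure, unlike a bare truth test
        self.assertEqual(openid, CertExtAppTestCase.OPENID_URI)

    def test02AttributeQuery(self):
        """Send a SAML attribute query for the test OpenID over SSL."""
        myProxyCertExtApp = CertExtApp()
        myProxyCertExtApp.attributeQuery.issuerName = \
            "/CN=Authorisation Service/O=Site A"
        myProxyCertExtApp.attributeQuery.subjectIdFormat = \
            ESGFSamlNamespaces.NAMEID_FORMAT
        myProxyCertExtApp.attributeQuery.subjectID = \
            CertExtAppTestCase.OPENID_URI

        # Client-side SSL settings: CA directory, client certificate and
        # private key from the test PKI directory.
        # NOTE(review): sslValidDNs is presumably the list of peer certificate
        # DNs the client will accept - confirm against the binding class.
        myProxyCertExtApp.attributeQuery.sslCACertDir = \
            CertExtAppTestCase.CACERT_DIR
        myProxyCertExtApp.attributeQuery.sslCertFilePath = \
            os.path.join(CertExtAppTestCase.PKI_DIR, 'test.crt')
        myProxyCertExtApp.attributeQuery.sslPriKeyFilePath = \
            os.path.join(CertExtAppTestCase.PKI_DIR, 'test.key')
        myProxyCertExtApp.attributeQuery.sslValidDNs = \
            CertExtAppTestCase.SSL_CERT_DN

        response = myProxyCertExtApp.attributeQuery.send(
            uri=CertExtAppTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI)

        # NOTE(review): nothing is asserted about the response, so this test
        # only fails if send() raises.  A check on the response status or on
        # the returned attributes would make it meaningful.
        print(response)

    def test03End2End(self):
        """Run the full callout: database lookup, attribute query and
        assertion creation for the test username."""
        myProxyCertExtApp = CertExtApp()

        myProxyCertExtApp.connectionString = \
            CertExtAppTestCase.DB_CONNECTION_STR

        # The username is interpolated into the statement here, leaving no
        # ${username} placeholder in the query.  It is a fixed test constant
        # in this suite.
        # NOTE(review): this is not a pattern for configuring a deployment,
        # where the username is client-supplied.
        myProxyCertExtApp.openIdSqlQuery = ("select openid from users where "
                                            "username = '%s'" %
                                            CertExtAppTestCase.USERNAME)

        myProxyCertExtApp.attributeAuthorityURI = \
            CertExtAppTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_SAML_URI
        myProxyCertExtApp.attributeQuery.issuerName = \
            "/CN=Authorisation Service/O=Site A"

        myProxyCertExtApp.attributeQuery.subjectIdFormat = \
            ESGFSamlNamespaces.NAMEID_FORMAT
        myProxyCertExtApp.attributeQuery.sslCACertDir = \
            CertExtAppTestCase.CACERT_DIR
        myProxyCertExtApp.attributeQuery.sslCertFilePath = \
            os.path.join(CertExtAppTestCase.PKI_DIR, 'test.crt')
        myProxyCertExtApp.attributeQuery.sslPriKeyFilePath = \
            os.path.join(CertExtAppTestCase.PKI_DIR, 'test.key')
        myProxyCertExtApp.attributeQuery.sslValidDNs = \
            CertExtAppTestCase.SSL_CERT_DN

        assertion = myProxyCertExtApp(CertExtAppTestCase.USERNAME)
        self.assertTrue(assertion)
        print(assertion)

    def test04FromConfigFile(self):
        """Build the application from config.ini and request an assertion."""
        myProxyCertExtApp = CertExtApp.fromConfigFile(
            CertExtAppTestCase.INI_FILEPATH)
        assertion = myProxyCertExtApp(CertExtAppTestCase.USERNAME)
        self.assertTrue(assertion)
        print(assertion)

    def test05ConsoleApp(self):
        """Run the console entry point with -f/-u arguments and check that it
        writes something to standard output."""
        # Save both process-wide objects before replacing them so that they
        # are restored even if the console app raises.  An unrestored sys.argv
        # would leak into any test that runs afterwards in the same process.
        savedArgv = sys.argv
        savedStdOut = sys.stdout
        sys.argv = [
            None,
            "-f", CertExtAppTestCase.INI_FILEPATH,
            "-u", CertExtAppTestCase.USERNAME
        ]
        sys.stdout = StringIO()
        try:
            CertExtConsoleApp.run()
            output = sys.stdout.getvalue()
        finally:
            sys.stdout = savedStdOut
            sys.argv = savedArgv

        self.assertTrue(output)
        print(output)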
127 | |
---|
128 | |
---|
# Run the suite with unittest's command-line runner when this file is executed
# directly; importing the module does not start any tests.
if __name__ == "__main__":
    unittest.main()