- Timestamp:
- 15/02/10 09:24:37 (11 years ago)
- Location:
- TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient
- Files:
-
- 2 added
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient
- Property svn:ignore
-
old new 1 1 *.pem 2 ac-clnt.xml 3 ac-clnt-test6.xml 4 mapped-ac.xml
-
- Property svn:ignore
-
TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/__init__.py
r4840 r6571 9 9 __contact__ = "Philip.Kershaw@stfc.ac.uk" 10 10 __revision__ = '$Id$' 11 from os import path, environ 12 13 from ndg.security.test.unit import BaseTestCase, mkDataDirPath 14 from ndg.security.common.X509 import X509Cert 15 from ndg.security.common.utils.configfileparsers import ( 16 CaseSensitiveConfigParser) 17 18 19 class AttributeAuthorityClientBaseTestCase(BaseTestCase): 20 """Base class for NDG and SAML Attribute Authority client interfaces""" 21 CONFIG_FILENAME = 'attAuthorityClientTest.cfg' 22 23 def __init__(self, *arg, **kw): 24 super(AttributeAuthorityClientBaseTestCase, self).__init__(*arg, **kw) 25 26 if 'NDGSEC_AACLNT_UNITTEST_DIR' not in environ: 27 environ['NDGSEC_AACLNT_UNITTEST_DIR' 28 ] = path.abspath(path.dirname(__file__)) 29 30 self.cfgParser = CaseSensitiveConfigParser() 31 self.cfgFilePath = path.join(environ['NDGSEC_AACLNT_UNITTEST_DIR'], 32 self.__class__.CONFIG_FILENAME) 33 self.cfgParser.read(self.cfgFilePath) 34 35 self.cfg = {} 36 for section in self.cfgParser.sections(): 37 self.cfg[section] = dict(self.cfgParser.items(section)) 38 39 try: 40 self.sslCACertList = [X509Cert.Read(xpdVars(caFile)) 41 for caFile in self.cfg['setUp'][ 42 'sslcaCertFilePathList'].split()] 43 except KeyError: 44 self.sslCACertList = [] 45 46 self.startAttributeAuthorities() -
TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/attAuthorityClientTest.cfg
r6570 r6571 1 # NERC Data 1 # NERC DataGrid Project 2 2 # 3 3 # P J Kershaw 16/01/07 … … 48 48 userAttCertFilePathList = $NDGSEC_AACLNT_UNITTEST_DIR/ac-clnt.xml 49 49 50 # SAML Interface use a separate test case class in the test module and is51 # included here52 [test01SAMLAttributeQuery]53 uri = http://localhost:5000/AttributeAuthority/saml54 subject = https://openid.localhost/philip.kershaw55 siteAttributeName = urn:siteA:security:authz:1.0:attr56 57 [test02SAMLAttributeQueryInvalidIssuer]58 uri = http://localhost:5000/AttributeAuthority/saml59 subject = https://openid.localhost/philip.kershaw60 siteAttributeName = urn:siteA:security:authz:1.0:attr61 62 [test03SAMLAttributeQueryUnknownSubject]63 uri = http://localhost:5000/AttributeAuthority/saml64 subject = https://openid.localhost/unknown65 siteAttributeName = urn:siteA:security:authz:1.0:attr66 67 [test04SAMLAttributeQueryInvalidAttrName]68 uri = http://localhost:5000/AttributeAuthority/saml69 subject = https://openid.localhost/philip.kershaw70 siteAttributeName = invalid-attr71 72 [test05AttributeQuerySOAPBindingInterface]73 uri = http://localhost:5000/AttributeAuthority/saml74 subject = https://openid.localhost/philip.kershaw75 76 [test06AttributeQueryFromConfig]77 uri = http://localhost:5000/AttributeAuthority/saml78 subject = https://openid.localhost/philip.kershaw79 80 attributeQuery.subjectIdFormat = urn:esg:openid81 attributeQuery.clockSkewTolerance = 0.82 attributeQuery.issuerName = /O=Site A/CN=Authorisation Service83 attributeQuery.queryAttributes.0 = urn:esg:first:name, FirstName, http://www.w3.org/2001/XMLSchema#string84 attributeQuery.queryAttributes.roles = urn:siteA:security:authz:1.0:attr, , http://www.w3.org/2001/XMLSchema#string85 86 [test07AttributeQuerySslSOAPBindingInterface]87 uri = http://localhost:5000/AttributeAuthority/saml88 subject = https://openid.localhost/philip.kershaw89 90 attributeQuery.subjectIdFormat = urn:esg:openid91 attributeQuery.clockSkewTolerance = 0.92 attributeQuery.issuerName = /O=Site A/CN=Authorisation Service93 attributeQuery.queryAttributes.0 = urn:esg:email:address, EmailAddress, http://www.w3.org/2001/XMLSchema#string94 attributeQuery.queryAttributes.roles = urn:siteA:security:authz:1.0:attr, , http://www.w3.org/2001/XMLSchema#string95 96 # SSL Context Proxy settings97 attributeQuery.sslCACertDir = $NDGSEC_TEST_CONFIG_DIR/ca98 attributeQuery.sslCertFilePath = $NDGSEC_TEST_CONFIG_DIR/pki/test.crt99 attributeQuery.sslPriKeyFilePath = $NDGSEC_TEST_CONFIG_DIR/pki/test.key100 attributeQuery.sslValidDNs = /C=UK/ST=Oxfordshire/O=BADC/OU=Security/CN=localhost, /O=Site A/CN=Attribute Authority101 102 50 [wsse] 103 51 # WS-Security settings for unit test AA clients -
TI12-security/trunk/NDGSecurity/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/test_attributeauthorityclient.py
r6570 r6571 1 1 #!/usr/bin/env python 2 """NDG Attribute Authority SOAP client unit tests2 """NDG Attribute Authority WSDL SOAP client unit tests 3 3 4 4 NERC DataGrid Project … … 17 17 18 18 from os.path import expandvars as xpdVars 19 from os.path import join as jnPath20 mkPath = lambda file: jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'], file)21 22 from datetime import datetime23 from uuid import uuid424 from xml.etree import ElementTree25 26 from ndg.security.test.unit import BaseTestCase, mkDataDirPath27 28 from ndg.security.common.utils.etree import prettyPrint29 19 30 20 from ndg.security.common.attributeauthority import (AttributeAuthorityClient, 31 21 NoMatchingRoleInTrustedHosts) 32 22 from ndg.security.common.AttCert import AttCertRead 33 from ndg.security.common.X509 import X509CertParse, X509CertRead 34 from ndg.security.common.utils.configfileparsers import ( 35 CaseSensitiveConfigParser) 36 37 from saml.common.xml import SAMLConstants 38 from saml.saml2.core import (Attribute, SAMLVersion, Subject, NameID, Issuer, 39 AttributeQuery, XSStringAttributeValue, StatusCode) 40 from saml.xml.etree import ResponseElementTree 41 42 from ndg.security.common.saml_utils.binding.soap import SOAPBinding as \ 43 SamlSoapBinding 44 from ndg.security.common.saml_utils.binding.soap.attributequery import ( 45 AttributeQuerySslSOAPBinding, 46 AttributeQuerySOAPBinding) 47 from ndg.security.common.saml_utils.esg import (EsgSamlNamespaces, 48 XSGroupRoleAttributeValue, 49 EsgDefaultQueryAttributes) 50 51 52 class AttributeAuthorityClientBaseTestCase(BaseTestCase): 53 def __init__(self, *arg, **kw): 54 super(AttributeAuthorityClientBaseTestCase, self).__init__(*arg, **kw) 55 56 if 'NDGSEC_AACLNT_UNITTEST_DIR' not in os.environ: 57 os.environ['NDGSEC_AACLNT_UNITTEST_DIR' 58 ] = os.path.abspath(os.path.dirname(__file__)) 59 60 self.cfgParser = CaseSensitiveConfigParser() 61 self.cfgFilePath = jnPath(os.environ['NDGSEC_AACLNT_UNITTEST_DIR'], 62 'attAuthorityClientTest.cfg') 63 self.cfgParser.read(self.cfgFilePath) 64 65 self.cfg = {} 66 for section in self.cfgParser.sections(): 67 self.cfg[section] = dict(self.cfgParser.items(section)) 68 69 try: 70 self.sslCACertList = [X509CertRead(xpdVars(caFile)) 71 for caFile in self.cfg['setUp'][ 72 'sslcaCertFilePathList'].split()] 73 except KeyError: 74 self.sslCACertList = [] 75 76 self.startAttributeAuthorities() 23 from ndg.security.common.X509 import X509CertParse 24 25 from ndg.security.test.unit.attributeauthorityclient import \ 26 AttributeAuthorityClientBaseTestCase 77 27 78 28 79 29 class AttributeAuthorityClientTestCase(AttributeAuthorityClientBaseTestCase): 30 '''NDG Attribute Authority WSDL SOAP client unit tests''' 80 31 clntPriKeyPwd = None 81 32 pemPat = "-----BEGIN CERTIFICATE-----[^\-]*-----END CERTIFICATE-----" … … 126 77 for hostname, hostInfo in trustedHostInfo.items(): 127 78 self.assert_(hostname, "Hostname not set") 128 for k , v in hostInfo.items():79 for k in hostInfo.keys(): 129 80 self.assert_(k, "hostInfo value key unset") 130 81 … … 144 95 (_cfg['role'], e)) 145 96 146 147 97 def test04GetTrustedHostInfoWithNoRole(self): 148 98 """test04GetTrustedHostInfoWithNoRole: retrieve trusted host info … … 151 101 for hostname, hostInfo in trustedHostInfo.items(): 152 102 self.assert_(hostname, "Hostname not set") 153 for k , v in hostInfo.items():103 for k in hostInfo.keys(): 154 104 self.assert_(k, "hostInfo value key unset") 155 105 156 106 print "Trusted Host Info:\n %s" % trustedHostInfo 157 107 158 159 108 def test05GetAllHostsInfo(self): 160 109 """test05GetAllHostsInfo: retrieve info for all hosts""" … … 162 111 for hostname, hostInfo in allHostInfo.items(): 163 112 self.assert_(hostname, "Hostname not set") 164 for k , v in hostInfo.items():113 for k in hostInfo.keys(): 165 114 self.assert_(k, "hostInfo value key unset") 166 115 167 116 print "All Hosts Info:\n %s" % allHostInfo 168 169 117 170 118 def test06GetAttCert(self): … … 183 131 184 132 except IOError, ioErr: 185 raise Exception("Error reading certificate file \"%s\": %s" %186 133 raise IOError("Error reading certificate file \"%s\": %s" % 134 (ioErr.filename, ioErr.strerror)) 187 135 188 136 # Make attribute certificate request … … 192 140 193 141 attCert.filePath = xpdVars(_cfg['attCertFilePath']) 194 attCert.write() 195 142 attCert.write() 196 143 197 144 def test07GetAttCertWithUserIdSet(self): … … 211 158 212 159 except IOError, ioErr: 213 raise Exception("Error reading certificate file \"%s\": %s" %214 160 raise IOError("Error reading certificate file \"%s\": %s" % 161 (ioErr.filename, ioErr.strerror)) 215 162 216 163 # Make attribute certificate request … … 224 171 attCert.write() 225 172 226 227 173 def test08GetMappedAttCert(self): 228 174 """test08GetMappedAttCert: Request mapped attribute certificate from … … 240 186 241 187 except IOError, ioErr: 242 raise Exception("Error reading certificate file \"%s\": %s" % \243 188 raise IOError("Error reading certificate file \"%s\": %s" % 189 (ioErr.filename, ioErr.strerror)) 244 190 245 191 # Simlarly for Attribute Certificate … … 280 226 281 227 except IOError, ioErr: 282 raise Exception("Error reading certificate file \"%s\": %s" %283 228 raise IOError("Error reading certificate file \"%s\": %s" % 229 (ioErr.filename, ioErr.strerror)) 284 230 285 231 # Make client to site B Attribute Authority … … 288 234 cfg=self.cfgParser) 289 235 290 acFilePathList = [xpdVars( file) for file in \291 _cfg['userAttCertFilePathList'].split()]236 acFilePathList = [xpdVars(acFile) 237 for acFile in _cfg['userAttCertFilePathList'].split()] 292 238 293 239 for acFilePath in acFilePathList: … … 308 254 msgFile = open(outFilePfx+".msg", 'w') 309 255 msgFile.write('Failed for "%s": %s\n' % (acFilePath, e)) 310 311 312 class AttributeAuthoritySAMLInterfaceTestCase( 313 AttributeAuthorityClientBaseTestCase): 314 """Separate class for Attribute Authority SAML Attribute Query interface""" 315 316 def __init__(self, *arg, **kw): 317 super(AttributeAuthoritySAMLInterfaceTestCase, self).__init__(*arg, 318 **kw) 319 self.startSiteAAttributeAuthority(withSSL=True, port=5443) 320 321 def test01SAMLAttributeQuery(self): 322 _cfg = self.cfg['test01SAMLAttributeQuery'] 323 324 attributeQuery = AttributeQuery() 325 attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) 326 attributeQuery.id = str(uuid4()) 327 attributeQuery.issueInstant = datetime.utcnow() 328 329 attributeQuery.issuer = Issuer() 330 attributeQuery.issuer.format = Issuer.X509_SUBJECT 331 attributeQuery.issuer.value = "/CN=Authorisation Service/O=Site A" 332 333 attributeQuery.subject = Subject() 334 attributeQuery.subject.nameID = NameID() 335 attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT 336 attributeQuery.subject.nameID.value = _cfg['subject'] 337 xsStringNs = SAMLConstants.XSD_NS+"#"+\ 338 XSStringAttributeValue.TYPE_LOCAL_NAME 339 fnAttribute = Attribute() 340 fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME 341 fnAttribute.nameFormat = xsStringNs 342 fnAttribute.friendlyName = "FirstName" 343 344 attributeQuery.attributes.append(fnAttribute) 345 346 lnAttribute = Attribute() 347 lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME 348 lnAttribute.nameFormat = xsStringNs 349 lnAttribute.friendlyName = "LastName" 350 351 attributeQuery.attributes.append(lnAttribute) 352 353 emailAddressAttribute = Attribute() 354 emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME 355 emailAddressAttribute.nameFormat = xsStringNs 356 emailAddressAttribute.friendlyName = "emailAddress" 357 358 attributeQuery.attributes.append(emailAddressAttribute) 359 360 siteAAttribute = Attribute() 361 siteAAttribute.name = _cfg['siteAttributeName'] 362 siteAAttribute.nameFormat = xsStringNs 363 364 attributeQuery.attributes.append(siteAAttribute) 365 366 binding = SamlSoapBinding() 367 response = binding.send(attributeQuery, _cfg['uri']) 368 369 self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI) 370 371 # Check Query ID matches the query ID the service received 372 self.assert_(response.inResponseTo == attributeQuery.id) 373 374 now = datetime.utcnow() 375 self.assert_(response.issueInstant < now) 376 self.assert_(response.assertions[-1].issueInstant < now) 377 self.assert_(response.assertions[-1].conditions.notBefore < now) 378 self.assert_(response.assertions[-1].conditions.notOnOrAfter > now) 379 380 samlResponseElem = ResponseElementTree.toXML(response) 381 382 print("SAML Response ...") 383 print(ElementTree.tostring(samlResponseElem)) 384 print("Pretty print SAML Response ...") 385 print(prettyPrint(samlResponseElem)) 386 387 def test02SAMLAttributeQueryInvalidIssuer(self): 388 _cfg = self.cfg['test02SAMLAttributeQueryInvalidIssuer'] 389 390 attributeQuery = AttributeQuery() 391 attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) 392 attributeQuery.id = str(uuid4()) 393 attributeQuery.issueInstant = datetime.utcnow() 394 395 attributeQuery.issuer = Issuer() 396 attributeQuery.issuer.format = Issuer.X509_SUBJECT 397 attributeQuery.issuer.value = "Invalid Site" 398 399 attributeQuery.subject = Subject() 400 attributeQuery.subject.nameID = NameID() 401 attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT 402 attributeQuery.subject.nameID.value = _cfg['subject'] 403 xsStringNs = SAMLConstants.XSD_NS+"#"+\ 404 XSStringAttributeValue.TYPE_LOCAL_NAME 405 406 siteAAttribute = Attribute() 407 siteAAttribute.name = _cfg['siteAttributeName'] 408 siteAAttribute.nameFormat = xsStringNs 409 410 attributeQuery.attributes.append(siteAAttribute) 411 412 binding = SamlSoapBinding() 413 response = binding.send(attributeQuery, _cfg['uri']) 414 415 samlResponseElem = ResponseElementTree.toXML(response) 416 417 print("SAML Response ...") 418 print(ElementTree.tostring(samlResponseElem)) 419 print("Pretty print SAML Response ...") 420 print(prettyPrint(samlResponseElem)) 421 422 self.assert_( 423 response.status.statusCode.value==StatusCode.REQUEST_DENIED_URI) 424 425 def test03SAMLAttributeQueryUnknownSubject(self): 426 _cfg = self.cfg['test03SAMLAttributeQueryUnknownSubject'] 427 428 attributeQuery = AttributeQuery() 429 attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) 430 attributeQuery.id = str(uuid4()) 431 attributeQuery.issueInstant = datetime.utcnow() 432 433 attributeQuery.issuer = Issuer() 434 attributeQuery.issuer.format = Issuer.X509_SUBJECT 435 attributeQuery.issuer.value = "/CN=Authorisation Service/O=Site A" 436 437 attributeQuery.subject = Subject() 438 attributeQuery.subject.nameID = NameID() 439 attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT 440 attributeQuery.subject.nameID.value = _cfg['subject'] 441 xsStringNs = SAMLConstants.XSD_NS+"#"+\ 442 XSStringAttributeValue.TYPE_LOCAL_NAME 443 444 siteAAttribute = Attribute() 445 siteAAttribute.name = _cfg['siteAttributeName'] 446 siteAAttribute.nameFormat = xsStringNs 447 448 attributeQuery.attributes.append(siteAAttribute) 449 450 binding = SamlSoapBinding() 451 response = binding.send(attributeQuery, _cfg['uri']) 452 453 samlResponseElem = ResponseElementTree.toXML(response) 454 print("SAML Response ...") 455 print(ElementTree.tostring(samlResponseElem)) 456 print("Pretty print SAML Response ...") 457 print(prettyPrint(samlResponseElem)) 458 459 self.assert_( 460 response.status.statusCode.value==StatusCode.UNKNOWN_PRINCIPAL_URI) 461 462 def test04SAMLAttributeQueryInvalidAttrName(self): 463 _cfg = self.cfg['test04SAMLAttributeQueryInvalidAttrName'] 464 465 attributeQuery = AttributeQuery() 466 attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) 467 attributeQuery.id = str(uuid4()) 468 attributeQuery.issueInstant = datetime.utcnow() 469 470 attributeQuery.issuer = Issuer() 471 attributeQuery.issuer.format = Issuer.X509_SUBJECT 472 attributeQuery.issuer.value = "/CN=Authorisation Service/O=Site A" 473 474 attributeQuery.subject = Subject() 475 attributeQuery.subject.nameID = NameID() 476 attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT 477 attributeQuery.subject.nameID.value = _cfg['subject'] 478 xsStringNs = SAMLConstants.XSD_NS+"#"+\ 479 XSStringAttributeValue.TYPE_LOCAL_NAME 480 481 invalidAttribute = Attribute() 482 invalidAttribute.name = "myInvalidAttributeName" 483 invalidAttribute.nameFormat = xsStringNs 484 485 attributeQuery.attributes.append(invalidAttribute) 486 487 binding = SamlSoapBinding() 488 response = binding.send(attributeQuery, _cfg['uri']) 489 490 samlResponseElem = ResponseElementTree.toXML(response) 491 492 print("SAML Response ...") 493 print(ElementTree.tostring(samlResponseElem)) 494 print("Pretty print SAML Response ...") 495 print(prettyPrint(samlResponseElem)) 496 497 self.assert_(response.status.statusCode.value==\ 498 StatusCode.INVALID_ATTR_NAME_VALUE_URI) 499 500 def test05AttributeQuerySOAPBindingInterface(self): 501 _cfg = self.cfg['test05AttributeQuerySOAPBindingInterface'] 502 503 binding = AttributeQuerySOAPBinding() 504 505 binding.subjectID = AttributeAuthoritySAMLInterfaceTestCase.OPENID_URI 506 binding.issuerDN = \ 507 AttributeAuthoritySAMLInterfaceTestCase.VALID_REQUESTOR_IDS[0] 508 509 binding.queryAttributes = EsgDefaultQueryAttributes.ATTRIBUTES 510 511 response = binding.send(uri=_cfg['uri']) 512 samlResponseElem = ResponseElementTree.toXML(response) 513 514 print("SAML Response ...") 515 print(ElementTree.tostring(samlResponseElem)) 516 print("Pretty print SAML Response ...") 517 print(prettyPrint(samlResponseElem)) 518 519 self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI) 520 521 def test06AttributeQueryFromConfig(self): 522 thisSection = 'test06AttributeQueryFromConfig' 523 _cfg = self.cfg[thisSection] 524 525 binding = AttributeQuerySOAPBinding.fromConfig(self.cfgFilePath, 526 section=thisSection, 527 prefix='attributeQuery.') 528 binding.subjectID = _cfg['subject'] 529 response = binding.send(uri=_cfg['uri']) 530 samlResponseElem = ResponseElementTree.toXML(response) 531 532 print("SAML Response ...") 533 print(ElementTree.tostring(samlResponseElem)) 534 print("Pretty print SAML Response ...") 535 print(prettyPrint(samlResponseElem)) 536 537 self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI) 538 539 def test07AttributeQuerySslSOAPBindingInterface(self): 540 thisSection = 'test07AttributeQuerySslSOAPBindingInterface' 541 _cfg = self.cfg[thisSection] 542 543 binding = AttributeQuerySslSOAPBinding.fromConfig(self.cfgFilePath, 544 section=thisSection, 545 prefix='attributeQuery.') 546 547 binding.subjectID = _cfg['subject'] 548 response = binding.send(uri=_cfg['uri']) 549 samlResponseElem = ResponseElementTree.toXML(response) 550 551 print("SAML Response ...") 552 print(ElementTree.tostring(samlResponseElem)) 553 print("Pretty print SAML Response ...") 554 print(prettyPrint(samlResponseElem)) 555 556 self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI) 256 257 self.assert_(attCert) 258 557 259 558 260 if __name__ == "__main__":
Note: See TracChangeset
for help on using the changeset viewer.