Changeset 6062
- Timestamp:
- 27/11/09 11:34:30 (11 years ago)
- Location:
- TI12-security/trunk/python
- Files:
-
- 24 edited
Legend:
- Unmodified
- Added
- Removed
-
TI12-security/trunk/python/ndg_security_common/ndg/security/common/attributeauthority.py
r5436 r6062 22 22 import logging 23 23 log = logging.getLogger(__name__) 24 25 import traceback 24 26 25 27 # Determine https http transport … … 305 307 306 308 except Exception, e: 309 log.error("AttributeAuthority.getHostInfo: %s", 310 traceback.format_exc()) 311 307 312 # Try to detect exception type from SOAP fault message 308 313 errMsg = str(e) … … 349 354 350 355 except Exception, e: 356 log.error("AttributeAuthority.getTrustedHostInfo: %s", 357 traceback.format_exc()) 358 351 359 # Try to detect exception type from SOAP fault message 352 360 errMsg = str(e) … … 395 403 396 404 except Exception, e: 405 log.error("AttributeAuthority.getAllHostsInfo: %s", 406 traceback.format_exc()) 407 397 408 # Try to detect exception type from SOAP fault message 398 409 errMsg = str(e) … … 469 480 470 481 except Exception, e: 482 log.error("AttributeAuthority.getAttCert: %s", 483 traceback.format_exc()) 484 471 485 # Try to detect exception type from SOAP fault message 472 486 errMsg = str(e) -
TI12-security/trunk/python/ndg_security_common/ndg/security/common/authz/msi.py
r6043 r6062 14 14 log = logging.getLogger(__name__) 15 15 16 import traceback 16 17 import warnings 17 18 from elementtree import ElementTree … … 336 337 else: 337 338 raise AttributeParseError("Invalid Attribute element name: %s" % 338 localName)339 localName) 339 340 340 341 @classmethod … … 351 352 namespaces = () 352 353 def __init__(self, **attributes): 353 invalidAttributes = [attr for attr in attributes \354 invalidAttributes = [attr for attr in attributes 354 355 if attr not in self.__class__.namespaces] 355 356 if len(invalidAttributes) > 0: … … 376 377 377 378 dict.update(self, d, **kw) 379 378 380 379 381 class Subject(_AttrDict): … … 387 389 (USERID_NS, SESSIONID_NS, SESSIONMANAGERURI_NS, ROLES_NS) = namespaces 388 390 391 389 392 class Resource(_AttrDict): 390 393 '''Resource designator''' … … 393 396 ) 394 397 (URI_NS,) = namespaces 398 395 399 396 400 class Request(object): … … 400 404 self.resource = resource 401 405 406 def _getSubject(self): 407 return self.__subject 408 409 def _setSubject(self, subject): 410 if not isinstance(subject, Subject,): 411 raise TypeError("Expecting %s type for Request subject; got %r" % 412 (Subject.__class__.__name__, subject)) 413 self.__subject = subject 414 415 subject = property(fget=_getSubject, 416 fset=_setSubject, 417 doc="Subject type object representing subject accessing " 418 "a resource") 419 420 def _getResource(self): 421 return self.__resource 422 423 def _setResource(self, resource): 424 if not isinstance(resource, Resource): 425 raise TypeError("Expecting %s for Request Resource; got %r" % 426 (Resource.__class__.__name__, resource)) 427 self.__resource = resource 428 429 resource = property(fget=_getResource, 430 fset=_setResource, 431 doc="Resource to be protected") 432 433 402 434 class Response(object): 403 435 '''Response from a PDP''' 404 436 decisionValues = range(4) 405 437 (DECISION_PERMIT, 406 DECISION_DENY,407 DECISION_INDETERMINATE,408 DECISION_NOT_APPLICABLE) = decisionValues438 DECISION_DENY, 439 DECISION_INDETERMINATE, 440 DECISION_NOT_APPLICABLE) = decisionValues 409 441 410 442 # string versions of the 4 Decision types used for encoding … … 414 446 415 447 def __init__(self, status, message=None): 448 self.__status = None 449 self.__message = None 416 450 417 451 self.status = status … … 422 456 raise TypeError("Status %s not recognised" % status) 423 457 424 self._ status = status458 self.__status = status 425 459 426 460 def _getStatus(self): 427 return getattr(self, '_status', Response.DECISION_INDETERMINATE)461 return self.__status 428 462 429 463 status = property(fget=_getStatus, 430 464 fset=_setStatus, 431 465 doc="Integer response code; one of %r" % decisionValues) 466 467 def _setMessage(self, message): 468 if not isinstance(message, (basestring, type(None))): 469 raise TypeError('Expecting string or None type for "message"; got ' 470 '%r' % type(message)) 471 472 self.__message = message 473 474 def _getMessage(self): 475 return self.__message 476 477 message = property(fget=_getMessage, 478 fset=_setMessage, 479 doc="Optional message associated with response") 480 432 481 433 482 from ndg.security.common.AttCert import (AttCertInvalidSignature, … … 745 794 sslCACertFilePathList=self.sslCACertFilePathList, 746 795 cfg=self.wssecurityCfg) 747 except Exception, e: 748 log.error("Creating Attribute Authority client: %s" % e) 796 except Exception: 797 log.error("Creating Attribute Authority client: %s", 798 traceback.format_exc()) 749 799 raise InitSessionCtxError() 750 800 … … 755 805 756 806 757 except AA_AttributeRequestDenied, e: 758 log.error("Request for attribute certificate denied: %s" % e) 807 except AA_AttributeRequestDenied: 808 log.error("Request for attribute certificate denied: %s", 809 traceback.format_exc()) 759 810 raise PDPUserAccessDenied() 760 811 … … 765 816 log.error("Request to Attribute Authority [%s] for attribute " 766 817 "certificate: %s: %s", attributeAuthorityURI, 767 e.__class__, e)818 e.__class__, traceback.format_exc()) 768 819 raise AttributeCertificateRequestError() 769 820 … … 815 866 def evaluate(self, request): 816 867 '''Make access control decision''' 868 869 if not isinstance(request, Request): 870 raise TypeError("Expecting %s type for request; got %r" % 871 (Request.__class__.__name__, request)) 817 872 818 873 # Look for matching targets to the given resource … … 853 908 continue 854 909 855 attributeQuery[PIPAttributeQuery.ATTRIBUTEAUTHORITY_NS] = \ 856 attribute.attributeAuthorityURI 910 attributeQuery[ 911 PIPAttributeQuery.ATTRIBUTEAUTHORITY_NS 912 ] = attribute.attributeAuthorityURI 857 913 858 914 # Exit from function returning indeterminate status if a … … 864 920 # i.e. a defined exception within the scope of this 865 921 # module 866 log.exception(e) 922 log.error("SAML Attribute Query %s: %s", 923 type(e), traceback.format_exc()) 867 924 return Response(Response.DECISION_INDETERMINATE, 868 message= str(e))925 message=traceback.format_exc()) 869 926 870 927 except Exception, e: 871 log.exception(e) 928 log.error("SAML Attribute Query: %s", 929 type(e), traceback.format_exc()) 872 930 return Response(Response.DECISION_INDETERMINATE, 873 931 message="An internal error occurred") -
TI12-security/trunk/python/ndg_security_common/ndg/security/common/credentialwallet.py
r6059 r6062 1244 1244 1245 1245 if bUpdateCred: 1246 1247 1246 thisCredential = CredentialContainer(AttCert) 1248 1247 thisCredential.credential = attCert -
TI12-security/trunk/python/ndg_security_common/ndg/security/common/saml_utils/bindings.py
r6052 r6062 236 236 """ 237 237 SUBJECT_ID_OPTNAME = 'subjectID' 238 ISSUER_ DN_OPTNAME = 'issuerDN'238 ISSUER_NAME_OPTNAME = 'issuerName' 239 239 CLOCK_SKEW_OPTNAME = 'clockSkew' 240 240 241 241 CONFIG_FILE_OPTNAMES = ( 242 242 SUBJECT_ID_OPTNAME, 243 ISSUER_ DN_OPTNAME,243 ISSUER_NAME_OPTNAME, 244 244 CLOCK_SKEW_OPTNAME 245 245 ) … … 259 259 def __init__(self, **kw): 260 260 '''Create SOAP Client for SAML Attribute Query''' 261 self.__issuer DN= None261 self.__issuerName = None 262 262 self.__queryAttributes = TypedList(Attribute) 263 263 self.__clockSkew = timedelta(seconds=0.) … … 366 366 "Attribute Authority") 367 367 368 def _getIssuerDN(self): 369 return self.__issuerDN 370 371 def _setIssuerDN(self, value): 372 if isinstance(value, basestring): 373 self.__issuerDN = X500DN.fromString(value) 374 375 elif isinstance(value, X500DN): 376 self.__issuerDN = value 377 else: 378 raise TypeError('Expecting string or X500DN type for "issuerDN"; ' 368 def _getIssuerName(self): 369 return self.__issuerName 370 371 def _setIssuerName(self, value): 372 if not isinstance(value, basestring): 373 raise TypeError('Expecting string type for "issuerName"; ' 379 374 'got %r instead' % type(value)) 380 381 issuerDN = property(_getIssuerDN, _setIssuerDN, 375 376 self.__issuerName = value 377 378 issuerName = property(_getIssuerName, _setIssuerName, 382 379 doc="Distinguished Name of issuer of SAML Attribute " 383 380 "Query to Attribute Authority") … … 408 405 attributeQuery.issueInstant = datetime.utcnow() 409 406 410 if self.issuer DNis None:407 if self.issuerName is None: 411 408 raise AttributeError('No issuer DN has been set for SAML Attribute ' 412 409 'Query') … … 414 411 attributeQuery.issuer = Issuer() 415 412 attributeQuery.issuer.format = Issuer.X509_SUBJECT 416 attributeQuery.issuer.value = s tr(self.issuerDN)413 attributeQuery.issuer.value = self.issuerName 417 414 418 415 attributeQuery.subject = Subject() -
TI12-security/trunk/python/ndg_security_saml/saml/saml2/core.py
r5982 r6062 1078 1078 TYPE_LOCAL_NAME, 1079 1079 SAMLConstants.XSD_PREFIX) 1080 1081 DEFAULT_FORMAT = "%s#%s" % (SAMLConstants.XSD_NS, TYPE_LOCAL_NAME) 1080 1082 1081 1083 def __init__(self): -
TI12-security/trunk/python/ndg_security_saml/saml/xml/__init__.py
r5663 r6062 36 36 37 37 # Configuration namespace 38 XMLTOOLING_CONFIG_NS = "http: #www.opensaml.org/xmltooling-config"38 XMLTOOLING_CONFIG_NS = "http://www.opensaml.org/xmltooling-config" 39 39 40 40 # Configuration namespace prefix … … 48 48 49 49 # XML core namespace 50 XML_NS = "http: #www.w3.org/XML/1998/namespace"50 XML_NS = "http://www.w3.org/XML/1998/namespace" 51 51 52 52 # XML core prefix for xml attributes -
TI12-security/trunk/python/ndg_security_saml/saml/xml/etree.py
r6034 r6062 51 51 from saml.utils import SAMLDateTime 52 52 53 from ndg.security.common.saml_utils.esg import XSGroupRoleAttributeValue54 55 53 56 54 # Generic ElementTree Helper classes -
TI12-security/trunk/python/ndg_security_server/ndg/security/server/attributeauthority.py
r6040 r6062 1564 1564 1565 1565 1566 class AttributeInterfaceConfigError( Exception):1566 class AttributeInterfaceConfigError(AttributeInterfaceError): 1567 1567 """Invalid configuration set for Attribute interface""" 1568 1568 1569 1569 1570 class AttributeInterfaceRetrieveError( Exception):1570 class AttributeInterfaceRetrieveError(AttributeInterfaceError): 1571 1571 """Error retrieving attributes for Attribute interface class""" 1572 1572 … … 1590 1590 class InvalidUserId(AttributeInterfaceError): 1591 1591 """User Id passed to getAttributes is invalid""" 1592 1593 1594 class InvalidAttributeFormat(AttributeInterfaceError): 1595 """Format for Attribute requested is invalid or not supported""" 1592 1596 1593 1597 … … 1921 1925 1922 1926 __slots__ = ( 1927 ISSUER_NAME_OPTNAME, 1923 1928 CONNECTION_STRING_OPTNAME, 1924 1929 ATTRIBUTE_SQLQUERY_OPTNAME, … … 2266 2271 # TODO: check name format requested - only XSString is currently 2267 2272 # supported 2273 if (requestedAttribute.nameFormat != 2274 XSStringAttributeValue.DEFAULT_FORMAT): 2275 raise InvalidAttributeFormat('Requested attribute type %r but ' 2276 'only %r type is supported' % 2277 (requestedAttribute.nameFormat, 2278 XSStringAttributeValue.DEFAULT_FORMAT)) 2279 2268 2280 attribute.nameFormat = requestedAttribute.nameFormat 2269 2281 … … 2352 2364 queryTmpl = self.samlAttribute2SqlQuery.get(attributeName) 2353 2365 if queryTmpl is None: 2354 # TODO: how to handle this error?2355 raise Exception()2366 raise AttributeInterfaceConfigError('No SQL query set for attribute ' 2367 '%r' % attributeName) 2356 2368 2357 2369 try: … … 2362 2374 2363 2375 except KeyError, e: 2364 raise AttributeInterfaceConfigError("Invalid key for SAML "2376 raise AttributeInterfaceConfigError("Invalid key %s for SAML " 2365 2377 "attribute query string. The valid key is %r" % 2366 SQLAlchemyAttributeInterface.SQLQUERY_USERID_KEYNAME)2367 2378 (e, 2379 SQLAlchemyAttributeInterface.SQLQUERY_USERID_KEYNAME)) 2368 2380 2369 2381 log.debug('Checking for SAML attributes with SQL Query = "%s"', query) … … 2384 2396 except (IndexError, TypeError): 2385 2397 raise AttributeInterfaceRetrieveError("Error with result set: " 2386 "%s" % 2387 traceback.format_exc()) 2398 "%s" % traceback.format_exc()) 2388 2399 2389 2400 log.debug('Database results for SAML Attribute query user=%r ' … … 2394 2405 def __getstate__(self): 2395 2406 '''Explicit pickling required with __slots__''' 2396 return dict([(attrName, getattr(self, attrName)) \2407 return dict([(attrName, getattr(self, attrName)) 2397 2408 for attrName in SQLAlchemyAttributeInterface.__slots__]) 2398 2409 -
TI12-security/trunk/python/ndg_security_server/ndg/security/server/wsgi/authz.py
r6059 r6062 18 18 19 19 from ndg.security.common.utils.classfactory import importClass 20 from ndg.security.common.X509 import X50 0DN21 from ndg.security.common. utils.m2crypto import SSLContextProxy20 from ndg.security.common.X509 import X509Cert 21 from ndg.security.common.saml_utils.bindings import AttributeQuerySslSOAPBinding 22 22 23 23 from ndg.security.common.credentialwallet import (NDGCredentialWallet, … … 436 436 # Check for existing credentials cached in wallet 437 437 credentialItem = credentialWallet.credentialsKeyedByURI.get( 438 attributeAuthorityURI, {}) 439 440 attrCert = credentialItem.credential 441 if attrCert is not None: 438 attributeAuthorityURI) 439 if credentialItem is not None: 442 440 log.debug("NdgPIPMiddleware._getAttributeCertificate: retrieved " 443 441 "existing Attribute Certificate cached in Credential " … … 447 445 # Existing cached credential found - skip call to remote Session 448 446 # Manager / Attribute Authority and return this certificate instead 449 return attrCert447 return credentialItem.credential 450 448 else: 451 449 attrCert = PIP._getAttributeCertificate(self, … … 491 489 SessionHandlerMiddleware.CREDENTIAL_WALLET_SESSION_KEYNAME 492 490 USERNAME_SESSION_KEYNAME = \ 493 SessionHandlerMiddleware.USERNAME_SESSION_KEYNAME 491 SessionHandlerMiddleware.USERNAME_SESSION_KEYNAME 492 493 ATTRIBUTE_QUERY_ATTRNAME = 'attributeQuery' 494 LEN_ATTRIBUTE_QUERY_ATTRNAME = len(ATTRIBUTE_QUERY_ATTRNAME) 494 495 495 496 def __init__(self, app, global_conf, prefix='', **local_conf): … … 506 507 ''' 507 508 self.session = None 508 509 # Hold SSL Attribute Authority connection settings in SSL Context Proxy 510 # object 511 self.__sslCtxProxy = SSLContextProxy() 509 self.__attributeQueryBinding = AttributeQuerySslSOAPBinding() 512 510 513 511 nameOffset = len(prefix) … … 516 514 val = local_conf.pop(k) 517 515 name = k[nameOffset:] 518 setattr(self.__sslCtxProxy, name, val) 516 setattr(self, name, val) 517 518 if not self.__attributeQueryBinding.issuerName: 519 issuerX509Cert = X509Cert.Read( 520 self.__attributeQueryBinding.sslCtxProxy.sslCertFilePath) 521 self.__attributeQueryBinding.issuerName = str(issuerX509Cert.dn) 519 522 520 523 NDGSecurityMiddlewareBase.__init__(self, app, {}) 521 524 525 def __setattr__(self, name, value): 526 """Enable setting of AttributeQuerySslSOAPBinding attributes from 527 names starting with attributeQuery.* / attributeQuery_*. Addition for 528 setting these values from ini file 529 """ 530 531 # Coerce into setting AttributeQuerySslSOAPBinding attributes - 532 # names must start with 'attributeQuery\W' e.g. 533 # attributeQuery.clockSkew or attributeQuery_issuerDN 534 if name.startswith(SamlPIPMiddleware.ATTRIBUTE_QUERY_ATTRNAME): 535 setattr(self.__attributeQueryBinding, 536 name[SamlPIPMiddleware.LEN_ATTRIBUTE_QUERY_ATTRNAME+1:], 537 value) 538 else: 539 super(SamlPIPMiddleware, self).__setattr__(name, value) 540 522 541 @property 523 def sslCtxProxy(self): 524 """SSL Context Proxy object used for setting up an SSL Context for 525 queries to the Attribute Authority 526 """ 527 return self.__sslCtxProxy 528 529 542 def attributeQueryBinding(self): 543 """SAML SOAP Attribute Query client binding object""" 544 return self.__attributeQueryBinding 545 530 546 def __call__(self, environ, start_response): 531 547 """Take a copy of the session object so that it is in scope for … … 558 574 @return: response containing the attributes retrieved from the 559 575 Attribute Authority""" 560 576 if not isinstance(attributeQuery, PIPAttributeQuery): 577 raise TypeError('Expecting %r type for input "attributeQuery"; ' 578 'got %r' % (AttributeQuery, type(attributeQuery))) 579 561 580 attributeAuthorityURI = attributeQuery[ 562 PIPAttributeQuery.ATTRIBUTEAUTHORITY_NS]581 PIPAttributeQuery.ATTRIBUTEAUTHORITY_NS] 563 582 564 583 log.debug("SamlPIPMiddleware: received attribute query: %r", … … 580 599 credentialWallet = SAMLCredentialWallet() 581 600 credentialWallet.userId = self.session[usernameKeyName] 582 credentialWallet.sslCtxProxy.copy(self.sslCtxProxy)583 601 584 602 self.session[credentialWalletKeyName] = credentialWallet … … 592 610 attributeAuthorityURI) 593 611 if credentialItem is None: 594 # No assertion is cached - make a fresh query 595 credentialWallet.attributeAuthorityURI = attributeAuthorityURI 596 credentialWallet.attributeQuery() 597 credentialItem = credentialWallet.credentialsKeyedByURI.get( 598 attributeAuthorityURI) 612 # No assertion is cached - make a fresh SAML Attribute Query 613 self.attributeQueryBinding.subjectID = credentialWallet.userId 614 response = self.attributeQueryBinding.send( 615 uri=attributeAuthorityURI) 616 for assertion in response.assertions: 617 credentialWallet.addCredential(assertion) 599 618 600 619 log.debug("SamlPIPMiddleware.attributeQuery: updating Credential " 601 620 "Wallet with retrieved SAML Attribute Assertion " 602 "for user session [%s]", 603 self.session[usernameKeyName]) 621 "for user session [%s]", self.session[usernameKeyName]) 604 622 else: 605 623 log.debug("SamlPIPMiddleware.attributeQuery: retrieved existing " 606 624 "SAML Attribute Assertion cached in Credential Wallet " 607 "for user session [%s]", 608 self.session[usernameKeyName]) 625 "for user session [%s]", self.session[usernameKeyName]) 609 626 610 627 attributeResponse = PIPAttributeResponse() 611 attributeResponse[Subject.ROLES_NS] = [ 612 attribute.value 613 for attribute in credentialItem.credential.attributes 614 ] 628 attributeResponse[Subject.ROLES_NS] = [] 629 630 # Unpack assertion attribute values and add to the response object 631 for credentialItem in credentialWallet.credentials.values(): 632 for statement in credentialItem.credential.attributeStatements: 633 for attribute in statement.attributes: 634 attributeResponse[Subject.ROLES_NS] += [ 635 attributeValue.value 636 for attributeValue in attribute.attributeValues 637 if attributeValue.value not in attributeResponse[ 638 Subject.ROLES_NS] 639 ] 615 640 616 641 log.debug("SamlPIPMiddleware.attributeQuery response: %r", -
TI12-security/trunk/python/ndg_security_server/ndg/security/server/wsgi/openid/provider/axinterface/sqlalchemy_ax.py
r6034 r6062 26 26 making use of the SQLAlchemy database package''' 27 27 28 IDENTITY_URI_SESSION_KEYNAME = \ 29 OpenIDProviderMiddleware.IDENTITY_URI_SESSION_KEYNAME 30 USERNAME_SESSION_KEYNAME = \ 31 OpenIDProviderMiddleware.USERNAME_SESSION_KEYNAME 28 USERNAME_SESSION_KEYNAME = OpenIDProviderMiddleware.USERNAME_SESSION_KEYNAME 32 29 33 30 CONNECTION_STRING_OPTNAME = 'connectionString' … … 147 144 raise AXInterfaceConfigError("No username set in session context") 148 145 149 identityURI = authnCtx.get(150 SQLAlchemyAXInterface.IDENTITY_URI_SESSION_KEYNAME)151 if identityURI is None:152 raise AXInterfaceConfigError("No OpenID user identifier set in "153 "session context")154 155 146 requiredAttributeURIs = ax_req.getRequiredAttrs() 156 147 -
TI12-security/trunk/python/ndg_security_server/ndg/security/server/wsgi/openid/relyingparty/validation.py
r5828 r6062 23 23 from openid.yadis.manager import Discovery 24 24 25 from ndg.security.common.X509 import X509Cert 25 26 from ndg.security.common.utils.etree import QName 26 27 from ndg.security.common.utils.classfactory import instantiateClass … … 346 347 by maintaining its own error storage managed by verify_callback. 347 348 ''' 348 x509Cert = x509StoreCtx.get_current_cert() 349 dnTxt = x509Cert.get_subject().as_text() 350 commonName = X500DN(dn=dnTxt)['CN'] 349 x509Cert = X509Cert.fromM2Crypto(x509StoreCtx.get_current_cert()) 350 commonName = x509Cert.dn['CN'] 351 351 352 352 # If all is OK preVerifyOK will be 1. Return this to the caller to -
TI12-security/trunk/python/ndg_security_test/ndg/security/test/config/attributeauthority/sitea/siteAUserRoles.py
r6052 r6062 84 84 VALID_REQUESTOR_IDS = ( 85 85 str(X500DN.fromString("/O=Site A/CN=Authorisation Service")), 86 str(X500DN.fromString("/O=Site B/CN=Authorisation Service")) 86 str(X500DN.fromString("/O=Site B/CN=Authorisation Service")), 87 str(X500DN.fromString('/CN=test/O=NDG/OU=BADC')) 87 88 ) 88 89 -
TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_attributeauthority.py
r6034 r6062 29 29 from ndg.security.server.attributeauthority import (AttributeAuthority, 30 30 AttributeAuthorityNoMatchingRoleInTrustedHosts, 31 SQLAlchemyAttributeInterface )31 SQLAlchemyAttributeInterface, InvalidAttributeFormat) 32 32 33 33 from ndg.security.common.AttCert import AttCert … … 252 252 253 253 class SQLAlchemyAttributeInterfaceTestCase(BaseTestCase): 254 SAML_SUBJECT_SQLQUERY = ("select count(*) from users where '%s' || " 255 "openid_identifier = '${userId}'" % 256 BaseTestCase.OPENID_URI_STEM) 257 258 SAML_FIRSTNAME_SQLQUERY = ("select firstname from users where '%s' || " 259 "openid_identifier = '${userId}'" % 260 BaseTestCase.OPENID_URI_STEM) 261 262 SAML_LASTNAME_SQLQUERY = ("select lastname from users where '%s' || " 263 "openid_identifier = '${userId}'" % 264 BaseTestCase.OPENID_URI_STEM) 265 266 SAML_EMAILADDRESS_SQLQUERY = ("select emailaddress from users where '%s' ||" 267 " openid_identifier = '${userId}'" % 268 BaseTestCase.OPENID_URI_STEM) 254 SAML_SUBJECT_SQLQUERY = ("select count(*) from users where openid = " 255 "'${userId}'") 256 257 SAML_FIRSTNAME_SQLQUERY = ("select firstname from users where openid = " 258 "'${userId}'") 259 260 SAML_LASTNAME_SQLQUERY = ("select lastname from users where openid = " 261 "'${userId}'") 262 263 SAML_EMAILADDRESS_SQLQUERY = ("select emailaddress from users where " 264 "openid = '${userId}'") 269 265 270 266 SAML_ATTRIBUTES_SQLQUERY = ("select attributename from attributes, users " 271 "where '%s' || users.openid_identifier = " 272 "'${userId}' and attributes.username = " 273 "users.username" % BaseTestCase.OPENID_URI_STEM) 274 # 275 # SAML_ATTRIBUTES_SQLQUERY = ("select attributename from attributes " 276 # "where username = 'pjk'") 267 "where users.openid = '${userId}' and " 268 "attributes.username = users.username") 277 269 278 270 def __init__(self, *arg, **kw): … … 403 395 attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT 404 396 attributeQuery.subject.nameID.value = \ 405 "https://openid.localhost/philip.kershaw"397 SQLAlchemyAttributeInterfaceTestCase.OPENID_URI 406 398 407 399 fnAttribute = Attribute() 408 400 fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME 409 fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"401 fnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT 410 402 fnAttribute.friendlyName = "FirstName" 411 403 … … 414 406 lnAttribute = Attribute() 415 407 lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME 416 lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string"408 lnAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT 417 409 lnAttribute.friendlyName = "LastName" 418 410 … … 421 413 emailAddressAttribute = Attribute() 422 414 emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME 423 emailAddressAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\ 424 XSStringAttributeValue.TYPE_LOCAL_NAME 425 emailAddressAttribute.friendlyName = "emailAddress" 415 emailAddressAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT 416 emailAddressAttribute.friendlyName = "EmailAddress" 426 417 427 418 attributeQuery.attributes.append(emailAddressAttribute) … … 430 421 authzAttribute.name = \ 431 422 SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0] 432 authzAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\ 433 XSStringAttributeValue.TYPE_LOCAL_NAME 423 authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT 434 424 authzAttribute.friendlyName = "authz" 435 425 … … 482 472 '/O=ESG/OU=NCAR/CN=Gateway') 483 473 484 attributeInterface.setProperties(samlAssertionLifetime=28800.) 474 attributeInterface.setProperties(samlAssertionLifetime=28800., 475 issuerName='/CN=Attribute Authority/O=Site A') 485 476 486 477 attributeInterface.samlSubjectSqlQuery = ( … … 504 495 SQLAlchemyAttributeInterfaceTestCase.N_ATTRIBUTE_VALUES) 505 496 497 def test04SamlAttributeQuery(self): 498 if self.skipTests: 499 return 500 501 # Prepare a client query 502 attributeQuery = AttributeQuery() 503 attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) 504 attributeQuery.id = str(uuid4()) 505 attributeQuery.issueInstant = datetime.utcnow() 506 507 attributeQuery.issuer = Issuer() 508 attributeQuery.issuer.format = Issuer.X509_SUBJECT 509 attributeQuery.issuer.value = '/O=ESG/OU=NCAR/CN=Gateway' 510 511 512 attributeQuery.subject = Subject() 513 attributeQuery.subject.nameID = NameID() 514 attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT 515 attributeQuery.subject.nameID.value = \ 516 SQLAlchemyAttributeInterfaceTestCase.OPENID_URI 517 518 emailAddressAttribute = Attribute() 519 emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME 520 emailAddressAttribute.nameFormat = "InvalidFormat" 521 emailAddressAttribute.friendlyName = "EmailAddress" 522 523 attributeQuery.attributes.append(emailAddressAttribute) 524 525 authzAttribute = Attribute() 526 authzAttribute.name = \ 527 SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0] 528 authzAttribute.nameFormat = XSStringAttributeValue.DEFAULT_FORMAT 529 authzAttribute.friendlyName = "authz" 530 531 attributeQuery.attributes.append(authzAttribute) 532 533 # Add the response - the interface will populate with an assertion as 534 # appropriate 535 samlResponse = Response() 536 537 samlResponse.issueInstant = datetime.utcnow() 538 samlResponse.id = str(uuid4()) 539 samlResponse.issuer = Issuer() 540 541 # Initialise to success status but reset on error 542 samlResponse.status = Status() 543 samlResponse.status.statusCode = StatusCode() 544 samlResponse.status.statusMessage = StatusMessage() 545 samlResponse.status.statusCode.value = StatusCode.SUCCESS_URI 546 547 # Nb. SAML 2.0 spec says issuer format must be omitted 548 samlResponse.issuer.value = "CEDA" 549 550 samlResponse.inResponseTo = attributeQuery.id 551 552 # Set up the interface object 553 554 # Define queries for SAML attribute names 555 samlAttribute2SqlQuery = { 556 EsgSamlNamespaces.FIRSTNAME_ATTRNAME: 557 SQLAlchemyAttributeInterfaceTestCase.SAML_FIRSTNAME_SQLQUERY, 558 559 EsgSamlNamespaces.LASTNAME_ATTRNAME: 560 SQLAlchemyAttributeInterfaceTestCase.SAML_LASTNAME_SQLQUERY, 561 562 EsgSamlNamespaces.EMAILADDRESS_ATTRNAME: 563 SQLAlchemyAttributeInterfaceTestCase.SAML_EMAILADDRESS_SQLQUERY, 564 565 SQLAlchemyAttributeInterfaceTestCase.ATTRIBUTE_NAMES[0]: 566 SQLAlchemyAttributeInterfaceTestCase.SAML_ATTRIBUTES_SQLQUERY 567 } 568 569 attributeInterface = SQLAlchemyAttributeInterface( 570 samlAttribute2SqlQuery=samlAttribute2SqlQuery) 571 572 attributeInterface.connectionString = \ 573 SQLAlchemyAttributeInterfaceTestCase.DB_CONNECTION_STR 574 575 attributeInterface.samlValidRequestorDNs = ( 576 '/O=STFC/OU=CEDA/CN=AuthorisationService', 577 '/O=ESG/OU=NCAR/CN=Gateway') 578 579 attributeInterface.setProperties(samlAssertionLifetime=28800., 580 issuerName='/CN=Attribute Authority/O=Site A') 581 582 attributeInterface.samlSubjectSqlQuery = ( 583 SQLAlchemyAttributeInterfaceTestCase.SAML_SUBJECT_SQLQUERY) 584 585 # Make the query 586 try: 587 attributeInterface.getAttributes(attributeQuery, samlResponse) 588 except InvalidAttributeFormat: 589 print("PASSED: caught InvalidAttributeFormat exception") 590 else: 591 self.fail("Expecting InvalidAttributeFormat exception") 506 592 507 593 if __name__ == "__main__": -
TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthority/test_sqlalchemyattributeinterface.cfg
r6039 r6062 1 1 [DEFAULT] 2 openIdStem = https://openid.localhost/3 2 attributeInterface.issuerName = /O=Site A/CN=Attribute Authority 4 attributeInterface.samlSubjectSqlQuery = select count(*) from users where '%(openIdStem)s' || openid_identifier= '${userId}'5 attributeInterface.samlAttribute2SqlQuery.1 = "urn:esg:first:name" "select firstname from users where '%(openIdStem)s' || openid_identifier= '${userId}'"6 attributeInterface.samlAttribute2SqlQuery.lastName = "urn:esg:last:name" "select lastname from users where '%(openIdStem)s' || openid_identifier= '${userId}'"7 attributeInterface.samlAttribute2SqlQuery.emailAddress = "urn:esg:email:address" "select emailaddress from users where '%(openIdStem)s' || openid_identifier= '${userId}'"8 attributeInterface.samlAttribute2SqlQuery.4 = "urn:siteA:security:authz:1.0:attr" "select attributename from attributes where '%(openIdStem)s' || openid_identifier= '${userId}'"3 attributeInterface.samlSubjectSqlQuery = select count(*) from users where openid = '${userId}' 4 attributeInterface.samlAttribute2SqlQuery.1 = "urn:esg:first:name" "select firstname from users where openid = '${userId}'" 5 attributeInterface.samlAttribute2SqlQuery.lastName = "urn:esg:last:name" "select lastname from users where openid = '${userId}'" 6 attributeInterface.samlAttribute2SqlQuery.emailAddress = "urn:esg:email:address" "select emailaddress from users where openid = '${userId}'" 7 attributeInterface.samlAttribute2SqlQuery.4 = "urn:siteA:security:authz:1.0:attr" "select attributename from attributes where openid = '${userId}'" 9 8 attributeInterface.samlValidRequestorDNs = /O=Site A/CN=Authorisation Service,/O=Site A/CN=Attribute Authority, 10 9 /O=Site B/CN=Authorisation Service -
TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/attributeauthorityclient/attAuthorityClientTest.cfg
r6050 r6062 78 78 subject = https://openid.localhost/philip.kershaw 79 79 attributeQuery.clockSkew = 0. 80 attributeQuery.issuer DN= /O=Site A/CN=Authorisation Service80 attributeQuery.issuerName = /O=Site A/CN=Authorisation Service 81 81 attributeQuery.queryAttributes.0 = urn:esg:first:name, FirstName, http://www.w3.org/2001/XMLSchema#string 82 82 attributeQuery.queryAttributes.roles = urn:siteA:security:authz:1.0:attr, , http://www.w3.org/2001/XMLSchema#string … … 87 87 88 88 attributeQuery.clockSkew = 0. 89 attributeQuery.issuer DN= /O=Site A/CN=Authorisation Service89 attributeQuery.issuerName = /O=Site A/CN=Authorisation Service 90 90 attributeQuery.queryAttributes.0 = urn:esg:email:address, EmailAddress, http://www.w3.org/2001/XMLSchema#string 91 91 attributeQuery.queryAttributes.roles = urn:siteA:security:authz:1.0:attr, , http://www.w3.org/2001/XMLSchema#string -
TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/credentialwallet/test_credentialwallet.py
r6059 r6062 298 298 299 299 self.assert_(len(wallet.credentials) == 1) 300 sleep( 1)300 sleep(2) 301 301 wallet.audit() 302 302 self.assert_(len(wallet.credentials) == 0) -
TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/myproxy/certificate_extapp/config.ini
r6052 r6062 13 13 openIdSqlQuery = select openid from users where username = '${username}' 14 14 attributeAuthorityURI = https://localhost:5443/AttributeAuthority/saml 15 attributeQuery.issuer DN= /O=Site A/CN=Authorisation Service15 attributeQuery.issuerName = /O=Site A/CN=Authorisation Service 16 16 attributeQuery.clockSkew = 0 17 17 attributeQuery_queryAttributes.0 = urn:esg:email:address, EmailAddress, http://www.w3.org/2001/XMLSchema#string -
TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/openid/provider/axinterface/test_axinterface.py
r6009 r6062 34 34 SQLAlchemyAXInterfaceTestCase.DB_CONNECTION_STR 35 35 36 interface.sqlQuery = ("select firstname from users where '%s' || " 37 "openid_identifier = '${invalidUsernameKey}'" % 38 SQLAlchemyAXInterfaceTestCase.OPENID_URI_STEM) 36 interface.sqlQuery = ("select firstname from users where username = " 37 "'${invalidUsernameKey}'") 39 38 40 39 axReq = FetchRequest() … … 42 41 43 42 authnCtx = { 44 SQLAlchemyAXInterface. IDENTITY_URI_SESSION_KEYNAME:45 SQLAlchemyAXInterfaceTestCase. OPENID_URI43 SQLAlchemyAXInterface.USERNAME_SESSION_KEYNAME: 44 SQLAlchemyAXInterfaceTestCase.USERNAME 46 45 } 47 46 … … 62 61 63 62 interface.sqlQuery = ("select firstname, lastname, emailAddress from " 64 "users where '%s' || " 65 "openid_identifier = '${username}'" % 66 SQLAlchemyAXInterfaceTestCase.OPENID_URI_STEM) 63 "users where username = '${username}'") 67 64 68 65 axReq = FetchRequest() … … 74 71 75 72 authnCtx = { 76 SQLAlchemyAXInterface. IDENTITY_URI_SESSION_KEYNAME:77 SQLAlchemyAXInterfaceTestCase. OPENID_URI73 SQLAlchemyAXInterface.USERNAME_SESSION_KEYNAME: 74 SQLAlchemyAXInterfaceTestCase.USERNAME 78 75 } 79 76 -
TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/README
r5330 r6062 1 1 WSGI Authorization Middleware Unit Tests 2 2 ======================================== 3 These tests call ndg.security.server.wsgi.authz.AuthorizationMiddleware via 3 These tests call ndg.security.server.wsgi.authz.SAMLAuthorizationMiddleware 4 and ndg.security.server.wsgi.authz.NDGAuthorizationMiddleware via 4 5 paste.fixture. An attribute authority service needs to be running in order 5 6 for the middleware to check user attributes, In a separate terminal start -
TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/ndg-policy.xml
r6061 r6062 1 1 <?xml version="1.0" encoding="UTF-8"?> 2 <Policy PolicyId=" AuthZUnitTests" xmlns="urn:ndg:security:authz:1.0:policy">3 <Description>Restrict access for Authorization unit tests</Description>2 <Policy PolicyId="NDGAuthZUnitTests" xmlns="urn:ndg:security:authz:1.1:policy"> 3 <Description>Restrict access for NDG based Authorization unit tests</Description> 4 4 5 5 <Target> 6 6 <URIPattern>^/test_accessGrantedToSecuredURI$</URIPattern> 7 7 <Attributes> 8 <Attribute>urn:siteA:security:authz:1.0:attr:staff</Attribute> 8 <Attribute> 9 <Name>urn:siteA:security:authz:1.0:attr:staff</Name> 10 <!-- Endpoint is for SOAP/WSDL based NDG Interface --> 11 <AttributeAuthorityURI>http://localhost:5000/AttributeAuthority</AttributeAuthorityURI> 12 </Attribute> 9 13 </Attributes> 10 <AttributeAuthority>11 <uri>http://localhost:5000/AttributeAuthority</uri>12 </AttributeAuthority>13 14 </Target> 14 15 <Target> 15 16 <URIPattern>^/test_accessDeniedToSecuredURI$</URIPattern> 16 17 <Attributes> 17 <Attribute>urn:siteA:security:authz:1.0:attr:forbidden</Attribute> 18 <Attribute>urn:siteA:security:authz:1.0:attr:keepout</Attribute> 18 <Attribute> 19 <Name>urn:siteA:security:authz:1.0:attr:forbidden</Name> 20 <AttributeAuthorityURI>http://localhost:5000/AttributeAuthority</AttributeAuthorityURI> 21 </Attribute> 22 <Attribute> 23 <Name>urn:siteA:security:authz:1.0:attr:keepout</Name> 24 <AttributeAuthorityURI>http://localhost:5000/AttributeAuthority</AttributeAuthorityURI> 25 </Attribute> 19 26 </Attributes> 20 <AttributeAuthority>21 <uri>http://localhost:5000/AttributeAuthority</uri>22 </AttributeAuthority>23 27 </Target> 24 28 <Target> … … 29 33 <URIPattern>^/test_accessGrantedToSecuredURI\?admin=1$</URIPattern> 30 34 <Attributes> 31 <Attribute>urn:siteA:security:authz:1.0:attr:admin</Attribute> 35 <Attribute> 36 <Name>urn:siteA:security:authz:1.0:attr:admin</Name> 37 <AttributeAuthorityURI>http://localhost:5000/AttributeAuthority</AttributeAuthorityURI> 38 </Attribute> 32 39 </Attributes> 33 <AttributeAuthority>34 <uri>http://localhost:5000/AttributeAuthority</uri>35 </AttributeAuthority>36 40 </Target> 37 41 </Policy> -
TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/ndg-test.ini
r6061 r6062 19 19 20 20 [filter:AuthZFilter] 21 paste.filter_app_factory=ndg.security.server.wsgi.authz: SAMLAuthorizationMiddleware.filter_app_factory21 paste.filter_app_factory=ndg.security.server.wsgi.authz:NDGAuthorizationMiddleware.filter_app_factory 22 22 prefix = authz. 23 policy.filePath = %(here)s/ policy.xml23 policy.filePath = %(here)s/ndg-policy.xml 24 24 25 25 authz.pepResultHandler = ndg.security.test.unit.wsgi.authz.test_authz.RedirectFollowingAccessDenied 26 26 27 # Settings for Policy Information Point used by the Policy Decision Point to 28 # retrieve subject attributes from the Attribute Authority associated with the 29 # resource to be accessed 30 pip.sslCACertDir=%(testConfigDir)s/ca 31 pip.sslCertFilePath=%(testConfigDir)s/pki/test.crt 32 pip.sslPriKeyFilePath=%(testConfigDir)s/pki/test.key 27 # List of CA certificates used to verify the signatures of 28 # Attribute Certificates retrieved 29 pip.caCertFilePathList=%(testConfigDir)s/ca/ndg-test-ca.crt 30 31 # 32 # WS-Security Settings for call to Session Manager 33 34 # Signature of an outbound message 35 36 # Certificate associated with private key used to sign a message. The sign 37 # method will add this to the BinarySecurityToken element of the WSSE header. 38 # binSecTokValType attribute must be set to 'X509' or 'X509v3' ValueType. 39 # As an alternative, use signingCertChain - see below... 40 41 # PEM encode cert 42 pip.wssecurity.signingCertFilePath=%(testConfigDir)s/pki/wsse-server.crt 43 44 # PEM encoded private key file 45 pip.wssecurity.signingPriKeyFilePath=%(testConfigDir)s/pki/wsse-server.key 46 47 # Password protecting private key. Leave blank if there is no password. 48 pip.wssecurity.signingPriKeyPwd= 49 50 # For signature verification. Provide a space separated list of file paths 51 pip.wssecurity.caCertFilePathList=%(testConfigDir)s/ca/ndg-test-ca.crt 52 53 # ValueType for the BinarySecurityToken added to the WSSE header 54 pip.wssecurity.reqBinSecTokValType=X509v3 55 56 # Add a timestamp element to an outbound message 57 pip.wssecurity.addTimestamp=True -
TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/saml-policy.xml
r6061 r6062 1 1 <?xml version="1.0" encoding="UTF-8"?> 2 <Policy PolicyId=" AuthZUnitTests" xmlns="urn:ndg:security:authz:1.0:policy">3 <Description>Restrict access for Authorization unit tests</Description>2 <Policy PolicyId="SAMLAuthZUnitTests" xmlns="urn:ndg:security:authz:1.1:policy"> 3 <Description>Restrict access for SAML based Authorization unit tests</Description> 4 4 5 5 <Target> 6 6 <URIPattern>^/test_accessGrantedToSecuredURI$</URIPattern> 7 7 <Attributes> 8 <Attribute>urn:siteA:security:authz:1.0:attr:staff</Attribute> 8 <Attribute> 9 <Name>urn:siteA:security:authz:1.0:attr:staff</Name> 10 <!-- Endpoint is for SOAP/SAML based ESG Interface --> 11 <AttributeAuthorityURI>https://localhost:5443/AttributeAuthority/saml</AttributeAuthorityURI> 12 </Attribute> 9 13 </Attributes> 10 <AttributeAuthority>11 <uri>http://localhost:5000/AttributeAuthority</uri>12 </AttributeAuthority>13 14 </Target> 14 15 <Target> 15 16 <URIPattern>^/test_accessDeniedToSecuredURI$</URIPattern> 16 17 <Attributes> 17 <Attribute>urn:siteA:security:authz:1.0:attr:forbidden</Attribute> 18 <Attribute>urn:siteA:security:authz:1.0:attr:keepout</Attribute> 18 <Attribute> 19 <Name>urn:siteA:security:authz:1.0:attr:forbidden</Name> 20 <AttributeAuthorityURI>https://localhost:5443/AttributeAuthority/saml</AttributeAuthorityURI> 21 </Attribute> 22 <Attribute> 23 <Name>urn:siteA:security:authz:1.0:attr:keepout</Name> 24 <AttributeAuthorityURI>https://localhost:5443/AttributeAuthority/saml</AttributeAuthorityURI> 25 </Attribute> 19 26 </Attributes> 20 <AttributeAuthority>21 <uri>http://localhost:5000/AttributeAuthority</uri>22 </AttributeAuthority>23 27 </Target> 24 28 <Target> … … 29 33 <URIPattern>^/test_accessGrantedToSecuredURI\?admin=1$</URIPattern> 30 34 <Attributes> 31 <Attribute>urn:siteA:security:authz:1.0:attr:admin</Attribute> 35 <Attribute> 36 <Name>urn:siteA:security:authz:1.0:attr:admin</Name> 37 <AttributeAuthorityURI>https://localhost:5443/AttributeAuthority/saml</AttributeAuthorityURI> 38 </Attribute> 32 39 </Attributes> 33 <AttributeAuthority>34 <uri>http://localhost:5000/AttributeAuthority</uri>35 </AttributeAuthority>36 40 </Target> 37 41 </Policy> -
TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/saml-test.ini
r6061 r6062 21 21 paste.filter_app_factory=ndg.security.server.wsgi.authz:SAMLAuthorizationMiddleware.filter_app_factory 22 22 prefix = authz. 23 policy.filePath = %(here)s/ policy.xml23 policy.filePath = %(here)s/saml-policy.xml 24 24 25 25 authz.pepResultHandler = ndg.security.test.unit.wsgi.authz.test_authz.RedirectFollowingAccessDenied … … 28 28 # retrieve subject attributes from the Attribute Authority associated with the 29 29 # resource to be accessed 30 pip.sslCACertDir=%(testConfigDir)s/ca 31 pip.sslCertFilePath=%(testConfigDir)s/pki/test.crt 32 pip.sslPriKeyFilePath=%(testConfigDir)s/pki/test.key 30 31 # If omitted, DN of SSL Cert is used 32 pip.attributeQuery.issuerName = 33 pip.attributeQuery.clockSkew = 0. 34 pip.attributeQuery.queryAttributes.0 = urn:siteA:security:authz:1.0:attr, , http://www.w3.org/2001/XMLSchema#string 35 pip.attributeQuery.sslCACertDir=%(testConfigDir)s/ca 36 pip.attributeQuery.sslCertFilePath=%(testConfigDir)s/pki/test.crt 37 pip.attributeQuery.sslPriKeyFilePath=%(testConfigDir)s/pki/test.key -
TI12-security/trunk/python/ndg_security_test/ndg/security/test/unit/wsgi/authz/test_authz.py
r6060 r6062 24 24 import paste.fixture 25 25 from paste.deploy import loadapp 26 27 from ndg.security.test.unit import BaseTestCase 26 28 from ndg.security.server.wsgi import NDGSecurityMiddlewareBase 27 29 from ndg.security.server.wsgi.authz import (NdgPIPMiddlewareConfigError, … … 94 96 95 97 96 class NdgWSGIAuthZTestCase( unittest.TestCase):98 class NdgWSGIAuthZTestCase(BaseTestCase): 97 99 98 100 def __init__(self, *args, **kwargs): 101 BaseTestCase.__init__(self, *args, **kwargs) 102 99 103 here_dir = os.path.dirname(os.path.abspath(__file__)) 100 wsgiapp = loadapp('config: test.ini', relative_to=here_dir)104 wsgiapp = loadapp('config:ndg-test.ini', relative_to=here_dir) 101 105 self.app = paste.fixture.TestApp(wsgiapp) 102 103 unittest.TestCase.__init__(self, *args, **kwargs) 106 107 self.startSiteAAttributeAuthority() 108 104 109 105 110 … … 111 116 response = self.app.get('/test_200') 112 117 except NdgPIPMiddlewareConfigError, e: 113 print(" PASS- expected: %s exception: %s" % (e.__class__, e))118 print("ok - expected: %s exception: %s" % (e.__class__, e)) 114 119 115 120 def test02Ensure200WithNotLoggedInAndUnsecuredURI(self): … … 255 260 256 261 257 class SamlWSGIAuthZTestCase(unittest.TestCase): 258 259 def __init__(self, *args, **kwargs): 262 class SamlWSGIAuthZTestCase(BaseTestCase): 263 264 def __init__(self, *args, **kwargs): 265 BaseTestCase.__init__(self, *args, **kwargs) 266 260 267 here_dir = os.path.dirname(os.path.abspath(__file__)) 261 268 wsgiapp = loadapp('config:saml-test.ini', relative_to=here_dir) 262 269 self.app = paste.fixture.TestApp(wsgiapp) 263 264 unittest.TestCase.__init__(self, *args, **kwargs) 270 271 self.startSiteAAttributeAuthority(withSSL=True, 272 port=SamlWSGIAuthZTestCase.SITEA_SSL_ATTRIBUTEAUTHORITY_PORTNUM) 265 273 266 274 … … 272 280 response = self.app.get('/test_200') 273 281 except SamlPIPMiddlewareConfigError, e: 274 print(" PASS- expected: %s exception: %s" % (e.__class__, e))282 print("ok - expected: %s exception: %s" % (e.__class__, e)) 275 283 276 284 def test02Ensure200WithNotLoggedInAndUnsecuredURI(self): … … 290 298 # even though a user is set in the session 291 299 292 extra_environ={'beaker.session.ndg.security': 293 BeakerSessionStub(username='testuser')} 300 extra_environ = { 301 'beaker.session.ndg.security': 302 BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI) 303 } 294 304 response = self.app.get('/test_401', 295 305 extra_environ=extra_environ, … … 302 312 # even though a user is set in the session 303 313 304 extra_environ={'beaker.session.ndg.security': 305 BeakerSessionStub(username='testuser')} 314 extra_environ = { 315 'beaker.session.ndg.security': 316 BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI) 317 } 306 318 response = self.app.get('/test_403', 307 319 extra_environ=extra_environ, … … 324 336 # User is logged in but doesn't have the required credentials for 325 337 # access 326 extra_environ={'beaker.session.ndg.security': 327 BeakerSessionStub(username='testuser')} 338 extra_environ = { 339 'beaker.session.ndg.security': 340 BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI) 341 } 328 342 329 343 response = self.app.get('/test_accessDeniedToSecuredURI', … … 338 352 # User is logged in and has credentials for access to a URI secured 339 353 # by the policy file 340 extra_environ={'beaker.session.ndg.security': 341 BeakerSessionStub(username='testuser')} 354 extra_environ = { 355 'beaker.session.ndg.security': 356 BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI) 357 } 342 358 343 359 response = self.app.get('/test_accessGrantedToSecuredURI', … … 351 367 # User is logged in but doesn't have the required credentials for 352 368 # access 353 extra_environ={'beaker.session.ndg.security': 354 BeakerSessionStub(username='testuser')} 369 extra_environ = { 370 'beaker.session.ndg.security': 371 BeakerSessionStub(username=SamlWSGIAuthZTestCase.OPENID_URI) 372 } 355 373 356 374 # Try this URI with the query arg admin=1. This will be picked up
Note: See TracChangeset
for help on using the changeset viewer.