Changeset 6674 for TI12-security/branches
- Timestamp:
- 04/03/10 15:15:56 (11 years ago)
- Location:
- TI12-security/branches/ndg-security-1.5.x
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
TI12-security/branches/ndg-security-1.5.x/.pydevproject
r6669 r6674 13 13 <pydev_pathproperty name="org.python.pydev.PROJECT_EXTERNAL_SOURCE_PATH"> 14 14 <path>/home/pjkersha/workspace/ndg_security_saml_0_2_x</path> 15 <path>/home/pjkersha/workspace/AuthKit/trunk</path> 15 16 </pydev_pathproperty> 16 17 </pydev_project> -
TI12-security/branches/ndg-security-1.5.x/ndg_security_common/ndg/security/common/saml_utils/bindings.py
r6069 r6674 243 243 ISSUER_NAME_OPTNAME = 'issuerName' 244 244 CLOCK_SKEW_OPTNAME = 'clockSkew' 245 VERIFY_TIME_CONDITIONS_OPTNAME = 'verifyTimeConditions' 245 246 246 247 CONFIG_FILE_OPTNAMES = ( 247 248 SUBJECT_ID_OPTNAME, 248 249 ISSUER_NAME_OPTNAME, 249 CLOCK_SKEW_OPTNAME 250 CLOCK_SKEW_OPTNAME, 251 VERIFY_TIME_CONDITIONS_OPTNAME 250 252 ) 251 253 … … 265 267 self.__queryAttributes = TypedList(Attribute) 266 268 self.__clockSkew = timedelta(seconds=0.) 267 269 self.__verifyTimeConditions = True 270 268 271 super(AttributeQuerySOAPBinding, self).__init__(**kw) 269 272 … … 339 342 raise 340 343 344 def _getVerifyTimeConditions(self): 345 return self.__verifyTimeConditions 346 347 def _setVerifyTimeConditions(self, value): 348 if isinstance(value, bool): 349 self.__verifyTimeConditions = value 350 351 if isinstance(value, basestring): 352 self.__verifyTimeConditions = str2Bool(value) 353 else: 354 raise TypeError('Expecting bool or string type for ' 355 '"verifyTimeConditions"; got %r instead' % 356 type(value)) 357 358 verifyTimeConditions = property(_getVerifyTimeConditions, 359 _setVerifyTimeConditions, 360 doc='Set to True to verify any time ' 361 'Conditions set in the returned ' 362 'response assertions') 363 341 364 def _getSubjectID(self): 342 365 return self.__subjectID … … 426 449 427 450 return attributeQuery 428 451 452 def _verifyTimeConditions(self, response): 453 """Verify time conditions set in a response 454 @param response: SAML Response returned from remote service 455 @type response: ndg.saml.saml2.core.Response 456 @raise SubjectQueryResponseError: if a timestamp is invalid 457 """ 458 459 if not self.verifyTimeConditions: 460 log.debug("Skipping verification of SAML Response time conditions") 461 462 utcNow = datetime.utcnow() 463 nowMinusSkew = utcNow - self.clockSkew 464 nowPlusSkew = utcNow + self.clockSkew 465 466 if response.issueInstant > nowPlusSkew: 467 msg = ('SAML Attribute Response issueInstant [%s] is after ' 468 'the clock time [%s] (skewed +%s)' % 469 (response.issueInstant, 470 SAMLDateTime.toString(nowPlusSkew), 471 self.clockSkew)) 472 473 samlRespError = AttributeQueryResponseError(msg) 474 samlRespError.response = response 475 raise samlRespError 476 477 for assertion in response.assertions: 478 if assertion.issueInstant is None: 479 samlRespError = AttributeQueryResponseError("No issueInstant " 480 "set in response " 481 "assertion") 482 samlRespError.response = response 483 raise samlRespError 484 485 elif nowPlusSkew < assertion.issueInstant: 486 msg = ('The clock time [%s] (skewed +%s) is before the ' 487 'SAML Attribute Response assertion issue instant [%s]' % 488 (SAMLDateTime.toString(utcNow), 489 self.clockSkew, 490 assertion.issueInstant)) 491 samlRespError = AttributeQueryResponseError(msg) 492 samlRespError.response = response 493 raise samlRespError 494 495 if assertion.conditions is not None: 496 if nowPlusSkew < assertion.conditions.notBefore: 497 msg = ('The clock time [%s] (skewed +%s) is before the ' 498 'SAML Attribute Response assertion conditions not ' 499 'before time [%s]' % 500 (SAMLDateTime.toString(utcNow), 501 self.clockSkew, 502 assertion.conditions.notBefore)) 503 504 samlRespError = AttributeQueryResponseError(msg) 505 samlRespError.response = response 506 raise samlRespError 507 508 if nowMinusSkew >= assertion.conditions.notOnOrAfter: 509 msg = ('The clock time [%s] (skewed -%s) is on or after ' 510 'the SAML Attribute Response assertion conditions ' 511 'not on or after time [%s]' % 512 (SAMLDateTime.toString(utcNow), 513 self.clockSkew, 514 assertion.conditions.notOnOrAfter)) 515 516 samlRespError = AttributeQueryResponseError(msg) 517 samlRespError.response = response 518 raise samlRespError 519 429 520 def send(self, **kw): 430 521 '''Make an attribute query to a remote SAML service … … 458 549 raise samlRespError 459 550 460 utcNow = datetime.utcnow() + self.clockSkew 461 if response.issueInstant > utcNow: 462 msg = ('SAML Attribute Response issueInstant [%s] is after ' 463 'the current clock time [%s]' % 464 (attributeQuery.issueInstant, SAMLDateTime.toString(utcNow))) 465 466 samlRespError = AttributeQueryResponseError(msg) 467 samlRespError.response = response 468 raise samlRespError 469 470 for assertion in response.assertions: 471 if utcNow < assertion.conditions.notBefore: 472 msg = ('The current clock time [%s] is before the SAML ' 473 'Attribute Response assertion conditions not before ' 474 'time [%s]' % 475 (SAMLDateTime.toString(utcNow), 476 assertion.conditions.notBefore)) 477 478 samlRespError = AttributeQueryResponseError(msg) 479 samlRespError.response = response 480 raise samlRespError 481 482 if utcNow >= assertion.conditions.notOnOrAfter: 483 msg = ('The current clock time [%s] is on or after the SAML ' 484 'Attribute Response assertion conditions not on or ' 485 'after time [%s]' % 486 (SAMLDateTime.toString(utcNow), 487 response.assertion.conditions.notOnOrAfter)) 488 489 samlRespError = AttributeQueryResponseError(msg) 490 samlRespError.response = response 491 raise samlRespError 492 551 self._verifyTimeConditions(response) 552 493 553 return response 494 554 -
TI12-security/branches/ndg-security-1.5.x/ndg_security_server/ndg/security/server/wsgi/openid/provider/__init__.py
r6440 r6674 1316 1316 @return: WSGI response''' 1317 1317 """ 1318 if not isinstance(oidResponse, server.OpenIDResponse): 1319 log.error("OpenID Response is %r type, expecting %r", 1320 type(oidResponse), server.OpenIDResponse) 1321 return self._render.errorPage(environ, start_response, 1322 "Error setting a response. Please " 1323 "report this fault to your site " 1324 "administrator.", 1325 code=500) 1326 1318 1327 try: 1319 1328 webresponse = self.oidserver.encodeResponse(oidResponse) -
TI12-security/branches/ndg-security-1.5.x/ndg_security_test/ndg/security/test/unit/saml/test_samlinterface.py
r6069 r6674 20 20 from xml.etree import ElementTree 21 21 22 from saml.utils import SAMLDateTime 22 23 from saml.saml2.core import (Response, Assertion, Attribute, 23 24 AttributeStatement, SAMLVersion, Subject, NameID, … … 31 32 from ndg.security.common.soap.etree import SOAPEnvelope 32 33 from ndg.security.common.utils.etree import QName, prettyPrint 34 from ndg.security.common.saml_utils.bindings import (AttributeQuerySOAPBinding, 35 AttributeQueryResponseError) 33 36 from ndg.security.common.saml_utils.esg import (EsgSamlNamespaces, 34 37 XSGroupRoleAttributeValue) … … 160 163 """TODO: test SAML Attribute Authority interface""" 161 164 thisDir = os.path.dirname(os.path.abspath(__file__)) 162 163 def __init__(self, *args, **kwargs): 164 wsgiApp = SamlSoapBindingApp() 165 self.app = paste.fixture.TestApp(wsgiApp) 166 167 BaseTestCase.__init__(self, *args, **kwargs) 168 169 170 def test01AttributeQuery(self): 171 attributeQuery = AttributeQuery() 172 attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) 173 attributeQuery.id = str(uuid4()) 174 attributeQuery.issueInstant = datetime.utcnow() 175 176 attributeQuery.issuer = Issuer() 177 attributeQuery.issuer.format = Issuer.X509_SUBJECT 178 attributeQuery.issuer.value = \ 179 "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk" 180 181 182 attributeQuery.subject = Subject() 183 attributeQuery.subject.nameID = NameID() 184 attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT 185 attributeQuery.subject.nameID.value = \ 186 "https://openid.localhost/philip.kershaw" 187 188 # special case handling for 'FirstName' attribute 189 fnAttribute = Attribute() 190 fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME 191 fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string" 192 fnAttribute.friendlyName = "FirstName" 193 194 attributeQuery.attributes.append(fnAttribute) 195 196 # special case handling for 'LastName' attribute 197 lnAttribute = Attribute() 198 lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME 199 lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string" 200 lnAttribute.friendlyName = "LastName" 201 202 attributeQuery.attributes.append(lnAttribute) 203 204 # special case handling for 'LastName' attribute 205 emailAddressAttribute = Attribute() 206 emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME 207 emailAddressAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\ 208 XSStringAttributeValue.TYPE_LOCAL_NAME 209 emailAddressAttribute.friendlyName = "emailAddress" 210 211 attributeQuery.attributes.append(emailAddressAttribute) 212 213 elem = AttributeQueryElementTree.toXML(attributeQuery) 214 soapRequest = SOAPEnvelope() 215 soapRequest.create() 216 soapRequest.body.elem.append(elem) 217 218 request = soapRequest.serialize() 219 220 header = { 221 'soapAction': "http://www.oasis-open.org/committees/security", 222 'Content-length': str(len(request)), 223 'Content-type': 'text/xml' 224 } 225 response = self.app.post('/attributeauthority', 226 params=request, 227 headers=header, 228 status=200) 229 print("Response status=%d" % response.status) 230 231 soapResponse = SOAPEnvelope() 232 233 responseStream = StringIO() 234 responseStream.write(response.body) 235 responseStream.seek(0) 236 237 soapResponse.parse(responseStream) 238 239 print("Parsed response ...") 240 print(soapResponse.serialize()) 241 # print(prettyPrint(soapResponse.elem)) 242 243 response = ResponseElementTree.fromXML(soapResponse.body.elem[0]) 244 self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI) 245 self.assert_(response.inResponseTo == attributeQuery.id) 246 self.assert_(response.assertions[0].subject.nameID.value == \ 247 attributeQuery.subject.nameID.value) 248 249 def test02AttributeQueryWithSOAPClient(self): 250 251 # Thread a separate attribute authority instance 252 self.startSiteAAttributeAuthority() 253 254 client = UrlLib2SOAPClient() 255 256 # ElementTree based envelope class 257 client.responseEnvelopeClass = SOAPEnvelope 258 259 request = UrlLib2SOAPRequest() 260 request.url = 'http://localhost:5000/AttributeAuthority/saml' 261 request.envelope = SOAPEnvelope() 262 request.envelope.create() 263 264 # Make an attribute query 265 attributeQuery = AttributeQuery() 266 attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) 267 attributeQuery.id = str(uuid4()) 268 attributeQuery.issueInstant = datetime.utcnow() 269 270 attributeQuery.issuer = Issuer() 271 attributeQuery.issuer.format = Issuer.X509_SUBJECT 272 attributeQuery.issuer.value = \ 273 "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk" 274 275 attributeQuery.subject = Subject() 276 attributeQuery.subject.nameID = NameID() 277 attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT 278 attributeQuery.subject.nameID.value = \ 279 "https://esg.prototype.ucar.edu/myopenid/testUser" 280 281 # special case handling for 'FirstName' attribute 282 fnAttribute = Attribute() 283 fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME 284 fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string" 285 fnAttribute.friendlyName = "FirstName" 286 287 attributeQuery.attributes.append(fnAttribute) 288 289 # special case handling for 'LastName' attribute 290 lnAttribute = Attribute() 291 lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME 292 lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string" 293 lnAttribute.friendlyName = "LastName" 294 295 attributeQuery.attributes.append(lnAttribute) 296 297 # special case handling for 'LastName' attribute 298 emailAddressAttribute = Attribute() 299 emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME 300 emailAddressAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\ 301 XSStringAttributeValue.TYPE_LOCAL_NAME 302 emailAddressAttribute.friendlyName = "emailAddress" 303 304 attributeQuery.attributes.append(emailAddressAttribute) 305 306 attributeQueryElem = AttributeQueryElementTree.toXML(attributeQuery) 307 308 # Attach query to SOAP body 309 request.envelope.body.elem.append(attributeQueryElem) 310 311 from M2Crypto.m2urllib2 import HTTPSHandler 312 from urllib2 import URLError 313 314 client.openerDirector.add_handler(HTTPSHandler()) 315 try: 316 response = client.send(request) 317 except URLError, e: 318 self.fail("Error calling Attribute Service") 319 320 print("Response from server:\n\n%s" % response.envelope.serialize()) 321 322 if len(response.envelope.body.elem) != 1: 323 self.fail("Expecting single child element is SOAP body") 324 325 if QName.getLocalPart(response.envelope.body.elem[0].tag)!='Response': 326 self.fail('Expecting "Response" element in SOAP body') 327 328 toSAMLTypeMap = [XSGroupRoleAttributeValueElementTree.factoryMatchFunc] 329 response = ResponseElementTree.fromXML(response.envelope.body.elem[0], 330 customToSAMLTypeMap=toSAMLTypeMap) 331 self.assert_(response) 332 333 def test03ParseResponse(self): 334 response = \ 335 '''<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"> 165 RESPONSE = '''\ 166 <SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"> 336 167 <SOAP-ENV:Body> 337 <samlp:Response ID="05680cb2-4973-443d-9d31-7bc99bea87c1" InResponseTo="e3183380-ae82-4285-8827-8c40613842de" IssueInstant=" 2009-08-17T12:28:37.325Z" Version="2.0" xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol">168 <samlp:Response ID="05680cb2-4973-443d-9d31-7bc99bea87c1" InResponseTo="e3183380-ae82-4285-8827-8c40613842de" IssueInstant="%(issueInstant)s" Version="2.0" xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"> 338 169 <saml:Issuer Format="urn:esg:issuer" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">ESG-NCAR</saml:Issuer> 339 170 <samlp:Status> 340 171 <samlp:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Success" /> 341 172 </samlp:Status> 342 <saml:Assertion ID="192c67d9-f9cd-457a-9242-999e7b943166" IssueInstant=" 2009-08-17T12:28:37.347Z" Version="2.0" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion">173 <saml:Assertion ID="192c67d9-f9cd-457a-9242-999e7b943166" IssueInstant="%(assertionIssueInstant)s" Version="2.0" xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion"> 343 174 <saml:Issuer Format="urn:esg:issuer">ESG-NCAR</saml:Issuer> 344 175 <saml:Subject> 345 176 <saml:NameID Format="urn:esg:openid">https://esg.prototype.ucar.edu/myopenid/testUser</saml:NameID> 346 177 </saml:Subject> 347 <saml:Conditions NotBefore=" 2009-08-17T12:28:37.347Z" NotOnOrAfter="2009-08-18T12:28:37.347Z" />178 <saml:Conditions NotBefore="%(notBefore)s" NotOnOrAfter="%(notOnOrAfter)s" /> 348 179 <saml:AttributeStatement> 349 180 <saml:Attribute FriendlyName="FirstName" Name="urn:esg:first:name" NameFormat="http://www.w3.org/2001/XMLSchema#string"> … … 371 202 </samlp:Response> 372 203 </SOAP-ENV:Body> 373 </SOAP-ENV:Envelope>''' 374 204 </SOAP-ENV:Envelope> 205 ''' 206 207 def __init__(self, *args, **kwargs): 208 wsgiApp = SamlSoapBindingApp() 209 self.app = paste.fixture.TestApp(wsgiApp) 210 211 BaseTestCase.__init__(self, *args, **kwargs) 212 213 214 def test01AttributeQuery(self): 215 attributeQuery = AttributeQuery() 216 attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) 217 attributeQuery.id = str(uuid4()) 218 attributeQuery.issueInstant = datetime.utcnow() 219 220 attributeQuery.issuer = Issuer() 221 attributeQuery.issuer.format = Issuer.X509_SUBJECT 222 attributeQuery.issuer.value = \ 223 "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk" 224 225 226 attributeQuery.subject = Subject() 227 attributeQuery.subject.nameID = NameID() 228 attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT 229 attributeQuery.subject.nameID.value = \ 230 "https://openid.localhost/philip.kershaw" 231 232 # special case handling for 'FirstName' attribute 233 fnAttribute = Attribute() 234 fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME 235 fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string" 236 fnAttribute.friendlyName = "FirstName" 237 238 attributeQuery.attributes.append(fnAttribute) 239 240 # special case handling for 'LastName' attribute 241 lnAttribute = Attribute() 242 lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME 243 lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string" 244 lnAttribute.friendlyName = "LastName" 245 246 attributeQuery.attributes.append(lnAttribute) 247 248 # special case handling for 'LastName' attribute 249 emailAddressAttribute = Attribute() 250 emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME 251 emailAddressAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\ 252 XSStringAttributeValue.TYPE_LOCAL_NAME 253 emailAddressAttribute.friendlyName = "emailAddress" 254 255 attributeQuery.attributes.append(emailAddressAttribute) 256 257 elem = AttributeQueryElementTree.toXML(attributeQuery) 258 soapRequest = SOAPEnvelope() 259 soapRequest.create() 260 soapRequest.body.elem.append(elem) 261 262 request = soapRequest.serialize() 263 264 header = { 265 'soapAction': "http://www.oasis-open.org/committees/security", 266 'Content-length': str(len(request)), 267 'Content-type': 'text/xml' 268 } 269 response = self.app.post('/attributeauthority', 270 params=request, 271 headers=header, 272 status=200) 273 print("Response status=%d" % response.status) 274 375 275 soapResponse = SOAPEnvelope() 376 276 377 277 responseStream = StringIO() 378 responseStream.write(response) 278 responseStream.write(response.body) 279 responseStream.seek(0) 280 281 soapResponse.parse(responseStream) 282 283 print("Parsed response ...") 284 print(soapResponse.serialize()) 285 # print(prettyPrint(soapResponse.elem)) 286 287 response = ResponseElementTree.fromXML(soapResponse.body.elem[0]) 288 self.assert_(response.status.statusCode.value==StatusCode.SUCCESS_URI) 289 self.assert_(response.inResponseTo == attributeQuery.id) 290 self.assert_(response.assertions[0].subject.nameID.value == \ 291 attributeQuery.subject.nameID.value) 292 293 def test02AttributeQueryWithSOAPClient(self): 294 295 # Thread a separate attribute authority instance 296 self.startSiteAAttributeAuthority() 297 298 client = UrlLib2SOAPClient() 299 300 # ElementTree based envelope class 301 client.responseEnvelopeClass = SOAPEnvelope 302 303 request = UrlLib2SOAPRequest() 304 request.url = 'http://localhost:5000/AttributeAuthority/saml' 305 request.envelope = SOAPEnvelope() 306 request.envelope.create() 307 308 # Make an attribute query 309 attributeQuery = AttributeQuery() 310 attributeQuery.version = SAMLVersion(SAMLVersion.VERSION_20) 311 attributeQuery.id = str(uuid4()) 312 attributeQuery.issueInstant = datetime.utcnow() 313 314 attributeQuery.issuer = Issuer() 315 attributeQuery.issuer.format = Issuer.X509_SUBJECT 316 attributeQuery.issuer.value = \ 317 "/O=NDG/OU=BADC/CN=attributeauthority.badc.rl.ac.uk" 318 319 attributeQuery.subject = Subject() 320 attributeQuery.subject.nameID = NameID() 321 attributeQuery.subject.nameID.format = EsgSamlNamespaces.NAMEID_FORMAT 322 attributeQuery.subject.nameID.value = \ 323 "https://esg.prototype.ucar.edu/myopenid/testUser" 324 325 # special case handling for 'FirstName' attribute 326 fnAttribute = Attribute() 327 fnAttribute.name = EsgSamlNamespaces.FIRSTNAME_ATTRNAME 328 fnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string" 329 fnAttribute.friendlyName = "FirstName" 330 331 attributeQuery.attributes.append(fnAttribute) 332 333 # special case handling for 'LastName' attribute 334 lnAttribute = Attribute() 335 lnAttribute.name = EsgSamlNamespaces.LASTNAME_ATTRNAME 336 lnAttribute.nameFormat = "http://www.w3.org/2001/XMLSchema#string" 337 lnAttribute.friendlyName = "LastName" 338 339 attributeQuery.attributes.append(lnAttribute) 340 341 # special case handling for 'LastName' attribute 342 emailAddressAttribute = Attribute() 343 emailAddressAttribute.name = EsgSamlNamespaces.EMAILADDRESS_ATTRNAME 344 emailAddressAttribute.nameFormat = XMLConstants.XSD_NS+"#"+\ 345 XSStringAttributeValue.TYPE_LOCAL_NAME 346 emailAddressAttribute.friendlyName = "emailAddress" 347 348 attributeQuery.attributes.append(emailAddressAttribute) 349 350 attributeQueryElem = AttributeQueryElementTree.toXML(attributeQuery) 351 352 # Attach query to SOAP body 353 request.envelope.body.elem.append(attributeQueryElem) 354 355 from M2Crypto.m2urllib2 import HTTPSHandler 356 from urllib2 import URLError 357 358 client.openerDirector.add_handler(HTTPSHandler()) 359 try: 360 response = client.send(request) 361 except URLError, e: 362 self.fail("Error calling Attribute Service") 363 364 print("Response from server:\n\n%s" % response.envelope.serialize()) 365 366 if len(response.envelope.body.elem) != 1: 367 self.fail("Expecting single child element is SOAP body") 368 369 if QName.getLocalPart(response.envelope.body.elem[0].tag)!='Response': 370 self.fail('Expecting "Response" element in SOAP body') 371 372 toSAMLTypeMap = [XSGroupRoleAttributeValueElementTree.factoryMatchFunc] 373 response = ResponseElementTree.fromXML(response.envelope.body.elem[0], 374 customToSAMLTypeMap=toSAMLTypeMap) 375 self.assert_(response) 376 377 def _parseResponse(self, responseStr): 378 """Helper to parse a response from a string""" 379 soapResponse = SOAPEnvelope() 380 381 responseStream = StringIO() 382 responseStream.write(responseStr) 379 383 responseStream.seek(0) 380 384 … … 387 391 response = ResponseElementTree.fromXML(soapResponse.body.elem[0], 388 392 customToSAMLTypeMap=toSAMLTypeMap) 393 return response 394 395 def test03ParseResponse(self): 396 utcNow = datetime.utcnow() 397 respDict = { 398 'issueInstant': SAMLDateTime.toString(utcNow), 399 'assertionIssueInstant': SAMLDateTime.toString(utcNow), 400 'notBefore': SAMLDateTime.toString(utcNow), 401 'notOnOrAfter': SAMLDateTime.toString(utcNow + timedelta( 402 seconds=60*60*8)) 403 } 404 responseStr = SamlAttributeAuthorityInterfaceTestCase.RESPONSE % \ 405 respDict 406 response = self._parseResponse(responseStr) 389 407 self.assert_(response) 390 408 409 def test04AssertionConditionExpired(self): 410 utcNow = datetime.utcnow() + timedelta(seconds=60*60*9) 411 respDict = { 412 'issueInstant': SAMLDateTime.toString(utcNow), 413 'assertionIssueInstant': SAMLDateTime.toString(utcNow), 414 'notBefore': SAMLDateTime.toString(utcNow), 415 'notOnOrAfter': SAMLDateTime.toString(utcNow + timedelta( 416 seconds=60*60*8)) 417 } 418 responseStr = SamlAttributeAuthorityInterfaceTestCase.RESPONSE % \ 419 respDict 420 response = self._parseResponse(responseStr) 421 binding = AttributeQuerySOAPBinding() 422 try: 423 binding._verifyTimeConditions(response) 424 self.fail("Expecting not on or after timestamp error") 425 except AttributeQueryResponseError, e: 426 print("PASSED: %s" % e) 427 428 def test05ResponseIssueInstantInvalid(self): 429 utcNow = datetime.utcnow() 430 respDict = { 431 'issueInstant': SAMLDateTime.toString(utcNow + timedelta( 432 seconds=1)), 433 'assertionIssueInstant': SAMLDateTime.toString(utcNow), 434 'notBefore': SAMLDateTime.toString(utcNow), 435 'notOnOrAfter': SAMLDateTime.toString(utcNow + timedelta( 436 seconds=60*60*8)) 437 } 438 responseStr = SamlAttributeAuthorityInterfaceTestCase.RESPONSE % \ 439 respDict 440 response = self._parseResponse(responseStr) 441 binding = AttributeQuerySOAPBinding() 442 try: 443 binding._verifyTimeConditions(response) 444 self.fail("Expecting issue instant timestamp error") 445 except AttributeQueryResponseError, e: 446 print("PASSED: %s" % e) 447 448 def test06NotBeforeConditionInvalid(self): 449 utcNow = datetime.utcnow() 450 respDict = { 451 'issueInstant': SAMLDateTime.toString(utcNow), 452 'assertionIssueInstant': SAMLDateTime.toString(utcNow), 453 'notBefore': SAMLDateTime.toString(utcNow + timedelta(seconds=1)), 454 'notOnOrAfter': SAMLDateTime.toString(utcNow + timedelta( 455 seconds=60*60*8)) 456 } 457 responseStr = SamlAttributeAuthorityInterfaceTestCase.RESPONSE % \ 458 respDict 459 response = self._parseResponse(responseStr) 460 binding = AttributeQuerySOAPBinding() 461 try: 462 binding._verifyTimeConditions(response) 463 self.fail("Expecting issue instant timestamp error") 464 except AttributeQueryResponseError, e: 465 print("PASSED: %s" % e) 466 467 def test07AssertionIssueInstantInvalid(self): 468 utcNow = datetime.utcnow() 469 respDict = { 470 'issueInstant': SAMLDateTime.toString(utcNow), 471 'assertionIssueInstant': SAMLDateTime.toString(utcNow + timedelta( 472 seconds=1)), 473 'notBefore': SAMLDateTime.toString(utcNow), 474 'notOnOrAfter': SAMLDateTime.toString(utcNow + timedelta( 475 seconds=60*60*8)) 476 } 477 responseStr = SamlAttributeAuthorityInterfaceTestCase.RESPONSE % \ 478 respDict 479 response = self._parseResponse(responseStr) 480 binding = AttributeQuerySOAPBinding() 481 try: 482 binding._verifyTimeConditions(response) 483 self.fail("Expecting issue instant timestamp error") 484 except AttributeQueryResponseError, e: 485 print("PASSED: %s" % e) 486 487 def test07ClockSkewCorrectedAssertionIssueInstantInvalid(self): 488 utcNow = datetime.utcnow() 489 respDict = { 490 'issueInstant': SAMLDateTime.toString(utcNow), 491 'assertionIssueInstant': SAMLDateTime.toString(utcNow + timedelta( 492 seconds=1)), 493 'notBefore': SAMLDateTime.toString(utcNow), 494 'notOnOrAfter': SAMLDateTime.toString(utcNow + timedelta( 495 seconds=60*60*8)) 496 } 497 responseStr = SamlAttributeAuthorityInterfaceTestCase.RESPONSE % \ 498 respDict 499 response = self._parseResponse(responseStr) 500 binding = AttributeQuerySOAPBinding() 501 502 # Set a skew to correct the error 503 binding.clockSkew = 1 504 505 try: 506 binding._verifyTimeConditions(response) 507 except AttributeQueryResponseError, e: 508 self.fail("issue instant timestamp error should be corrected for") 509 510 def test08ClockSkewCorrectedAssertionConditionExpired(self): 511 utcNow = datetime.utcnow() + timedelta(seconds=60*60*9) 512 respDict = { 513 'issueInstant': SAMLDateTime.toString(utcNow), 514 'assertionIssueInstant': SAMLDateTime.toString(utcNow), 515 'notBefore': SAMLDateTime.toString(utcNow), 516 'notOnOrAfter': SAMLDateTime.toString(utcNow + timedelta( 517 seconds=60*60*8)) 518 } 519 responseStr = SamlAttributeAuthorityInterfaceTestCase.RESPONSE % \ 520 respDict 521 response = self._parseResponse(responseStr) 522 binding = AttributeQuerySOAPBinding() 523 524 # Set a skew to correct the error 525 binding.clockSkew = 60*60*9 526 527 try: 528 binding._verifyTimeConditions(response) 391 529 530 except AttributeQueryResponseError, e: 531 self.fail("Not on or after timestamp error should be corrected for") 532 392 533 if __name__ == "__main__": 393 534 unittest.main()
Note: See TracChangeset
for help on using the changeset viewer.