Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing the use of 'botocore' in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
params[p] = module.params.get(p)
if d_b_name:
params['d_b_name'] = d_b_name
try:
redshift.describe_clusters(ClusterIdentifier=identifier)['Clusters'][0]
changed = False
except is_boto3_error_code('ClusterNotFound'):
try:
redshift.create_cluster(ClusterIdentifier=identifier,
NodeType=node_type,
MasterUsername=username,
MasterUserPassword=password,
**snake_dict_to_camel_dict(params, capitalize_first=True))
except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e:
module.fail_json_aws(e, msg="Failed to create cluster")
except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e: # pylint: disable=duplicate-except
module.fail_json_aws(e, msg="Failed to describe cluster")
if wait:
attempts = wait_timeout // 60
waiter = redshift.get_waiter('cluster_available')
try:
waiter.wait(
ClusterIdentifier=identifier,
WaiterConfig=dict(MaxAttempts=attempts)
)
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
module.fail_json_aws(e, msg="Timeout waiting for the cluster creation")
try:
resource = redshift.describe_clusters(ClusterIdentifier=identifier)['Clusters'][0]
except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e:
def assert_presigned_url_matches(self, actual_url, expected_match):
    """Check a generated presigned URL against its expected components.

    Query parameters carry no guaranteed ordering, so the expectation is
    supplied as a dict instead of a complete URL string. The ``hostname``,
    ``path`` and ``query_params`` entries of ``expected_match`` are compared
    with the matching pieces of ``actual_url``.
    """
    split_url = urlsplit(actual_url)
    self.assertEqual(split_url.netloc, expected_match['hostname'])
    self.assertEqual(split_url.path, expected_match['path'])
    self.assertEqual(
        self.parse_query_string(split_url.query),
        expected_match['query_params']
    )
def test_stack_wait(packager):
    """Check that ``stack_wait`` uses up the single stubbed ``describe_stacks`` reply.

    One stack description with status ``CREATE_COMPLETE`` is queued on a
    stubbed client; after waiting, no queued response may be left over.
    """
    stubber = Stubber(packager.client)
    completed_stack = {
        "StackName": EXPECTED_STACK_NAME,
        "StackStatus": "CREATE_COMPLETE",
        "CreationTime": FAKE_DATETIME,
    }
    stubber.add_response("describe_stacks", {"Stacks": [completed_stack]})

    with stubber:
        packager.stack_wait(EXPECTED_STACK_NAME, "stack_create_complete")
        stubber.assert_no_pending_responses()
def test_should_retry():
    """Exercise ``should_retry`` with two ``ClientError`` codes and a non-client error.

    Only the ``ProvisionedThroughputExceededException`` code is expected to
    give ``True``; the other code and a plain ``AssertionError`` give ``False``.
    """
    def client_error(code):
        # Both errors share the same message and operation name.
        return ClientError(
            {'Error': {'Code': code, 'Message': 'oops'}},
            'testing'
        )

    retryable = client_error('ProvisionedThroughputExceededException')
    not_retryable = client_error('Bangarang!')

    assert should_retry(retryable) is True
    assert should_retry(not_retryable) is False
    assert should_retry(AssertionError("foo")) is False
def test_get_s2_info_botoError(get_object):
"""Should work as expected
"""
get_object.side_effect = ClientError(
{'Error': {'Code': 500, 'Message': 'Error'}}, 'get_object')
bucket = 'sentinel-s2-l1c'
scene_path = 'tiles/38/S/NG/2017/10/9/1/'
full = True
s3 = None
request_pays = False
expected = {
'acquisition_date': '20171009',
'browseURL': 'https://roda.sentinel-hub.com/sentinel-s2-l1c/tiles/38/S/NG/2017/10/9/1/preview.jpg',
'grid_square': 'NG',
'latitude_band': 'S',
'num': '1',
'path': 'tiles/38/S/NG/2017/10/9/1/',
'sat': 'S2A',
def test_absent_with_failure(self):
    """Check that the ``absent`` state reports failure when deletion raises.

    Both describe calls are stubbed to return ``domain_ret`` while
    ``delete_elasticsearch_domain`` raises a ``ClientError``. The state
    result must then be falsy and its comment must contain
    ``'An error occurred'``.
    """
    self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}
    self.conn.describe_elasticsearch_domain_config.return_value = {'DomainConfig': domain_ret}
    self.conn.delete_elasticsearch_domain.side_effect = ClientError(error_content, 'delete_domain')

    result = self.salt_states['boto_elasticsearch_domain.absent']('test', domain_ret['DomainName'])

    self.assertFalse(result['result'])
    # assertIn shows both operands when it fails; assertTrue(a in b) would
    # only report "False is not true".
    self.assertIn('An error occurred', result['comment'])
def test_deploy_flow_register_task_definition(monkeypatch, runner_token):
boto3_client = MagicMock()
boto3_client.describe_task_definition.side_effect = ClientError({}, None)
boto3_client.run_task.return_value = {"tasks": [{"taskArn": "test"}]}
boto3_client.register_task_definition.return_value = {}
monkeypatch.setattr("boto3.client", MagicMock(return_value=boto3_client))
agent = FargateAgent()
agent.deploy_flow(
flow_run=GraphQLResult(
{
"flow": GraphQLResult(
{
"storage": Docker(
registry_url="test", image_name="name", image_tag="tag"
).serialize(),
"id": "id",
}
def test_that_when_deleting_api_resources_and_delete_resource_throws_error_the_delete_api_resources_method_returns_false(self):
    '''
    Tests that delete_api_resources reports a falsy 'deleted' value when
    delete_resource is configured to raise a ClientError
    '''
    delete_error = ClientError(error_content, 'delete_resource')
    self.conn.get_resources.return_value = api_resources_ret
    self.conn.delete_resource.side_effect = delete_error

    outcome = boto_apigateway.delete_api_resources(
        restApiId='rm06h9oac4',
        path='/api',
        **conn_parameters
    )
    self.assertFalse(outcome.get('deleted'))
def test_upload_to_s3_with_multipart_upload_aborted_on_error(self):
self.args.bucket = self.bucket
self.args.key = self.key
self.bundle_mock.tell.return_value = (6 << 20)
self.bundle_mock.read.return_value = b'a' * (6 << 20)
self.push.s3.upload_part.side_effect = ClientError(
{'Error': {'Code': 'Error', 'Message': 'Error'}},
'UploadPart'
)
with self.assertRaises(ClientError):
self.push._upload_to_s3(self.args, self.bundle_mock)
self.assertFalse(self.push.s3.put_object.called)
self.push.s3.create_multipart_upload.assert_called_with(
Bucket=self.bucket,
Key=self.key
)
self.assertTrue(self.push.s3.upload_part.called)
self.assertFalse(self.push.s3.complete_multipart_upload.called)
self.push.s3.abort_multipart_upload.assert_called_with(
Bucket=self.bucket,
Key=self.key,
UploadId=self.upload_id
def _force_detach_volume(volume):
    """Force-detach every attachment listed on ``volume``.

    Each attachment is detached with ``Force=True``. A ``ClientError`` whose
    code is ``IncorrectState`` and whose message says the volume is in the
    'available' state is treated as benign: it is logged and the function
    returns straight away. Any other ``ClientError`` is re-raised.
    """
    log.info("Force detaching all volume attachments.")
    for attachment in volume.attachments:
        try:
            log.info("Volume has attachment: {}".format(attachment))
            instance_id = attachment['InstanceId']
            log.info("Detaching volume from instance: {}".format(instance_id))
            volume.detach_from_instance(
                DryRun=False,
                InstanceId=instance_id,
                Device=attachment['Device'],
                Force=True)
        except exceptions.ClientError as exc:
            log.exception("Failed to detach volume")
            # See the following link for the structure of the exception:
            # https://github.com/boto/botocore/blob/4d4c86b2bdd4b7a8e110e02abd4367f07137ca47/botocore/exceptions.py#L346
            error_info = exc.response['Error']
            message = error_info['Message']
            code = error_info['Code']
            # See the following link for details of the error message:
            # https://jira.mesosphere.com/browse/DCOS-37441?focusedCommentId=156163&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-156163
            benign = code == 'IncorrectState' and "is in the 'available' state" in message
            if not benign:
                raise
            log.info("Ignoring benign exception")
            return