Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'tenacity' in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
return ret
# Module-level pool registries; both are rebound by reset_connection_pools().
S3_POOL = None
GC_POOL = None


def reset_connection_pools():
    """Rebind S3_POOL and GC_POOL to fresh pool registries.

    S3_POOL becomes a two-level keydefaultdict (service, then bucket name)
    whose factory builds S3ConnectionPool(service, bucket_name). GC_POOL
    becomes a keydefaultdict whose factory builds
    GCloudBucketPool(bucket_name).
    """
    global S3_POOL, GC_POOL

    def _s3_pools_for(service):
        # Inner registry for one service, keyed by bucket name.
        return keydefaultdict(
            lambda bucket_name: S3ConnectionPool(service, bucket_name)
        )

    def _gc_pool_for(bucket_name):
        return GCloudBucketPool(bucket_name)

    S3_POOL = keydefaultdict(_s3_pools_for)
    GC_POOL = keydefaultdict(_gc_pool_for)


# Build the initial registries as soon as the module is loaded.
reset_connection_pools()
# Shared retry decorator: stop after 7 attempts, wait a randomized
# exponential backoff (multiplier 0.5, capped at 60.0) between attempts,
# and re-raise the underlying exception once attempts are exhausted.
retry = tenacity.retry(
    stop=tenacity.stop_after_attempt(7),
    wait=tenacity.wait_random_exponential(multiplier=0.5, max=60.0),
    reraise=True,
)

# NOTE(review): presumably the default worker-thread count; its use is not
# visible in this section.
DEFAULT_THREADS = 20
class SimpleStorage(object):
"""
Access files stored in Google Storage (gs), Amazon S3 (s3),
or the local Filesystem (file).
e.g. with Storage('gs://bucket/dataset/layer') as stor:
files = stor.get_file('filename')
Required:
layer_path (str): A protocol prefixed path of the above format.
Accepts s3:// gs:// and file://. File paths are absolute.
retry=tenacity.retry_if_exception_type(CustomError))
def _retryable_test_with_exception_type_custom_attempt_limit(thing):
    """Call ``thing.go()`` and hand back whatever it returns."""
    outcome = thing.go()
    return outcome
def test_random_sleep(self):
    """wait_random(min=1000, max=2000) gives varying waits within [1000, 2000]."""
    retrying = Retrying(wait=tenacity.wait_random(min=1000, max=2000))
    # Sample the wait four times with identical arguments; the set keeps
    # only the distinct values.
    observed = {retrying.wait(1, 6546) for _ in range(4)}
    # this is kind of non-deterministic...
    self.assertTrue(len(observed) > 1)
    for delay in observed:
        self.assertTrue(delay >= 1000)
        self.assertTrue(delay <= 2000)
def test_incrementing_sleep(self):
    """wait_incrementing(start=500, increment=100) grows by 100 per attempt."""
    retrying = Retrying(
        wait=tenacity.wait_incrementing(start=500, increment=100)
    )
    # Attempts 1, 2, 3 are expected to wait 500, 600, 700 respectively.
    for attempt_number, expected_wait in enumerate((500, 600, 700), start=1):
        self.assertEqual(expected_wait, retrying.wait(attempt_number, 6546))
def test_neutron_bgp_speaker_appears_on_agent(self):
    """The helper's result equals the ``bgp_speakers`` held on the test case."""
    # Cap the helper's retry policy at a single attempt for this test.
    retried_helper = openstack_utils.neutron_bgp_speaker_appears_on_agent
    retried_helper.retry.stop = tenacity.stop_after_attempt(1)

    speakers = openstack_utils.neutron_bgp_speaker_appears_on_agent(
        self.neutronclient, 'FAKE_AGENT_ID')
    self.assertEqual(speakers, self.bgp_speakers)
def test_retry_object(vault_prefix, mocker):
    """A ``Retrying`` object given to VaultBackend yields exactly two ``_call`` attempts."""
    retry_policy = Retrying(
        retry=retry_if_exception_type(KonfettiError),
        reraise=True,
        stop=stop_after_attempt(2),
    )
    backend = VaultBackend(vault_prefix, retry=retry_policy)
    config = Konfig(vault_backend=backend)

    # Make every HTTP send raise KonfettiError.
    mocker.patch("requests.adapters.HTTPAdapter.send", side_effect=KonfettiError)
    # Spy on _call while still delegating to the real implementation.
    call_spy = mocker.patch.object(
        config.vault_backend, "_call", wraps=config.vault_backend._call
    )

    with pytest.raises(KonfettiError):
        # Bare attribute access; it is expected to raise inside this block.
        config.SECRET

    assert call_spy.called is True
    assert call_spy.call_count == 2
wait=tenacity.wait_fixed(10),
reraise=True)
def check_delete(test, service, obj, perform_delete=False):
    """Assert that ``obj`` no longer appears in ``service.list()``.

    Args:
        test: Test case object providing ``assertTrue``.
        service: Object whose ``list()`` returns the current objects.
        obj: Object to look for; entries are matched on their ``id``.
        perform_delete: When true, call ``obj.delete()`` before checking.
    """
    if perform_delete:
        obj.delete()

    # Collect every listed entry that still carries the same id as obj.
    survivors = []
    for candidate in service.list():
        if candidate.id == obj.id:
            survivors.append(candidate)

    failure_message = (
        "Object %s in service %s should have been deleted but still exists."
        % (survivors, type(service).__name__)
    )
    test.assertTrue(not survivors, failure_message)
# Retry up to 12 attempts with exponential backoff (multiplier 1, capped at
# 60) and re-raise the underlying exception if every attempt fails.
@tenacity.retry(
    stop=tenacity.stop_after_attempt(12),
    wait=tenacity.wait_exponential(multiplier=1, max=60),
    reraise=True,
)
def _target_get_object():
    """Return ``target_client.get_object(_container, 'testfile')``."""
    return target_client.get_object(_container, 'testfile')
_, target_content = _target_get_object()
def test_stop_after_delay(self):
    """stop_after_delay(1000) signals stop once the delay argument reaches 1000."""
    retrying = Retrying(stop=tenacity.stop_after_delay(1000))
    # Just below the threshold: keep going.
    self.assertFalse(retrying.stop(2, 999))
    # At and beyond the threshold: stop.
    for elapsed in (1000, 1001):
        self.assertTrue(retrying.stop(2, elapsed))
api_client.post_to_add_batch(later_batch, sku, qty=10, eta='2011-01-03')
r = api_client.post_to_allocate(orderid, sku, 10)
assert r.ok
response = api_client.get_allocation(orderid)
assert response.json()[0]['batchref'] == earlier_batch
subscription = redis_client.subscribe_to('line_allocated')
# change quantity on allocated batch so it's less than our order
redis_client.publish_message('change_batch_quantity', {
'batchref': earlier_batch, 'qty': 5
})
# wait until we see a message saying the order has been reallocated
messages = []
for attempt in Retrying(stop=stop_after_delay(3), reraise=True):
with attempt:
message = subscription.get_message(timeout=1)
if message:
messages.append(message)
print(messages)
data = json.loads(messages[-1]['data'])
assert data['orderid'] == orderid
assert data['batchref'] == later_batch