Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing 'retrying' in functional components in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your React applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
@retrying.retry(wait_fixed=5000, stop_max_delay=timeout * 1000,
retry_on_result=lambda ret: ret is None,
retry_on_exception=lambda x: False)
def _poll_marathon_for_app_deployment(app_id):
Endpoint = collections.namedtuple("Endpoint", ["host", "port", "ip"])
# Some of the counters need to be explicitly enabled now and/or in
# future versions of Marathon:
req_params = (('embed', 'apps.lastTaskFailure'),
('embed', 'apps.counts'))
log.info('Waiting for application to be deployed...')
r = self.get(path_join('v2/apps', app_id), params=req_params)
r.raise_for_status()
data = r.json()
log.debug('Current application state data: {}'.format(repr(data)))
@retrying.retry(
wait_fixed=STD_INTERVAL,
stop_max_delay=METRICS_WAITTIME,
retry_on_exception=lambda e: isinstance(e, AssertionError)
)
def check_adminrouter_metrics():
measurements = set()
expect_dropped = set([
'nginx_vts_filter',
'nginx_vts_upstream',
'nginx_vts_server',
])
unexpected_samples = []
response = get_metrics_prom(dcos_api_session, node)
for family in text_string_to_metric_families(response.text):
for sample in family.samples:
@retrying.retry(wait_fixed=1000,
retry_on_result=lambda ret: ret is False,
retry_on_exception=lambda x: False)
def _wait_for_slaves_to_join(self):
r = self.get('/mesos/master/slaves')
if r.status_code != 200:
msg = "Mesos master returned status code {} != 200 "
msg += "continuing to wait..."
log.info(msg.format(r.status_code))
return False
data = r.json()
# Check that there are all the slaves the test knows about. They are all
# needed to pass the test.
num_slaves = len(data['slaves'])
if num_slaves >= len(self.all_slaves):
msg = "Sufficient ({} >= {}) number of slaves have joined the cluster"
log.info(msg.format(num_slaves, self.all_slaves))
def _pool_for_mesos_dns():
r = dcos_api_session.get('/mesos_dns/v1/services/_{}._tcp.marathon.mesos'.format(
app_definition['id'].lstrip('/')))
assert r.status_code == 200
r_data = r.json()
if r_data == [{'host': '', 'port': '', 'service': '', 'ip': ''}] or len(r_data) < len(service_points):
logging.info("Waiting for Mesos-DNS to update entries")
return None
else:
logging.info("Mesos-DNS entries have been updated!")
return r_data
try:
r_data = _pool_for_mesos_dns()
except retrying.RetryError:
msg = "Mesos DNS has failed to update entries in {} seconds."
pytest.fail(msg.format(DNS_ENTRY_UPDATE_TIMEOUT))
marathon_provided_servicepoints = sorted((x.host, x.port) for x in service_points)
mesosdns_provided_servicepoints = sorted((x['ip'], int(x['port'])) for x in r_data)
assert marathon_provided_servicepoints == mesosdns_provided_servicepoints
# Verify if containers themselves confirm what Marathon says:
payload = {"reflector_ip": service_points[1].host,
"reflector_port": service_points[1].port}
r = requests.post('http://{}:{}/your_ip'.format(
service_points[0].host, service_points[0].port), payload)
if r.status_code != 200:
msg = "Test server replied with non-200 reply: '{status_code} {reason}. "
msg += "Detailed explanation of the problem: {text}"
pytest.fail(msg.format(status_code=r.status_code, reason=r.reason, text=r.text))
# the pod's status to become STABLE is sufficient. In the future,
# if test pod deployments become more complex, we should switch to
# using Marathon's event bus and listen for specific events.
# See DCOS_OSS-1056.
r = self.get('v2/pods' + pod_id + '::status')
r.raise_for_status()
data = r.json()
if 'status' in data and data['status'] == 'STABLE':
# deployment complete
return data
log.info('Waiting for pod to be deployed %r', data)
return False
try:
return _wait_for_pod_deployment(pod_definition['id'])
except retrying.RetryError as ex:
raise Exception("Pod deployment failed - operation was not "
"completed in {} seconds.".format(timeout)) from ex
log.info('Job run {} finished.'.format(r_id))
return True
else:
raise requests.HTTPError(
'Waiting for job run {} to be finished, but history for that job run is not available'
.format(r_id), response=rc)
else:
raise requests.HTTPError(
'Waiting for job run {} to be finished, but getting HTTP status code {}'
.format(r_id, rc.status_code), response=rc)
try:
# wait for the run to complete and then return the
# run's result
_wait_for_run_completion(job_id, run_id)
except retrying.RetryError as ex:
raise Exception("Job run failed - operation was not "
"completed in {} seconds.".format(timeout)) from ex
r = self.get('v2/deployments')
assert r.ok, 'status_code: {} content: {}'.format(r.status_code, r.content)
for deployment in r.json():
if deployment_id == deployment.get('id'):
log.info('Waiting for pod to be destroyed')
return False
log.info('Pod destroyed')
return True
r = self.delete('v2/pods' + pod_id)
assert r.ok, 'status_code: {} content: {}'.format(r.status_code, r.content)
try:
_destroy_pod_complete(r.headers['Marathon-Deployment-Id'])
except retrying.RetryError as ex:
raise Exception("Pod destroy failed - operation was not "
"completed in {} seconds.".format(timeout)) from ex
@staticmethod
def retry(method_to_check, retry_timeout=RETRY_TIMEOUT,
retry_interval=RETRY_INTERVAL):
return Retrying(stop_max_delay=retry_timeout * 1000,
wait_fixed=retry_interval * 1000).call(method_to_check)
def wrapped_f(*args, **kw):
if _retry_init:
rargs, rkw = _retry_init(dargs, dkw)
else:
rargs, rkw = dargs, dkw
return Retrying(*rargs, **rkw).call(_warn_about_exceptions(f), *args, **kw)
def call(self, fn, *args, **kwargs):
start_time = int(round(workflow_time.time() * 1000))
attempt_number = 1
while True:
try:
val = yield fn(*args, **kwargs)
attempt = retrying.Attempt(val, attempt_number, False)
except Exception:
val = sys.exc_info()
attempt = retrying.Attempt(val, attempt_number, True)
if not self.should_reject(attempt):
return_(attempt.get(self._wrap_exception))
delay_since_first_attempt_ms = int(round(workflow_time.time() * 1000)) - start_time
if self.stop(attempt_number, delay_since_first_attempt_ms):
if not self._wrap_exception and attempt.has_exception:
# get() on an attempt with an exception should cause it to be raised, but raise just in case
raise attempt.get()
else:
raise RetryError(attempt)
else:
# use ceil since SWF timer resolution is in seconds
sleep = self.wait(attempt_number, delay_since_first_attempt_ms)
yield workflow_time.sleep(ceil(sleep / 1000.0))