Dive into secure and efficient coding practices with our curated list of the top 10 examples showcasing the use of 'celery' in Python. Our advanced machine learning engine meticulously scans each line of code, cross-referencing millions of open source libraries to ensure your implementation is not just functional, but also robust and secure. Elevate your Python applications to new heights by mastering the art of handling side effects, API calls, and asynchronous operations with confidence and precision.
def sample(x, n, k=0):
    """Yield up to ``n`` evenly spaced items from the sequence ``x``.

    The stride is ``len(x) // n``, so with ``n == 10`` and a 100-item
    ``x`` every 10th item is produced.  This is a generator: wrap the
    call in ``list()`` if an actual list is needed.

    Args:
        x: Indexable sequence supporting ``len()``.
        n: Number of items wanted.  Zero or a negative value yields
            nothing.
        k: Index of the first item to yield (offset).

    Yields:
        ``x[k]``, ``x[k + stride]``, ... stopping early as soon as the
        index runs past the end of ``x``.

    Note:
        When ``n`` is larger than ``len(x)`` the stride is 0 and the item
        at index ``k`` is yielded ``n`` times.
    """
    # Without this guard ``len(x) // 0`` raised ZeroDivisionError on the
    # first iteration; negative ``n`` already produced nothing.
    if n <= 0:
        return
    stride = len(x) // n
    for _ in range(n):
        try:
            yield x[k]
        except IndexError:
            # Offset walked off the end of the sequence: the sample is done.
            break
        k += stride
from __future__ import division
from time import sleep
from celery import Celery
from datetime import datetime, timedelta
from ..celery_dashboard import init
from ..celery_dashboard.utils import set_progress
# Celery application used by the tasks below; Redis on localhost serves as
# both the broker and the result backend.
celery_app = Celery(
    'test_app',
    broker='redis://localhost',
    backend='redis://localhost',
)

# NOTE(review): accepting 'pickle' means a message on the broker can execute
# arbitrary code in the worker -- confirm the broker is only reachable by
# trusted producers.
celery_app.conf.update(
    accept_content=['json', 'pickle'],
    CELERY_ACCEPT_CONTENT=['json', 'pickle'],  # celery 3
    worker_prefetch_multiplier=1,
)

# Hook the dashboard into the app.
# NOTE(review): database credentials are hard-coded in this URL; presumably
# acceptable for a local docker test setup only -- confirm.
init(
    celery_app,
    "postgresql://docker:docker@localhost:5432/docker",
    db_echo="debug",
)
@celery_app.task(name="retry_with_countdown", bind=True)
def retry_with_countdown(self, countdown):
    """Unconditionally ask Celery to retry this task.

    Args:
        countdown: Forwarded unchanged as ``retry(countdown=...)``.
    """
    # retry() raises on its own; the explicit ``raise`` only makes it
    # visible that nothing after this line can run.
    raise self.retry(countdown=countdown)
@celery_app.task(name="retry_with_eta", bind=True)
def retry_with_eta(self, countdown):
    """Unconditionally retry this task at an absolute time (``eta``).

    Args:
        countdown: Delay in seconds from now until the retry should run,
            matching the unit used by ``retry_with_countdown``.
    """
    # timedelta's first positional argument is *days*; the unit has to be
    # named, otherwise a countdown of 5 schedules the retry 5 days ahead.
    retry_at = datetime.utcnow() + timedelta(seconds=countdown)
    # retry() raises on its own; ``raise`` documents that this is terminal.
    raise self.retry(eta=retry_at)
def runtest(self, fun, n=50, index=0, repeats=1):
    """Call ``fun`` ``n`` times under block detection and print a summary.

    An exception raised by ``fun`` is printed and the loop moves on to the
    next iteration, so ``failed`` only becomes true when something outside
    the per-iteration ``try`` raises (for example the progress ``print``).

    Args:
        fun: Zero-argument callable to exercise; its ``__name__`` is used
            in the progress output.
        n: Number of iterations.
        index: Position of this test in the suite, shown in the marker.
        repeats: Repetition number of the whole suite.  ``run`` passes it
            as a fourth positional argument; it is appended to the marker
            only when greater than 1, so three-argument callers see the
            same output as before.

    Raises:
        Exception: Anything escaping the loop is re-raised after the
            summary line has been printed.
    """
    with blockdetection(self.block_timeout):
        started = time()
        # Counts iterations actually begun, so n == 0 reports 0 rather
        # than the 1 that ``i + 1`` gave for a loop that never ran.
        attempted = 0
        failed = False
        label = '{0}: {1}({2})'.format(index, fun.__name__, n)
        if repeats > 1:
            label = '{0} #{1}'.format(label, repeats)
        marker(label)
        try:
            for i in range(n):
                attempted = i + 1
                print('{0} ({1})'.format(i, fun.__name__), end=' ')
                try:
                    fun()
                    print('-> done')
                except Exception as exc:
                    # Best effort: report the failure and keep iterating.
                    print('-> {}'.format(exc))
        except Exception:
            failed = True
            raise
        finally:
            print('{0} {1} iterations in {2}s'.format(
                'failed after' if failed else 'completed',
                attempted, humanize_seconds(time() - started),
            ))
def run(self, names=None, iterations=50, offset=0,
        numtests=None, list_all=False, repeat=0, group='all',
        diag=False, no_join=False, **kw):
    """Run the selected tests, repeating the whole suite as requested.

    Args:
        names: Passed to ``self.filtertests`` together with ``group``.
        iterations: Iteration count handed to ``runtest`` for every test.
        offset: Start of the slice taken from the filtered tests.
        numtests: End of that slice; a falsy value means no upper bound.
        list_all: When true, print ``self.testlist(...)`` and return
            without running anything.
        repeat: ``Inf`` repeats without end; otherwise the suite runs
            ``int(repeat)`` times, or once when that is 0.
        group: Passed to ``self.filtertests``.
        diag: Passed to ``self.fbi.enable``.
        no_join: Stored on ``self.no_join``.
        **kw: Accepted and ignored by this method.
    """
    self.no_join = no_join
    self.fbi.enable(diag)
    selected = self.filtertests(group, names)[offset:numtests or None]

    if list_all:
        print(self.testlist(selected))
        return

    print(self.banner(selected))
    print('+ Enabling events')
    self.app.control.enable_events()

    # Repetitions are numbered from 1 for both display and runtest().
    if repeat == Inf:
        repetitions = count(1)
    else:
        repetitions = range(1, (int(repeat) or 1) + 1)

    for repetition in repetitions:
        marker(
            'Stresstest suite start (repetition {0})'.format(repetition),
            '+',
        )
        for position, test in enumerate(selected, 1):
            self.runtest(test, iterations, position, repetition)
        marker(
            'Stresstest suite end (repetition {0})'.format(repetition),
            '+',
        )
def test_config3():
    """Check the Celery app built from a Flask app configured via ``eager_conf``.

    The resulting app must reference the Flask app and expose the eager and
    cache settings asserted below.
    """
    # A separate Celery app is made current before the Flask-bound one is
    # created.
    current = Celery('mycurrent')
    current.set_current()

    flask_app = Flask("myapp")
    flask_app.config.from_object(eager_conf)

    celery = create_celery_app(flask_app)
    assert celery
    assert celery.flask_app == flask_app

    settings = celery.conf
    assert settings.CELERY_ALWAYS_EAGER
    assert settings.CELERY_EAGER_PROPAGATES_EXCEPTIONS
    assert settings.CELERY_RESULT_BACKEND == 'cache'
    assert settings.CELERY_CACHE_BACKEND == 'memory'
def test_without_request(self, monkeypatch):
    """``WarehouseTask.apply_async`` with ``get_current_request`` returning None.

    The call must return whatever the patched ``Task.apply_async`` returns,
    invoke it exactly once with only the task, and look up the current
    request exactly once.
    """
    expected = pretend.stub()
    base_apply_async = pretend.call_recorder(lambda *a, **kw: expected)
    request_getter = pretend.call_recorder(lambda: None)
    monkeypatch.setattr(tasks, "get_current_request", request_getter)

    warehouse_task = tasks.WarehouseTask()
    warehouse_task.app = Celery()
    # Patched only after the task exists, as in the original ordering.
    monkeypatch.setattr(Task, "apply_async", base_apply_async)

    outcome = warehouse_task.apply_async()

    assert outcome is expected
    assert base_apply_async.calls == [pretend.call(warehouse_task)]
    assert request_getter.calls == [pretend.call()]
@task(name='simple_example', base=QueueOnce)
def simple_example():
    """Minimal task registered with the ``QueueOnce`` base; returns a constant."""
    return "simple"
def test_nested_group_chain(self, manager):
    """Run a chain containing a group that itself contains a chain ending in a group.

    Skipped when the backend reports that chords are not allowed.  The
    final result is expected to be 11.
    """
    try:
        manager.app.backend.ensure_chords_allowed()
    except NotImplementedError as e:
        raise pytest.skip(e.args[0])

    # Assemble the canvas from the outside in, one level per name.
    first = add.si(1, 0)
    innermost_group = group(
        add.si(1, 1000),
        add.si(1, 2000),
    )
    inner_chain = chain(add.si(1, 200), innermost_group)
    middle_group = group(add.si(1, 100), inner_chain)
    last = add.si(1, 10)

    workflow = chain(first, middle_group, last)
    result = workflow()
    assert result.get(timeout=TIMEOUT) == 11
def test_chain_of_chords_with_two_tasks(self, manager):
    """Pipe a group/``tsum`` stage into ``add`` and then into an explicit chord.

    Skipped when the backend reports that chords are not allowed.  The
    final result is expected to be 12.
    """
    try:
        manager.app.backend.ensure_chords_allowed()
    except NotImplementedError as e:
        raise pytest.skip(e.args[0])

    # One left-to-right pipeline; ``|`` associates left, so the stages are
    # combined in the same order as step-by-step reassignment.
    workflow = (
        add.si(1, 0)
        | group(add.s(1), add.s(1))
        | tsum.s()
        | add.s(1)
        | chord(group(add.s(1), add.s(1)), tsum.s())
    )
    result = workflow()
    assert result.get(timeout=TIMEOUT) == 12
def test_callbacks__only_groups(self, group_, maybe_signature):
    """Trace ``self.add`` with two group callbacks and check how each is applied.

    Both groups must have ``apply_async`` called with the task result
    ``(4,)`` plus the parent/root ids and priority asserted below.
    """
    callbacks = [
        group([Mock(name='g1'), Mock(name='g2')], app=self.app),
        group([Mock(name='g3'), Mock(name='g4')], app=self.app),
    ]
    for callback in callbacks:
        callback.apply_async = Mock(name='gapply')
    request = {'callbacks': callbacks, 'root_id': 'root'}

    # maybe_signature hands back its first argument untouched.
    maybe_signature.side_effect = lambda s, *args, **kwargs: s

    # The unpacking also checks that trace() returns a 2-item result.
    retval, _ = self.trace(self.add, (2, 2), {}, request=request)

    for callback in callbacks:
        callback.apply_async.assert_called_with(
            (4,), parent_id='id-1', root_id='root', priority=None
        )