diff --git a/requirements.txt b/requirements.txt index 0beaec04..32159b61 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,3 +26,5 @@ enum34==1.0 django-reversion==1.8.1 easy-thumbnails==2.0 +celery==3.1.12 +redis==2.10 diff --git a/settings/__init__.py b/settings/__init__.py index a43a344c..dd336c53 100644 --- a/settings/__init__.py +++ b/settings/__init__.py @@ -17,6 +17,8 @@ from __future__ import absolute_import, print_function import os, sys +from .celery import * + try: print("Trying import local.py settings...", file=sys.stderr) from .local import * diff --git a/settings/celery.py b/settings/celery.py new file mode 100644 index 00000000..227c52ae --- /dev/null +++ b/settings/celery.py @@ -0,0 +1,16 @@ +from kombu import Exchange, Queue + +BROKER_URL = 'amqp://guest:guest@localhost:5672//' +CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' + +CELERY_TIMEZONE = 'Europe/Madrid' +CELERY_ENABLE_UTC = True + +CELERY_DEFAULT_QUEUE = 'tasks' +CELERY_QUEUES = ( + Queue('tasks', routing_key='task.#'), + Queue('transient', routing_key='transient.#', delivery_mode=1) +) +CELERY_DEFAULT_EXCHANGE = 'tasks' +CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' +CELERY_DEFAULT_ROUTING_KEY = 'task.default' diff --git a/taiga/__init__.py b/taiga/__init__.py index e69de29b..a10bec52 100644 --- a/taiga/__init__.py +++ b/taiga/__init__.py @@ -0,0 +1 @@ +from . import celery diff --git a/taiga/celery.py b/taiga/celery.py new file mode 100644 index 00000000..01a93494 --- /dev/null +++ b/taiga/celery.py @@ -0,0 +1,12 @@ +import os + +from celery import Celery + +from django.conf import settings + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings') + +app = Celery('taiga') + +app.config_from_object('django.conf:settings') +app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, related_name="deferred") diff --git a/taiga/deferred.py b/taiga/deferred.py new file mode 100644 index 00000000..e7e9b3e8 --- /dev/null +++ b/taiga/deferred.py @@ -0,0 +1,43 @@ +from .celery import app + + +def defer(task: str, *args, **kwargs): + """Defer the execution of a task. + + Defer the execution of a task and returns a future objects with the following methods among + others: + - `failed()` Returns `True` if the task failed. + - `ready()` Returns `True` if the task has been executed. + - `forget()` Forget about the result. + - `get()` Wait until the task is ready and return its result. + - `result` When the task has been executed the result is in this attribute. + More info at Celery docs on `AsyncResult` object. + + :param task: Name of the task to execute. + + :return: A future object. + """ + return app.send_task(task, args, kwargs, routing_key="transient.deferred") + + +def call_async(task: str, *args, **kwargs): + """Run a task and ignore its result. + + This is just a star argument version of `apply_async`. + + :param task: Name of the task to execute. + :param args: Arguments for the task. + :param kwargs: Keyword arguments for the task. + """ + apply_async(task, args, kwargs) + + +def apply_async(task: str, args=None, kwargs=None, **options): + """Run a task and ignore its result. + + :param task: Name of the task to execute. + :param args: Tupple of arguments for the task. + :param kwargs: Dict of keyword arguments for the task. + :param options: Celery-specific options when running the task. See Celery docs on `apply_async` + """ + app.send_task(task, args, kwargs, **options) diff --git a/taiga/services/__init__.py b/taiga/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/taiga/services/celery.py b/taiga/services/celery.py new file mode 100644 index 00000000..f3a3b890 --- /dev/null +++ b/taiga/services/celery.py @@ -0,0 +1,8 @@ +from celery import Celery + +from django.conf import settings + +app = Celery('taiga') + +app.config_from_object('django.conf:settings') +app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) diff --git a/tests/unit/test_deferred.py b/tests/unit/test_deferred.py new file mode 100644 index 00000000..29836421 --- /dev/null +++ b/tests/unit/test_deferred.py @@ -0,0 +1,35 @@ +from unittest import mock +from taiga.deferred import defer, call_async, apply_async + + +@mock.patch("taiga.deferred.app") +def test_defer(app): + name = "task name" + args = (1, 2) + kwargs = {"kw": "keyword argument"} + + defer(name, *args, **kwargs) + + app.send_task.assert_called_once_with(name, args, kwargs, routing_key="transient.deferred") + + +@mock.patch("taiga.deferred.app") +def test_apply_async(app): + name = "task name" + args = (1, 2) + kwargs = {"kw": "keyword argument"} + + apply_async(name, args, kwargs) + + app.send_task.assert_called_once_with(name, args, kwargs) + + +@mock.patch("taiga.deferred.app") +def test_call_async(app): + name = "task name" + args = (1, 2) + kwargs = {"kw": "keyword argument"} + + call_async(name, *args, **kwargs) + + app.send_task.assert_called_once_with(name, args, kwargs)