diff --git a/requirements.txt b/requirements.txt index 0beaec04..32159b61 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,3 +26,5 @@ enum34==1.0 django-reversion==1.8.1 easy-thumbnails==2.0 +celery==3.1.12 +redis==2.10 diff --git a/settings/__init__.py b/settings/__init__.py index a43a344c..dd336c53 100644 --- a/settings/__init__.py +++ b/settings/__init__.py @@ -17,6 +17,8 @@ from __future__ import absolute_import, print_function import os, sys +from .celery import * + try: print("Trying import local.py settings...", file=sys.stderr) from .local import * diff --git a/settings/celery.py b/settings/celery.py new file mode 100644 index 00000000..227c52ae --- /dev/null +++ b/settings/celery.py @@ -0,0 +1,16 @@ +from kombu import Exchange, Queue + +BROKER_URL = 'amqp://guest:guest@localhost:5672//' +CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' + +CELERY_TIMEZONE = 'Europe/Madrid' +CELERY_ENABLE_UTC = True + +CELERY_DEFAULT_QUEUE = 'tasks' +CELERY_QUEUES = ( + Queue('tasks', routing_key='task.#'), + Queue('transient', routing_key='transient.#', delivery_mode=1) +) +CELERY_DEFAULT_EXCHANGE = 'tasks' +CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' +CELERY_DEFAULT_ROUTING_KEY = 'task.default' diff --git a/settings/testing.py b/settings/testing.py index b7756fa3..4448be54 100644 --- a/settings/testing.py +++ b/settings/testing.py @@ -18,3 +18,5 @@ from .development import * SKIP_SOUTH_TESTS = True SOUTH_TESTS_MIGRATE = False + +CELERY_ALWAYS_EAGER = True diff --git a/taiga/__init__.py b/taiga/__init__.py index e69de29b..a10bec52 100644 --- a/taiga/__init__.py +++ b/taiga/__init__.py @@ -0,0 +1 @@ +from . import celery diff --git a/taiga/celery.py b/taiga/celery.py new file mode 100644 index 00000000..01a93494 --- /dev/null +++ b/taiga/celery.py @@ -0,0 +1,12 @@ +import os + +from celery import Celery + +from django.conf import settings + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings') + +app = Celery('taiga') + +app.config_from_object('django.conf:settings') +app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, related_name="deferred") diff --git a/taiga/deferred.py b/taiga/deferred.py new file mode 100644 index 00000000..eec20909 --- /dev/null +++ b/taiga/deferred.py @@ -0,0 +1,51 @@ +from django.conf import settings + +from .celery import app + + +def _send_task(task, args, kwargs, **options): + if settings.CELERY_ALWAYS_EAGER: + return app.tasks[task].apply(args, kwargs, **options) + return app.send_task(task, args, kwargs, **options) + + +def defer(task: str, *args, **kwargs): + """Defer the execution of a task. + + Defer the execution of a task and returns a future objects with the following methods among + others: + - `failed()` Returns `True` if the task failed. + - `ready()` Returns `True` if the task has been executed. + - `forget()` Forget about the result. + - `get()` Wait until the task is ready and return its result. + - `result` When the task has been executed the result is in this attribute. + More info at Celery docs on `AsyncResult` object. + + :param task: Name of the task to execute. + + :return: A future object. + """ + return _send_task(task, args, kwargs, routing_key="transient.deferred") + + +def call_async(task: str, *args, **kwargs): + """Run a task and ignore its result. + + This is just a star argument version of `apply_async`. + + :param task: Name of the task to execute. + :param args: Arguments for the task. + :param kwargs: Keyword arguments for the task. + """ + apply_async(task, args, kwargs) + + +def apply_async(task: str, args=None, kwargs=None, **options): + """Run a task and ignore its result. + + :param task: Name of the task to execute. + :param args: Tupple of arguments for the task. + :param kwargs: Dict of keyword arguments for the task. + :param options: Celery-specific options when running the task. See Celery docs on `apply_async` + """ + _send_task(task, args, kwargs, **options) diff --git a/taiga/services/__init__.py b/taiga/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/taiga/services/celery.py b/taiga/services/celery.py new file mode 100644 index 00000000..f3a3b890 --- /dev/null +++ b/taiga/services/celery.py @@ -0,0 +1,8 @@ +from celery import Celery + +from django.conf import settings + +app = Celery('taiga') + +app.config_from_object('django.conf:settings') +app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) diff --git a/tests/unit/test_deferred.py b/tests/unit/test_deferred.py new file mode 100644 index 00000000..e71e75d2 --- /dev/null +++ b/tests/unit/test_deferred.py @@ -0,0 +1,49 @@ +from unittest import mock + +from taiga import celery +from taiga.deferred import defer, call_async, apply_async + +from ..utils import set_settings + + +@set_settings(CELERY_ALWAYS_EAGER=False) +@mock.patch("taiga.deferred.app") +def test_defer(app): + name = "task name" + args = (1, 2) + kwargs = {"kw": "keyword argument"} + + defer(name, *args, **kwargs) + + app.send_task.assert_called_once_with(name, args, kwargs, routing_key="transient.deferred") + + +@set_settings(CELERY_ALWAYS_EAGER=False) +@mock.patch("taiga.deferred.app") +def test_apply_async(app): + name = "task name" + args = (1, 2) + kwargs = {"kw": "keyword argument"} + + apply_async(name, args, kwargs) + + app.send_task.assert_called_once_with(name, args, kwargs) + + +@set_settings(CELERY_ALWAYS_EAGER=False) +@mock.patch("taiga.deferred.app") +def test_call_async(app): + name = "task name" + args = (1, 2) + kwargs = {"kw": "keyword argument"} + + call_async(name, *args, **kwargs) + + app.send_task.assert_called_once_with(name, args, kwargs) + + +@set_settings(CELERY_ALWAYS_EAGER=True) +def test_task_invocation(): + celery.app.task(name="_test_task")(lambda: 1) + + assert defer("_test_task").get() == 1 diff --git a/tests/utils.py b/tests/utils.py index b20f2bce..8367b2cf 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -14,7 +14,9 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import functools +from django.conf import settings from django.db.models import signals @@ -34,3 +36,53 @@ def signals_switch(): disconnect_signals, reconnect_signals = signals_switch() + + +def set_settings(**new_settings): + """Decorator for set django settings that will be only available during the + wrapped-function execution. + + For example: + @set_settings(FOO='bar') + def myfunc(): + ... + + @set_settings(FOO='bar') + class TestCase: + ... + """ + def decorator(testcase): + if type(testcase) is type: + namespace = {"OVERRIDE_SETTINGS": new_settings, "ORIGINAL_SETTINGS": {}} + wrapper = type(testcase.__name__, (SettingsTestCase, testcase), namespace) + else: + @functools.wraps(testcase) + def wrapper(*args, **kwargs): + old_settings = override_settings(new_settings) + try: + testcase(*args, **kwargs) + finally: + override_settings(old_settings) + + return wrapper + + return decorator + + +def override_settings(new_settings): + old_settings = {} + for name, new_value in new_settings.items(): + old_settings[name] = getattr(settings, name, None) + setattr(settings, name, new_value) + return old_settings + + +class SettingsTestCase(object): + @classmethod + def setup_class(cls): + cls.ORIGINAL_SETTINGS = override_settings(cls.OVERRIDE_SETTINGS) + + @classmethod + def teardown_class(cls): + override_settings(cls.ORIGINAL_SETTINGS) + cls.OVERRIDE_SETTINGS.clear()