Merge pull request #54 from taigaio/taskqueues

Integration with RabbitMQ and Celery
remotes/origin/enhancement/email-actions
Andrey Antukh 2014-07-03 09:21:07 +02:00
commit ec398a1d43
11 changed files with 195 additions and 0 deletions

View File

@ -26,3 +26,5 @@ enum34==1.0
django-reversion==1.8.1 django-reversion==1.8.1
easy-thumbnails==2.0 easy-thumbnails==2.0
celery==3.1.12
redis==2.10

View File

@ -17,6 +17,8 @@
from __future__ import absolute_import, print_function from __future__ import absolute_import, print_function
import os, sys import os, sys
from .celery import *
try: try:
print("Trying import local.py settings...", file=sys.stderr) print("Trying import local.py settings...", file=sys.stderr)
from .local import * from .local import *

16
settings/celery.py Normal file
View File

@ -0,0 +1,16 @@
from kombu import Exchange, Queue
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TIMEZONE = 'Europe/Madrid'
CELERY_ENABLE_UTC = True
CELERY_DEFAULT_QUEUE = 'tasks'
CELERY_QUEUES = (
Queue('tasks', routing_key='task.#'),
Queue('transient', routing_key='transient.#', delivery_mode=1)
)
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'task.default'

View File

@ -18,3 +18,5 @@ from .development import *
SKIP_SOUTH_TESTS = True SKIP_SOUTH_TESTS = True
SOUTH_TESTS_MIGRATE = False SOUTH_TESTS_MIGRATE = False
CELERY_ALWAYS_EAGER = True

View File

@ -0,0 +1 @@
from . import celery

12
taiga/celery.py Normal file
View File

@ -0,0 +1,12 @@
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')
app = Celery('taiga')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, related_name="deferred")

51
taiga/deferred.py Normal file
View File

@ -0,0 +1,51 @@
from django.conf import settings
from .celery import app
def _send_task(task, args, kwargs, **options):
if settings.CELERY_ALWAYS_EAGER:
return app.tasks[task].apply(args, kwargs, **options)
return app.send_task(task, args, kwargs, **options)
def defer(task: str, *args, **kwargs):
"""Defer the execution of a task.
Defer the execution of a task and returns a future objects with the following methods among
others:
- `failed()` Returns `True` if the task failed.
- `ready()` Returns `True` if the task has been executed.
- `forget()` Forget about the result.
- `get()` Wait until the task is ready and return its result.
- `result` When the task has been executed the result is in this attribute.
More info at Celery docs on `AsyncResult` object.
:param task: Name of the task to execute.
:return: A future object.
"""
return _send_task(task, args, kwargs, routing_key="transient.deferred")
def call_async(task: str, *args, **kwargs):
"""Run a task and ignore its result.
This is just a star argument version of `apply_async`.
:param task: Name of the task to execute.
:param args: Arguments for the task.
:param kwargs: Keyword arguments for the task.
"""
apply_async(task, args, kwargs)
def apply_async(task: str, args=None, kwargs=None, **options):
"""Run a task and ignore its result.
:param task: Name of the task to execute.
:param args: Tupple of arguments for the task.
:param kwargs: Dict of keyword arguments for the task.
:param options: Celery-specific options when running the task. See Celery docs on `apply_async`
"""
_send_task(task, args, kwargs, **options)

View File

8
taiga/services/celery.py Normal file
View File

@ -0,0 +1,8 @@
from celery import Celery
from django.conf import settings
app = Celery('taiga')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

View File

@ -0,0 +1,49 @@
from unittest import mock
from taiga import celery
from taiga.deferred import defer, call_async, apply_async
from ..utils import set_settings
@set_settings(CELERY_ALWAYS_EAGER=False)
@mock.patch("taiga.deferred.app")
def test_defer(app):
name = "task name"
args = (1, 2)
kwargs = {"kw": "keyword argument"}
defer(name, *args, **kwargs)
app.send_task.assert_called_once_with(name, args, kwargs, routing_key="transient.deferred")
@set_settings(CELERY_ALWAYS_EAGER=False)
@mock.patch("taiga.deferred.app")
def test_apply_async(app):
name = "task name"
args = (1, 2)
kwargs = {"kw": "keyword argument"}
apply_async(name, args, kwargs)
app.send_task.assert_called_once_with(name, args, kwargs)
@set_settings(CELERY_ALWAYS_EAGER=False)
@mock.patch("taiga.deferred.app")
def test_call_async(app):
name = "task name"
args = (1, 2)
kwargs = {"kw": "keyword argument"}
call_async(name, *args, **kwargs)
app.send_task.assert_called_once_with(name, args, kwargs)
@set_settings(CELERY_ALWAYS_EAGER=True)
def test_task_invocation():
celery.app.task(name="_test_task")(lambda: 1)
assert defer("_test_task").get() == 1

View File

@ -14,7 +14,9 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import functools
from django.conf import settings
from django.db.models import signals from django.db.models import signals
@ -34,3 +36,53 @@ def signals_switch():
disconnect_signals, reconnect_signals = signals_switch() disconnect_signals, reconnect_signals = signals_switch()
def set_settings(**new_settings):
"""Decorator for set django settings that will be only available during the
wrapped-function execution.
For example:
@set_settings(FOO='bar')
def myfunc():
...
@set_settings(FOO='bar')
class TestCase:
...
"""
def decorator(testcase):
if type(testcase) is type:
namespace = {"OVERRIDE_SETTINGS": new_settings, "ORIGINAL_SETTINGS": {}}
wrapper = type(testcase.__name__, (SettingsTestCase, testcase), namespace)
else:
@functools.wraps(testcase)
def wrapper(*args, **kwargs):
old_settings = override_settings(new_settings)
try:
testcase(*args, **kwargs)
finally:
override_settings(old_settings)
return wrapper
return decorator
def override_settings(new_settings):
old_settings = {}
for name, new_value in new_settings.items():
old_settings[name] = getattr(settings, name, None)
setattr(settings, name, new_value)
return old_settings
class SettingsTestCase(object):
@classmethod
def setup_class(cls):
cls.ORIGINAL_SETTINGS = override_settings(cls.OVERRIDE_SETTINGS)
@classmethod
def teardown_class(cls):
override_settings(cls.ORIGINAL_SETTINGS)
cls.OVERRIDE_SETTINGS.clear()