Migration from celery3 to celery4

remotes/origin/issue/4217/improving-mail-design
Jesús Espino 2017-03-01 13:27:58 +01:00 committed by David Barragán Merino
parent 71e810b446
commit fd658895cb
17 changed files with 46 additions and 179 deletions

1
.gitignore vendored
View File

@ -3,6 +3,7 @@
*.log *.log
taiga/search taiga/search
settings/local.py settings/local.py
settings/celery_local.py
database.sqlite database.sqlite
logs logs
media media

View File

@ -21,7 +21,7 @@ requests-oauthlib==0.6.2
webcolors==1.5 webcolors==1.5
django-sr==0.0.4 django-sr==0.0.4
easy-thumbnails==2.3 easy-thumbnails==2.3
celery==3.1.24 celery==4.0.2
redis==2.10.5 redis==2.10.5
Unidecode==0.04.19 Unidecode==0.04.19
raven==5.32.0 raven==5.32.0

View File

@ -16,21 +16,22 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from kombu import Exchange, Queue from kombu import Queue
BROKER_URL = 'amqp://guest:guest@localhost:5672//' broker_url = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' result_backend = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['pickle',] # Values are 'pickle', 'json', 'msgpack' and 'yaml' accept_content = ['pickle',] # Values are 'pickle', 'json', 'msgpack' and 'yaml'
task_serializer = "pickle"
result_serializer = "pickle"
CELERY_TIMEZONE = 'Europe/Madrid' timezone = 'Europe/Madrid'
CELERY_ENABLE_UTC = True
CELERY_DEFAULT_QUEUE = 'tasks' task_default_queue = 'tasks'
CELERY_QUEUES = ( task_queues = (
Queue('tasks', routing_key='task.#'), Queue('tasks', routing_key='task.#'),
Queue('transient', routing_key='transient.#', delivery_mode=1) Queue('transient', routing_key='transient.#', delivery_mode=1)
) )
CELERY_DEFAULT_EXCHANGE = 'tasks' task_default_exchange = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' task_default_exchange_type = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'task.default' task_default_routing_key = 'task.default'

View File

@ -0,0 +1,4 @@
from .celery import *
# To use celery in memory
#task_always_eager = True

View File

@ -157,13 +157,10 @@ DATABASES = {
######################################### #########################################
## CELERY ## CELERY
######################################### #########################################
# Set to True to enable celery and work in async mode or False
#from .celery import * # to disable it and work in sync mode. You can find the celery
# settings in settings/celery.py and settings/celery-local.py
#CELERY_ENABLED = True #CELERY_ENABLED = True
#
# To use celery in memory
#CELERY_ENABLED = True
#CELERY_ALWAYS_EAGER = True
######################################### #########################################

View File

@ -19,7 +19,6 @@
from .development import * from .development import *
CELERY_ENABLED = False CELERY_ENABLED = False
CELERY_ALWAYS_EAGER = True
MEDIA_ROOT = "/tmp" MEDIA_ROOT = "/tmp"

View File

@ -20,11 +20,15 @@ import os
from celery import Celery from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings') os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')
app = Celery('taiga') from django.conf import settings
app.config_from_object('django.conf:settings') try:
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, related_name="deferred") from settings import celery_local as celery_settings
except ImportError:
from settings import celery as celery_settings
app = Celery('taiga')
app.config_from_object(celery_settings)
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

View File

@ -1,69 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014-2017 Andrey Antukh <niwi@niwi.nz>
# Copyright (C) 2014-2017 Jesús Espino <jespinog@gmail.com>
# Copyright (C) 2014-2017 David Barragán <bameda@dbarragan.com>
# Copyright (C) 2014-2017 Alejandro Alonso <alejandro.alonso@kaleidos.net>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.conf import settings
from .celery import app
def _send_task(task, args, kwargs, **options):
if settings.CELERY_ALWAYS_EAGER:
return app.tasks[task].apply(args, kwargs, **options)
return app.send_task(task, args, kwargs, **options)
def defer(task: str, *args, **kwargs):
"""Defer the execution of a task.
Defer the execution of a task and returns a future objects with the following methods among
others:
- `failed()` Returns `True` if the task failed.
- `ready()` Returns `True` if the task has been executed.
- `forget()` Forget about the result.
- `get()` Wait until the task is ready and return its result.
- `result` When the task has been executed the result is in this attribute.
More info at Celery docs on `AsyncResult` object.
:param task: Name of the task to execute.
:return: A future object.
"""
return _send_task(task, args, kwargs, routing_key="transient.deferred")
def call_async(task: str, *args, **kwargs):
"""Run a task and ignore its result.
This is just a star argument version of `apply_async`.
:param task: Name of the task to execute.
:param args: Arguments for the task.
:param kwargs: Keyword arguments for the task.
"""
apply_async(task, args, kwargs)
def apply_async(task: str, args=None, kwargs=None, **options):
"""Run a task and ignore its result.
:param task: Name of the task to execute.
:param args: Tupple of arguments for the task.
:param kwargs: Dict of keyword arguments for the task.
:param options: Celery-specific options when running the task. See Celery docs on `apply_async`
"""
_send_task(task, args, kwargs, **options)

View File

@ -36,3 +36,5 @@ def pytest_runtest_setup(item):
def pytest_configure(config): def pytest_configure(config):
django.setup() django.setup()
from taiga.celery import app
app.conf.task_always_eager = True

View File

@ -40,8 +40,6 @@ def test_invalid_project_export(client):
def test_valid_project_export_with_celery_disabled(client, settings): def test_valid_project_export_with_celery_disabled(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create() user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user) project = f.ProjectFactory.create(owner=user)
f.MembershipFactory(project=project, user=user, is_admin=True) f.MembershipFactory(project=project, user=user, is_admin=True)
@ -57,8 +55,6 @@ def test_valid_project_export_with_celery_disabled(client, settings):
def test_valid_project_export_with_celery_disabled_and_gzip(client, settings): def test_valid_project_export_with_celery_disabled_and_gzip(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create() user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user) project = f.ProjectFactory.create(owner=user)
f.MembershipFactory(project=project, user=user, is_admin=True) f.MembershipFactory(project=project, user=user, is_admin=True)
@ -93,6 +89,7 @@ def test_valid_project_export_with_celery_enabled(client, settings):
args = (project.id, project.slug, response_data["export_id"], "plain") args = (project.id, project.slug, response_data["export_id"], "plain")
kwargs = {"countdown": settings.EXPORTS_TTL} kwargs = {"countdown": settings.EXPORTS_TTL}
delete_project_dump_mock.apply_async.assert_called_once_with(args, **kwargs) delete_project_dump_mock.apply_async.assert_called_once_with(args, **kwargs)
settings.CELERY_ENABLED = False
def test_valid_project_export_with_celery_enabled_and_gzip(client, settings): def test_valid_project_export_with_celery_enabled_and_gzip(client, settings):
@ -115,10 +112,10 @@ def test_valid_project_export_with_celery_enabled_and_gzip(client, settings):
args = (project.id, project.slug, response_data["export_id"], "gzip") args = (project.id, project.slug, response_data["export_id"], "gzip")
kwargs = {"countdown": settings.EXPORTS_TTL} kwargs = {"countdown": settings.EXPORTS_TTL}
delete_project_dump_mock.apply_async.assert_called_once_with(args, **kwargs) delete_project_dump_mock.apply_async.assert_called_once_with(args, **kwargs)
settings.CELERY_ENABLED = False
def test_valid_project_with_throttling(client, settings): def test_valid_project_with_throttling(client, settings):
settings.CELERY_ENABLED = False
settings.REST_FRAMEWORK["DEFAULT_THROTTLE_RATES"]["import-dump-mode"] = "1/minute" settings.REST_FRAMEWORK["DEFAULT_THROTTLE_RATES"]["import-dump-mode"] = "1/minute"
user = f.UserFactory.create() user = f.UserFactory.create()

View File

@ -1147,7 +1147,6 @@ def test_invalid_dump_import(client):
def test_valid_dump_import_without_enough_public_projects_slots(client, settings): def test_valid_dump_import_without_enough_public_projects_slots(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create(max_public_projects=0) user = f.UserFactory.create(max_public_projects=0)
client.login(user) client.login(user)
@ -1170,7 +1169,6 @@ def test_valid_dump_import_without_enough_public_projects_slots(client, settings
def test_valid_dump_import_without_enough_private_projects_slots(client, settings): def test_valid_dump_import_without_enough_private_projects_slots(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create(max_private_projects=0) user = f.UserFactory.create(max_private_projects=0)
client.login(user) client.login(user)
@ -1193,7 +1191,6 @@ def test_valid_dump_import_without_enough_private_projects_slots(client, setting
def test_valid_dump_import_without_enough_membership_private_project_slots_one_project(client, settings): def test_valid_dump_import_without_enough_membership_private_project_slots_one_project(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create(max_memberships_private_projects=5) user = f.UserFactory.create(max_memberships_private_projects=5)
client.login(user) client.login(user)
@ -1241,7 +1238,6 @@ def test_valid_dump_import_without_enough_membership_private_project_slots_one_p
def test_valid_dump_import_without_enough_membership_public_project_slots_one_project(client, settings): def test_valid_dump_import_without_enough_membership_public_project_slots_one_project(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create(max_memberships_public_projects=5) user = f.UserFactory.create(max_memberships_public_projects=5)
client.login(user) client.login(user)
@ -1289,7 +1285,6 @@ def test_valid_dump_import_without_enough_membership_public_project_slots_one_pr
def test_valid_dump_import_with_enough_membership_private_project_slots_multiple_projects(client, settings): def test_valid_dump_import_with_enough_membership_private_project_slots_multiple_projects(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create(max_memberships_private_projects=10) user = f.UserFactory.create(max_memberships_private_projects=10)
project = f.ProjectFactory.create(owner=user) project = f.ProjectFactory.create(owner=user)
@ -1344,8 +1339,6 @@ def test_valid_dump_import_with_enough_membership_private_project_slots_multiple
def test_valid_dump_import_with_enough_membership_public_project_slots_multiple_projects(client, settings): def test_valid_dump_import_with_enough_membership_public_project_slots_multiple_projects(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create(max_memberships_public_projects=10) user = f.UserFactory.create(max_memberships_public_projects=10)
project = f.ProjectFactory.create(owner=user) project = f.ProjectFactory.create(owner=user)
f.MembershipFactory.create(project=project) f.MembershipFactory.create(project=project)
@ -1400,7 +1393,6 @@ def test_valid_dump_import_with_enough_membership_public_project_slots_multiple_
def test_valid_dump_import_with_the_limit_of_membership_whit_you_for_private_project(client, settings): def test_valid_dump_import_with_the_limit_of_membership_whit_you_for_private_project(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create(max_memberships_private_projects=5) user = f.UserFactory.create(max_memberships_private_projects=5)
client.login(user) client.login(user)
@ -1443,7 +1435,6 @@ def test_valid_dump_import_with_the_limit_of_membership_whit_you_for_private_pro
def test_valid_dump_import_with_the_limit_of_membership_whit_you_for_public_project(client, settings): def test_valid_dump_import_with_the_limit_of_membership_whit_you_for_public_project(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create(max_memberships_public_projects=5) user = f.UserFactory.create(max_memberships_public_projects=5)
client.login(user) client.login(user)
@ -1486,8 +1477,6 @@ def test_valid_dump_import_with_the_limit_of_membership_whit_you_for_public_proj
def test_valid_dump_import_with_celery_disabled(client, settings): def test_valid_dump_import_with_celery_disabled(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create() user = f.UserFactory.create()
client.login(user) client.login(user)
@ -1508,7 +1497,6 @@ def test_valid_dump_import_with_celery_disabled(client, settings):
def test_invalid_dump_import_with_celery_disabled(client, settings): def test_invalid_dump_import_with_celery_disabled(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create(max_memberships_public_projects=5) user = f.UserFactory.create(max_memberships_public_projects=5)
client.login(user) client.login(user)
@ -1568,6 +1556,7 @@ def test_valid_dump_import_with_celery_enabled(client, settings):
assert response.status_code == 202 assert response.status_code == 202
assert "import_id" in response.data assert "import_id" in response.data
assert Project.objects.filter(slug="valid-project").count() == 1 assert Project.objects.filter(slug="valid-project").count() == 1
settings.CELERY_ENABLED = False
def test_invalid_dump_import_with_celery_enabled(client, settings): def test_invalid_dump_import_with_celery_enabled(client, settings):
@ -1611,6 +1600,7 @@ def test_invalid_dump_import_with_celery_enabled(client, settings):
assert response.status_code == 202 assert response.status_code == 202
assert "import_id" in response.data assert "import_id" in response.data
assert Project.objects.filter(slug="invalid-project").count() == 0 assert Project.objects.filter(slug="invalid-project").count() == 0
settings.CELERY_ENABLED = False
def test_dump_import_throttling(client, settings): def test_dump_import_throttling(client, settings):

View File

@ -207,6 +207,7 @@ def test_import_asana_project_without_project_id(client, settings):
response = client.post(url, content_type="application/json", data=json.dumps({"token": "token"})) response = client.post(url, content_type="application/json", data=json.dumps({"token": "token"}))
assert response.status_code == 400 assert response.status_code == 400
settings.CELERY_ENABLED = False
def test_import_asana_project_with_celery_enabled(client, settings): def test_import_asana_project_with_celery_enabled(client, settings):
@ -226,11 +227,10 @@ def test_import_asana_project_with_celery_enabled(client, settings):
assert response.status_code == 202 assert response.status_code == 202
assert "task_id" in response.data assert "task_id" in response.data
settings.CELERY_ENABLED = False
def test_import_asana_project_with_celery_disabled(client, settings): def test_import_asana_project_with_celery_disabled(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create() user = f.UserFactory.create()
project = f.ProjectFactory.create(slug="imported-project") project = f.ProjectFactory.create(slug="imported-project")
client.login(user) client.login(user)

View File

@ -195,6 +195,7 @@ def test_import_github_project_without_project_id(client, settings):
response = client.post(url, content_type="application/json", data=json.dumps({"token": "token"})) response = client.post(url, content_type="application/json", data=json.dumps({"token": "token"}))
assert response.status_code == 400 assert response.status_code == 400
settings.CELERY_ENABLED = False
def test_import_github_project_with_celery_enabled(client, settings): def test_import_github_project_with_celery_enabled(client, settings):
@ -214,11 +215,10 @@ def test_import_github_project_with_celery_enabled(client, settings):
assert response.status_code == 202 assert response.status_code == 202
assert "task_id" in response.data assert "task_id" in response.data
settings.CELERY_ENABLED = False
def test_import_github_project_with_celery_disabled(client, settings): def test_import_github_project_with_celery_disabled(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create() user = f.UserFactory.create()
project = f.ProjectFactory.create(slug="imported-project") project = f.ProjectFactory.create(slug="imported-project")
client.login(user) client.login(user)

View File

@ -203,6 +203,7 @@ def test_import_jira_project_without_project_id(client, settings):
response = client.post(url, content_type="application/json", data=json.dumps({"token": "access.secret", "url": "http://jiraserver"})) response = client.post(url, content_type="application/json", data=json.dumps({"token": "access.secret", "url": "http://jiraserver"}))
assert response.status_code == 400 assert response.status_code == 400
settings.CELERY_ENABLED = False
def test_import_jira_project_without_url(client, settings): def test_import_jira_project_without_url(client, settings):
@ -217,6 +218,7 @@ def test_import_jira_project_without_url(client, settings):
response = client.post(url, content_type="application/json", data=json.dumps({"token": "access.secret", "project_id": 1})) response = client.post(url, content_type="application/json", data=json.dumps({"token": "access.secret", "project_id": 1}))
assert response.status_code == 400 assert response.status_code == 400
settings.CELERY_ENABLED = False
def test_import_jira_project_with_celery_enabled(client, settings): def test_import_jira_project_with_celery_enabled(client, settings):
@ -236,11 +238,10 @@ def test_import_jira_project_with_celery_enabled(client, settings):
assert response.status_code == 202 assert response.status_code == 202
assert "task_id" in response.data assert "task_id" in response.data
settings.CELERY_ENABLED = False
def test_import_jira_project_with_celery_disabled(client, settings): def test_import_jira_project_with_celery_disabled(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create() user = f.UserFactory.create()
project = f.ProjectFactory.create(slug="imported-project") project = f.ProjectFactory.create(slug="imported-project")
client.login(user) client.login(user)

View File

@ -203,6 +203,7 @@ def test_import_trello_project_without_project_id(client, settings):
response = client.post(url, content_type="application/json", data=json.dumps({"token": "token"})) response = client.post(url, content_type="application/json", data=json.dumps({"token": "token"}))
assert response.status_code == 400 assert response.status_code == 400
settings.CELERY_ENABLED = False
def test_import_trello_project_with_celery_enabled(client, settings): def test_import_trello_project_with_celery_enabled(client, settings):
@ -222,11 +223,10 @@ def test_import_trello_project_with_celery_enabled(client, settings):
assert response.status_code == 202 assert response.status_code == 202
assert "task_id" in response.data assert "task_id" in response.data
settings.CELERY_ENABLED = False
def test_import_trello_project_with_celery_disabled(client, settings): def test_import_trello_project_with_celery_disabled(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create() user = f.UserFactory.create()
project = f.ProjectFactory.create(slug="imported-project") project = f.ProjectFactory.create(slug="imported-project")
client.login(user) client.login(user)

View File

@ -1861,11 +1861,10 @@ def test_delete_project_with_celery_enabled(client, settings):
assert project.memberships.count() == 0 assert project.memberships.count() == 0
assert project.blocked_code == BLOCKED_BY_DELETING assert project.blocked_code == BLOCKED_BY_DELETING
delete_project_mock.delay.assert_called_once_with(project.id) delete_project_mock.delay.assert_called_once_with(project.id)
settings.CELERY_ENABLED = False
def test_delete_project_with_celery_disabled(client, settings): def test_delete_project_with_celery_disabled(client, settings):
settings.CELERY_ENABLED = False
user = f.UserFactory.create() user = f.UserFactory.create()
project = f.ProjectFactory.create(owner=user) project = f.ProjectFactory.create(owner=user)
role = f.RoleFactory.create(project=project, permissions=["view_project"]) role = f.RoleFactory.create(project=project, permissions=["view_project"])

View File

@ -1,59 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014-2017 Andrey Antukh <niwi@niwi.nz>
# Copyright (C) 2014-2017 Jesús Espino <jespinog@gmail.com>
# Copyright (C) 2014-2017 David Barragán <bameda@dbarragan.com>
# Copyright (C) 2014-2017 Alejandro Alonso <alejandro.alonso@kaleidos.net>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from unittest import mock
from taiga import celery
from taiga.deferred import defer, call_async, apply_async
def test_defer():
# settings.CELERY_ALWAYS_EAGER = True
name = "task name"
args = (1, 2)
kwargs = {"kw": "keyword argument"}
with mock.patch("taiga.deferred.app") as app:
defer(name, *args, **kwargs)
app.tasks[name].apply.assert_called_once_with(args, kwargs,
routing_key="transient.deferred")
def test_apply_async():
name = "task name"
args = (1, 2)
kwargs = {"kw": "keyword argument"}
with mock.patch("taiga.deferred.app") as app:
apply_async(name, args, kwargs)
app.tasks[name].apply.assert_called_once_with(args, kwargs)
def test_call_async():
name = "task name"
args = (1, 2)
kwargs = {"kw": "keyword argument"}
with mock.patch("taiga.deferred.app") as app:
call_async(name, *args, **kwargs)
app.tasks[name].apply.assert_called_once_with(args, kwargs)
def test_task_invocation():
celery.app.task(name="_test_task")(lambda: 1)
assert defer("_test_task").get() == 1