taiga-back/taiga/importers/asana/api.py

136 lines
5.4 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (C) 2014-2016 Taiga Agile LLC <support@taiga.io>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.utils.translation import ugettext as _
from django.conf import settings
from taiga.base.api import viewsets
from taiga.base import response
from taiga.base import exceptions as exc
from taiga.base.decorators import list_route
from taiga.users.models import AuthData, User
from taiga.users.services import get_user_photo_url
from taiga.users.gravatar import get_user_gravatar_id
from taiga.projects.serializers import ProjectSerializer
from taiga.importers import permissions, exceptions
from .importer import AsanaImporter
from . import tasks
class AsanaImporterViewSet(viewsets.ViewSet):
permission_classes = (permissions.ImporterPermission,)
@list_route(methods=["POST"])
def list_users(self, request, *args, **kwargs):
self.check_permissions(request, "list_users", None)
token = request.DATA.get('token', None)
project_id = request.DATA.get('project', None)
if not project_id:
raise exc.WrongArguments(_("The project param is needed"))
importer = AsanaImporter(request.user, token)
try:
users = importer.list_users(project_id)
except exceptions.InvalidRequest:
raise exc.BadRequest(_('Invalid asana api request'))
except exceptions.FailedRequest:
raise exc.BadRequest(_('Failed to make the request to asana api'))
for user in users:
if user['detected_user']:
user['user'] = {
'id': user['detected_user'].id,
'full_name': user['detected_user'].get_full_name(),
'gravatar_id': get_user_gravatar_id(user['detected_user']),
'photo': get_user_photo_url(user['detected_user']),
}
del(user['detected_user'])
return response.Ok(users)
@list_route(methods=["POST"])
def list_projects(self, request, *args, **kwargs):
self.check_permissions(request, "list_projects", None)
token = request.DATA.get('token', None)
importer = AsanaImporter(request.user, token)
try:
projects = importer.list_projects()
except exceptions.InvalidRequest:
raise exc.BadRequest(_('Invalid asana api request'))
except exceptions.FailedRequest:
raise exc.BadRequest(_('Failed to make the request to asana api'))
return response.Ok(projects)
@list_route(methods=["POST"])
def import_project(self, request, *args, **kwargs):
self.check_permissions(request, "import_project", None)
token = request.DATA.get('token', None)
project_id = request.DATA.get('project', None)
if not project_id:
raise exc.WrongArguments(_("The project param is needed"))
options = {
"name": request.DATA.get('name', None),
"description": request.DATA.get('description', None),
"template": request.DATA.get('template', "scrum"),
"users_bindings": request.DATA.get("users_bindings", {}),
"keep_external_reference": request.DATA.get("keep_external_reference", False),
"is_private": request.DATA.get("is_private", False),
}
if settings.CELERY_ENABLED:
task = tasks.import_project.delay(request.user.id, token, project_id, options)
return response.Accepted({"task_id": task.id})
importer = AsanaImporter(request.user, token)
project = importer.import_project(project_id, options)
project_data = {
"slug": project.slug,
"my_permissions": ["view_us"],
"is_backlog_activated": project.is_backlog_activated,
"is_kanban_activated": project.is_kanban_activated,
}
return response.Ok(project_data)
@list_route(methods=["GET"])
def auth_url(self, request, *args, **kwargs):
self.check_permissions(request, "auth_url", None)
url = AsanaImporter.get_auth_url(settings.ASANA_APP_ID, settings.ASANA_APP_SECRET, settings.ASANA_APP_CALLBACK_URL)
return response.Ok({"url": url})
@list_route(methods=["POST"])
def authorize(self, request, *args, **kwargs):
self.check_permissions(request, "authorize", None)
code = request.DATA.get('code', None)
if code is None:
raise exc.BadRequest(_("Code param needed"))
try:
asana_token = AsanaImporter.get_access_token(code, settings.ASANA_APP_ID, settings.ASANA_APP_SECRET, settings.ASANA_APP_CALLBACK_URL)
except exceptions.InvalidRequest:
raise exc.BadRequest(_('Invalid asana api request'))
except exceptions.FailedRequest:
raise exc.BadRequest(_('Failed to make the request to asana api'))
return response.Ok({"token": asana_token})