Fixed issue #298: Now permissions on API listings works

remotes/origin/enhancement/email-actions
Jesús Espino 2014-08-08 13:42:59 +02:00
parent b033419977
commit 26e462e182
9 changed files with 69 additions and 75 deletions

View File

@ -17,12 +17,14 @@ import operator
from functools import reduce from functools import reduce
from django.db.models import Q from django.db.models import Q
from django.db.models.sql.where import ExtraWhere, OR from django.db.models.sql.where import ExtraWhere, OR, AND
from rest_framework import filters from rest_framework import filters
from taiga.base import tags from taiga.base import tags
from taiga.users.models import Role
class QueryParamsFilterMixin(filters.BaseFilterBackend): class QueryParamsFilterMixin(filters.BaseFilterBackend):
_special_values_dict = { _special_values_dict = {
@ -89,19 +91,31 @@ class PermissionBasedFilterBackend(FilterBackend):
permission = None permission = None
def filter_queryset(self, request, queryset, view): def filter_queryset(self, request, queryset, view):
# TODO: Make permissions aware of members permissions, now only check membership. project_id = None
if hasattr(view, "filter_fields") and "project" in view.filter_fields:
project_id = request.QUERY_PARAMS.get("project", None)
qs = queryset qs = queryset
if request.user.is_authenticated() and request.user.is_superuser: if request.user.is_authenticated() and request.user.is_superuser:
qs = qs qs = qs
elif request.user.is_authenticated(): elif request.user.is_authenticated():
qs = qs.filter(Q(project__owner=request.user) | roles_qs = Role.objects.filter(memberships__user=request.user)
Q(project__members=request.user) | roles_qs = roles_qs.extra(where=["users_role.permissions @> ARRAY['{}']".format(self.permission)])
Q(project__is_private=False)) if project_id:
roles_qs = roles_qs.filter(project_id=project_id)
projects_list = [role.project_id for role in roles_qs]
if len(projects_list) == 0:
qs = qs.filter(Q(project__owner=request.user))
elif len(projects_list) == 1:
qs = qs.filter(Q(project__owner=request.user) | Q(project=projects_list[0]))
else:
qs = qs.filter(Q(project__owner=request.user) | Q(project__in=projects_list))
qs.query.where.add(ExtraWhere(["projects_project.public_permissions @> ARRAY['{}']".format(self.permission)], []), OR) qs.query.where.add(ExtraWhere(["projects_project.public_permissions @> ARRAY['{}']".format(self.permission)], []), OR)
else: else:
qs = qs.filter(project__is_private=False) qs = qs.exclude(project__owner=-1)
qs.query.where.add(ExtraWhere(["projects_project.anon_permissions @> ARRAY['{}']".format(self.permission)], []), OR) qs.query.where.add(ExtraWhere(["projects_project.anon_permissions @> ARRAY['{}']".format(self.permission)], []), AND)
return super().filter_queryset(request, qs.distinct(), view) return super().filter_queryset(request, qs.distinct(), view)
@ -133,28 +147,15 @@ class CanViewWikiLinksFilterBackend(PermissionBasedFilterBackend):
class CanViewMilestonesFilterBackend(PermissionBasedFilterBackend): class CanViewMilestonesFilterBackend(PermissionBasedFilterBackend):
permission = "view_milestones" permission = "view_milestones"
class PermissionBasedAttachmentFilterBackend(FilterBackend):
class PermissionBasedAttachmentFilterBackend(PermissionBasedFilterBackend):
permission = None permission = None
def filter_queryset(self, request, queryset, view): def filter_queryset(self, request, queryset, view):
# TODO: Make permissions aware of members permissions, now only check membership. qs = super().filter_queryset(request, queryset, view)
qs = queryset
if request.user.is_authenticated() and request.user.is_superuser:
qs = qs
elif request.user.is_authenticated():
qs = qs.filter(Q(project__owner=request.user) |
Q(project__members=request.user) |
Q(project__is_private=False))
qs.query.where.add(ExtraWhere(["projects_project.public_permissions @> ARRAY['{}']".format(self.permission)], []), OR)
else:
qs = qs.filter(project__is_private=False)
qs.query.where.add(ExtraWhere(["projects_project.anon_permissions @> ARRAY['{}']".format(self.permission)], []), OR)
ct = view.get_content_type() ct = view.get_content_type()
qs = qs.filter(content_type=ct) return qs.filter(content_type=ct)
return super().filter_queryset(request, qs.distinct(), view)
class CanViewUserStoryAttachmentFilterBackend(PermissionBasedAttachmentFilterBackend): class CanViewUserStoryAttachmentFilterBackend(PermissionBasedAttachmentFilterBackend):
@ -175,18 +176,30 @@ class CanViewWikiAttachmentFilterBackend(PermissionBasedAttachmentFilterBackend)
class CanViewProjectObjFilterBackend(FilterBackend): class CanViewProjectObjFilterBackend(FilterBackend):
def filter_queryset(self, request, queryset, view): def filter_queryset(self, request, queryset, view):
project_id = None
if hasattr(view, "filter_fields") and "project" in view.filter_fields:
project_id = request.QUERY_PARAMS.get("project", None)
qs = queryset qs = queryset
if request.user.is_authenticated() and request.user.is_superuser: if request.user.is_authenticated() and request.user.is_superuser:
qs = qs qs = qs
elif request.user.is_authenticated(): elif request.user.is_authenticated():
qs = qs.filter(Q(owner=request.user) | roles_qs = Role.objects.filter(memberships__user=request.user)
Q(members=request.user) | roles_qs = roles_qs.extra(where=["users_role.permissions @> ARRAY['view_project']"])
Q(is_private=False)) if project_id:
roles_qs = roles_qs.filter(project_id=project_id)
projects_list = [role.project_id for role in roles_qs]
if len(projects_list) == 0:
qs = qs.filter(Q(owner=request.user))
elif len(projects_list) == 1:
qs = qs.filter(Q(owner=request.user) | Q(id=projects_list[0]))
else:
qs = qs.filter(Q(owner=request.user) | Q(id__in=projects_list))
qs.query.where.add(ExtraWhere(["projects_project.public_permissions @> ARRAY['view_project']"], []), OR) qs.query.where.add(ExtraWhere(["projects_project.public_permissions @> ARRAY['view_project']"], []), OR)
else: else:
qs = qs.filter(is_private=False) qs.query.where.add(ExtraWhere(["projects_project.anon_permissions @> ARRAY['view_project']"], []), AND)
qs.query.where.add(ExtraWhere(["projects_project.anon_permissions @> ARRAY['view_project']"], []), OR)
return super().filter_queryset(request, qs.distinct(), view) return super().filter_queryset(request, qs.distinct(), view)

View File

@ -152,10 +152,14 @@ class ProjectSerializer(ModelSerializer):
return getattr(obj, "stars_count", 0) return getattr(obj, "stars_count", 0)
def get_my_permissions(self, obj): def get_my_permissions(self, obj):
return get_user_project_permissions(self.context['request'].user, obj) if "request" in self.context:
return get_user_project_permissions(self.context["request"].user, obj)
return []
def get_i_am_owner(self, obj): def get_i_am_owner(self, obj):
return is_project_owner(self.context['request'].user, obj) if "request" in self.context:
return is_project_owner(self.context["request"].user, obj)
return False
class ProjectDetailSerializer(ProjectSerializer): class ProjectDetailSerializer(ProjectSerializer):

View File

@ -71,7 +71,7 @@ class RoleFactory(Factory):
FACTORY_FOR = get_model("users", "Role") FACTORY_FOR = get_model("users", "Role")
name = "Tester" name = "Tester"
slug = "tester" slug = factory.Sequence(lambda n: "test-role-{}".format(n))
project = factory.SubFactory("tests.factories.ProjectFactory") project = factory.SubFactory("tests.factories.ProjectFactory")

View File

@ -546,7 +546,7 @@ def test_user_story_attachment_list(client, data):
] ]
results = helper_test_http_method_and_count(client, 'get', url, None, users) results = helper_test_http_method_and_count(client, 'get', url, None, users)
assert results == [(200, 2), (200, 2), (200, 3), (200, 3), (200, 3)] assert results == [(200, 2), (200, 2), (200, 2), (200, 3), (200, 3)]
def test_task_attachment_list(client, data): def test_task_attachment_list(client, data):
@ -561,7 +561,7 @@ def test_task_attachment_list(client, data):
] ]
results = helper_test_http_method_and_count(client, 'get', url, None, users) results = helper_test_http_method_and_count(client, 'get', url, None, users)
assert results == [(200, 2), (200, 2), (200, 3), (200, 3), (200, 3)] assert results == [(200, 2), (200, 2), (200, 2), (200, 3), (200, 3)]
def test_issue_attachment_list(client, data): def test_issue_attachment_list(client, data):
@ -576,7 +576,7 @@ def test_issue_attachment_list(client, data):
] ]
results = helper_test_http_method_and_count(client, 'get', url, None, users) results = helper_test_http_method_and_count(client, 'get', url, None, users)
assert results == [(200, 2), (200, 2), (200, 3), (200, 3), (200, 3)] assert results == [(200, 2), (200, 2), (200, 2), (200, 3), (200, 3)]
def test_wiki_attachment_list(client, data): def test_wiki_attachment_list(client, data):
@ -591,4 +591,4 @@ def test_wiki_attachment_list(client, data):
] ]
results = helper_test_http_method_and_count(client, 'get', url, None, users) results = helper_test_http_method_and_count(client, 'get', url, None, users)
assert results == [(200, 2), (200, 2), (200, 3), (200, 3), (200, 3)] assert results == [(200, 2), (200, 2), (200, 2), (200, 3), (200, 3)]

View File

@ -146,29 +146,6 @@ def test_milestone_delete(client, data):
assert results == [401, 403, 403, 204] assert results == [401, 403, 403, 204]
def test_milestone_delete_last_milestone(client, data):
url_not_last_milestone = reverse('milestones-detail', kwargs={"pk": data.public_milestone.pk})
data.public_milestone2 = f.MilestoneFactory(project=data.public_project)
url_last_milestone = reverse('milestones-detail', kwargs={"pk": data.public_milestone2.pk})
data.public_membership.role.permissions = list(filter(lambda x: x != "delete_milestone", data.public_membership.role.permissions))
data.public_membership.role.save()
users = [
None,
data.registered_user,
data.project_member_without_perms,
data.project_member_with_perms,
]
results = helper_test_http_method(client, 'delete', url_not_last_milestone, None, users)
assert results == [401, 403, 403, 403]
results = helper_test_http_method(client, 'delete', url_last_milestone, None, users)
assert results == [401, 403, 403, 204]
results = helper_test_http_method(client, 'delete', url_not_last_milestone, None, users)
assert results == [401, 403, 403, 204]
def test_milestone_list(client, data): def test_milestone_list(client, data):
url = reverse('milestones-list') url = reverse('milestones-list')

View File

@ -182,7 +182,7 @@ def test_roles_list(client, data):
client.login(data.project_member_without_perms) client.login(data.project_member_without_perms)
response = client.get(url) response = client.get(url)
projects_data = json.loads(response.content.decode('utf-8')) projects_data = json.loads(response.content.decode('utf-8'))
assert len(projects_data) == 5 assert len(projects_data) == 3
assert response.status_code == 200 assert response.status_code == 200
client.login(data.project_member_with_perms) client.login(data.project_member_with_perms)
@ -310,7 +310,7 @@ def test_points_list(client, data):
client.login(data.project_member_without_perms) client.login(data.project_member_without_perms)
response = client.get(url) response = client.get(url)
projects_data = json.loads(response.content.decode('utf-8')) projects_data = json.loads(response.content.decode('utf-8'))
assert len(projects_data) == 3 assert len(projects_data) == 2
assert response.status_code == 200 assert response.status_code == 200
client.login(data.project_member_with_perms) client.login(data.project_member_with_perms)
@ -471,7 +471,7 @@ def test_user_story_status_list(client, data):
client.login(data.project_member_without_perms) client.login(data.project_member_without_perms)
response = client.get(url) response = client.get(url)
projects_data = json.loads(response.content.decode('utf-8')) projects_data = json.loads(response.content.decode('utf-8'))
assert len(projects_data) == 3 assert len(projects_data) == 2
assert response.status_code == 200 assert response.status_code == 200
client.login(data.project_member_with_perms) client.login(data.project_member_with_perms)
@ -632,7 +632,7 @@ def test_task_status_list(client, data):
client.login(data.project_member_without_perms) client.login(data.project_member_without_perms)
response = client.get(url) response = client.get(url)
projects_data = json.loads(response.content.decode('utf-8')) projects_data = json.loads(response.content.decode('utf-8'))
assert len(projects_data) == 3 assert len(projects_data) == 2
assert response.status_code == 200 assert response.status_code == 200
client.login(data.project_member_with_perms) client.login(data.project_member_with_perms)
@ -793,7 +793,7 @@ def test_issue_status_list(client, data):
client.login(data.project_member_without_perms) client.login(data.project_member_without_perms)
response = client.get(url) response = client.get(url)
projects_data = json.loads(response.content.decode('utf-8')) projects_data = json.loads(response.content.decode('utf-8'))
assert len(projects_data) == 3 assert len(projects_data) == 2
assert response.status_code == 200 assert response.status_code == 200
client.login(data.project_member_with_perms) client.login(data.project_member_with_perms)
@ -954,7 +954,7 @@ def test_issue_type_list(client, data):
client.login(data.project_member_without_perms) client.login(data.project_member_without_perms)
response = client.get(url) response = client.get(url)
projects_data = json.loads(response.content.decode('utf-8')) projects_data = json.loads(response.content.decode('utf-8'))
assert len(projects_data) == 3 assert len(projects_data) == 2
assert response.status_code == 200 assert response.status_code == 200
client.login(data.project_member_with_perms) client.login(data.project_member_with_perms)
@ -1115,7 +1115,7 @@ def test_priority_list(client, data):
client.login(data.project_member_without_perms) client.login(data.project_member_without_perms)
response = client.get(url) response = client.get(url)
projects_data = json.loads(response.content.decode('utf-8')) projects_data = json.loads(response.content.decode('utf-8'))
assert len(projects_data) == 3 assert len(projects_data) == 2
assert response.status_code == 200 assert response.status_code == 200
client.login(data.project_member_with_perms) client.login(data.project_member_with_perms)
@ -1276,7 +1276,7 @@ def test_severity_list(client, data):
client.login(data.project_member_without_perms) client.login(data.project_member_without_perms)
response = client.get(url) response = client.get(url)
projects_data = json.loads(response.content.decode('utf-8')) projects_data = json.loads(response.content.decode('utf-8'))
assert len(projects_data) == 3 assert len(projects_data) == 2
assert response.status_code == 200 assert response.status_code == 200
client.login(data.project_member_with_perms) client.login(data.project_member_with_perms)
@ -1437,7 +1437,7 @@ def test_membership_list(client, data):
client.login(data.project_member_without_perms) client.login(data.project_member_without_perms)
response = client.get(url) response = client.get(url)
projects_data = json.loads(response.content.decode('utf-8')) projects_data = json.loads(response.content.decode('utf-8'))
assert len(projects_data) == 5 assert len(projects_data) == 3
assert response.status_code == 200 assert response.status_code == 200
client.login(data.project_member_with_perms) client.login(data.project_member_with_perms)
@ -1575,7 +1575,7 @@ def test_membership_action_resend_invitation(client, data):
assert results == [401, 403, 403, 403, 204] assert results == [401, 403, 403, 403, 204]
results = helper_test_http_method(client, 'post', private2_url, None, users) results = helper_test_http_method(client, 'post', private2_url, None, users)
assert results == [404, 404, 403, 403, 204] assert results == [404, 404, 404, 403, 204]
def test_project_template_retrieve(client, data): def test_project_template_retrieve(client, data):

View File

@ -302,7 +302,7 @@ def test_project_action_fans(client, data):
results = helper_test_http_method_and_count(client, 'get', private1_url, None, users) results = helper_test_http_method_and_count(client, 'get', private1_url, None, users)
assert results == [(200, 2), (200, 2), (200, 2), (200, 2), (200, 2)] assert results == [(200, 2), (200, 2), (200, 2), (200, 2), (200, 2)]
results = helper_test_http_method_and_count(client, 'get', private2_url, None, users) results = helper_test_http_method_and_count(client, 'get', private2_url, None, users)
assert results == [(404, 1), (404, 1), (403, 2), (200, 2), (200, 2)] assert results == [(404, 1), (404, 1), (404, 1), (200, 2), (200, 2)]
def test_user_action_starred(client, data): def test_user_action_starred(client, data):
@ -350,7 +350,7 @@ def test_project_action_create_template(client, data):
results = helper_test_http_method(client, 'post', private1_url, template_data, users) results = helper_test_http_method(client, 'post', private1_url, template_data, users)
assert results == [401, 403, 403, 403, 403, 201] assert results == [401, 403, 403, 403, 403, 201]
results = helper_test_http_method(client, 'post', private2_url, template_data, users) results = helper_test_http_method(client, 'post', private2_url, template_data, users)
assert results == [404, 404, 403, 403, 403, 201] assert results == [404, 404, 404, 403, 403, 201]
def test_invitations_list(client, data): def test_invitations_list(client, data):

View File

@ -48,7 +48,7 @@ def test_project_owner_unstar_project(client):
def test_project_member_star_project(client): def test_project_member_star_project(client):
user = f.UserFactory.create() user = f.UserFactory.create()
project = f.ProjectFactory.create() project = f.ProjectFactory.create()
role = f.RoleFactory.create(project=project) role = f.RoleFactory.create(project=project, permissions=["view_project"])
f.MembershipFactory.create(project=project, user=user, role=role) f.MembershipFactory.create(project=project, user=user, role=role)
url = reverse("projects-star", args=(project.id,)) url = reverse("projects-star", args=(project.id,))
@ -61,7 +61,7 @@ def test_project_member_star_project(client):
def test_project_member_unstar_project(client): def test_project_member_unstar_project(client):
user = f.UserFactory.create() user = f.UserFactory.create()
project = f.ProjectFactory.create() project = f.ProjectFactory.create()
role = f.RoleFactory.create(project=project) role = f.RoleFactory.create(project=project, permissions=["view_project"])
f.MembershipFactory.create(project=project, user=user, role=role) f.MembershipFactory.create(project=project, user=user, role=role)
url = reverse("projects-unstar", args=(project.id,)) url = reverse("projects-unstar", args=(project.id,))

View File

@ -85,7 +85,7 @@ def test_api_user_action_change_email_no_token(client):
response = client.post(url, json.dumps(data), content_type="application/json") response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400 assert response.status_code == 400
assert response.data['_error_message'] == 'Token is invalid' assert response.data['_error_message'] == 'Invalid, are you sure the token is correct and you didn\'t use it before?'
def test_api_user_action_change_email_invalid_token(client): def test_api_user_action_change_email_invalid_token(client):
@ -97,4 +97,4 @@ def test_api_user_action_change_email_invalid_token(client):
response = client.post(url, json.dumps(data), content_type="application/json") response = client.post(url, json.dumps(data), content_type="application/json")
assert response.status_code == 400 assert response.status_code == 400
assert response.data['_error_message'] == 'Token is invalid' assert response.data['_error_message'] == 'Invalid, are you sure the token is correct and you didn\'t use it before?'