Add history module

remotes/origin/enhancement/email-actions
Andrey Antukh 2014-05-09 10:32:52 +02:00 committed by David Barragán Merino
parent 1a9ef0fe04
commit a8bdb364ee
18 changed files with 1381 additions and 18 deletions

View File

@ -1,9 +1,8 @@
#git+https://github.com/tomchristie/django-rest-framework.git@2.4.0
djangorestframework==2.3.13
django-reversion==1.8.0
Django==1.6.2
South==0.8.3
South==0.8.4
django-filter==0.7
django-picklefield==0.3.0
django-sampledatahelper==0.2.1

View File

View File

@ -0,0 +1,88 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.contrib.contenttypes.models import ContentType
from django.utils.translation import ugettext as _
from django.shortcuts import get_object_or_404
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from taiga.base.api import GenericViewSet
from taiga.base.filters import IsProjectMemberFilterBackend
from taiga.base import exceptions as exc
from . import permissions
from . import serializers
from . import services
# TODO: add specific permission for view history?
class HistoryViewSet(GenericViewSet):
filter_backends = (IsProjectMemberFilterBackend,)
permission_classes = (IsAuthenticated, permissions.HistoryPermission)
serializer_class = serializers.HistoryEntrySerializer
content_type = None
def get_content_type(self):
app_name, model = self.content_type.split(".", 1)
return get_object_or_404(ContentType, app_label=app_name, model=model)
def get_object(self):
ct = self.get_content_type()
model_cls = ct.model_class()
qs = model_cls.objects.all()
filtered_qs = self.filter_queryset(qs)
return super().get_object(queryset=filtered_qs)
def response_for_queryset(self, queryset):
# Switch between paginated or standard style responses
page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_pagination_serializer(page)
else:
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
# Just for restframework! Because it raises
# 404 on main api root if this method not exists.
def list(self, request):
return Response({})
def retrieve(self, request, pk):
obj = self.get_object()
qs = services.get_history_queryset_by_model_instance(obj)
return self.response_for_queryset(qs)
class UserStoryHistory(HistoryViewSet):
content_type = "userstories.userstory"
class TaskHistory(HistoryViewSet):
content_type = "tasks.task"
class IssueHistory(HistoryViewSet):
content_type = "issues.issue"
class WikiHistory(HistoryViewSet):
content_type = "wiki.wiki"

View File

@ -0,0 +1,261 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from functools import partial
from django.db.models.loading import get_model
from django.contrib.contenttypes.models import ContentType
from taiga.base.utils.iterators import as_tuple
from taiga.base.utils.iterators import as_dict
from taiga.mdrender.service import render as mdrender
import os
####################
# Values
####################
@as_dict
def _get_generic_values(ids:tuple, *, typename=None, attr:str="name") -> tuple:
app_label, model_name = typename.split(".", 1)
content_type = ContentType.objects.get(app_label=app_label, model=model_name)
model_cls = content_type.model_class()
ids = filter(lambda x: x is not None, ids)
qs = model_cls.objects.filter(pk__in=ids)
for instance in qs:
yield str(instance.pk), getattr(instance, attr)
@as_dict
def _get_users_values(ids:set) -> dict:
user_model = get_model("users", "User")
ids = filter(lambda x: x is not None, ids)
qs = user_model.objects.filter(pk__in=tuple(ids))
for user in qs:
yield str(user.pk), user.get_full_name()
_get_us_status_values = partial(_get_generic_values, typename="projects.userstorystatus")
_get_task_status_values = partial(_get_generic_values, typename="projects.taskstatus")
_get_issue_status_values = partial(_get_generic_values, typename="projects.issuestatus")
_get_issue_type_values = partial(_get_generic_values, typename="projects.issuetype")
_get_role_values = partial(_get_generic_values, typename="users.role")
_get_points_values = partial(_get_generic_values, typename="projects.points")
_get_priority_values = partial(_get_generic_values, typename="projects.priority")
_get_severity_values = partial(_get_generic_values, typename="projects.severity")
_get_milestone_values = partial(_get_generic_values, typename="milestones.milestone")
def _common_users_values(diff):
"""
Groups common values resolver logic of userstories,
issues and tasks.
"""
values = {}
users = set()
if "owner" in diff:
users.update(diff["owner"])
if "watchers" in diff:
for ids in diff["watchers"]:
if not ids:
continue
users.update(ids)
if "assigned_to" in diff:
users.update(diff["assigned_to"])
if users:
values["users"] = _get_users_values(users)
return values
def milestone_values(diff):
values = _common_users_values(diff)
return values
def userstory_values(diff):
values = _common_users_values(diff)
if "status" in diff:
values["status"] = _get_us_status_values(diff["status"])
if "milestone" in diff:
values["milestone"] = _get_milestone_values(diff["milestone"])
if "points" in diff:
points, roles = set(), set()
for pointsentry in diff["points"]:
if pointsentry is None:
continue
for role_id, point_id in pointsentry.items():
points.add(point_id)
roles.add(role_id)
values["roles"] = _get_role_values(roles)
values["points"] = _get_points_values(points)
return values
def issue_values(diff):
values = _common_users_values(diff)
if "status" in diff:
values["status"] = _get_issue_status_values(diff["status"])
if "milestone" in diff:
values["milestone"] = _get_milestone_values(diff["milestone"])
if "priority" in diff:
values["priority"] = _get_priority_values(diff["priority"])
if "severity" in diff:
values["severity"] = _get_severity_values(diff["severity"])
if "type" in diff:
values["issue_type"] = _get_issue_type_values(diff["type"])
return values
def task_values(diff):
values = _common_users_values(diff)
if "status" in diff:
values["status"] = _get_task_status_values(diff["status"])
if "milestone" in diff:
values["milestone"] = _get_milestone_values(diff["milestone"])
return values
def wiki_values(diff):
values = _common_users_values(diff)
return values
####################
# Freezes
####################
@as_tuple
def extract_attachments(obj) -> list:
for attach in obj.attachments.all():
yield {"id": attach.id,
"filename": os.path.basename(attach.attached_file.name),
"description": attach.description,
"is_deprecated": attach.is_deprecated,
"description": attach.description,
"order": attach.order}
def milestone_freezer(milestone) -> dict:
snapshot = {
"name": milestone.name,
"slug": milestone.slug,
"owner": milestone.owner_id,
"estimated_start": milestone.estimated_start,
"estimated_finish": milestone.estimated_finish,
"closed": milestone.closed,
"disponibility": milestone.disponibility
}
return snapshot
def userstory_freezer(us) -> dict:
rp_cls = get_model("userstories", "RolePoints")
rpqsd = rp_cls.objects.filter(user_story=us)
points = {}
for rp in rpqsd:
points[str(rp.role_id)] = rp.points_id
snapshot = {
"ref": us.ref,
"owner": us.owner_id,
"status": us.status_id,
"is_closed": us.is_closed,
"finish_date": us.finish_date,
"order": us.order,
"subject": us.subject,
"description": us.description,
"description_html": mdrender(us.project, us.description),
"assigned_to": us.assigned_to_id,
"milestone": us.milestone_id,
"client_requirement": us.client_requirement,
"team_requirement": us.team_requirement,
"watchers": [x.id for x in us.watchers.all()],
"attachments": extract_attachments(us),
"tags": us.tags,
"points": points,
"from_issue": us.generated_from_issue_id,
}
return snapshot
def issue_freezer(issue) -> dict:
snapshot = {
"ref": issue.ref,
"owner": issue.owner_id,
"status": issue.status_id,
"priority": issue.priority_id,
"severity": issue.severity_id,
"type": issue.type_id,
"milestone": issue.milestone_id,
"subject": issue.subject,
"description": issue.description,
"description_html": mdrender(issue.project, issue.description),
"assigned_to": issue.assigned_to_id,
"watchers": [x.pk for x in issue.watchers.all()],
"attachments": extract_attachments(issue),
"tags": issue.tags,
}
return snapshot
def task_freezer(task) -> dict:
snapshot = {
"ref": task.ref,
"owner": task.owner_id,
"status": task.status_id,
"milestone": task.milestone_id,
"subject": task.subject,
"description": task.description,
"description_html": mdrender(task.project, task.description),
"assigned_to": task.assigned_to_id,
"watchers": [x.pk for x in task.watchers.all()],
"attachments": extract_attachments(task),
"tags": task.tags,
"user_story": task.user_story_id,
"is_iocaine": task.is_iocaine,
}
return snapshot
def wiki_freezer(wiki) -> dict:
snapshot = {
"slug": wiki.slug,
"owner": wiki.owner_id,
"content": wiki.content,
"content_home": mdrender(wiki.project, wiki.content),
"watchers": [x.pk for x in wiki.watchers.all()],
"attachments": extract_attachments(wiki),
}
return snapshot

View File

@ -0,0 +1,77 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction as tx
from django.db.models.loading import get_model
from django.core.paginator import Paginator
from reversion import get_unique_for_object
from taiga.projects.history.services import take_snapshot
class Command(BaseCommand):
help = 'Migrate reversion history to new history system.'
def efficient_queryset_iter(queryset):
paginator = Paginator(queryset, 20)
for page_num in paginator.page_range:
page = paginator.page(page)
for element in page.object_list:
yield element
def iter_object_with_version(self, model_cls):
qs = model_cls.objects.all()
for obj in qs:
revs = get_unique_for_object(obj)
for rev in revs:
yield obj, rev
def handle_generic_model(self, app_name, model):
model_cls = get_model(app_name, model)
for obj, rev in self.iter_object_with_version(model_cls):
msg = "Processing app:{0} model:{1} pk:{2} revid:{3}."
print(msg.format(app_name, model.lower(), obj.id, rev.id), file=sys.stderr)
oldobj = rev.object_version.object
if rev.revision is None:
continue
comment = rev.revision.comment
user = rev.revision.user
hentry = take_snapshot(oldobj, user=user, comment=comment)
if hentry is None:
continue
hentry.created_at = rev.revision.date_created
hentry.save()
def clear_history(self):
model_cls = get_model("history", "HistoryEntry")
model_cls.objects.all().delete()
@tx.atomic
def handle(self, *args, **options):
self.clear_history()
self.handle_generic_model("tasks", "Task")
self.handle_generic_model("userstories", "UserStory")
self.handle_generic_model("issues", "Issue")

View File

@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
from south.utils import datetime_utils as datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
def forwards(self, orm):
# Adding model 'HistoryEntry'
db.create_table('history_historyentry', (
('id', self.gf('django.db.models.fields.CharField')(unique=True, default='e3cec230-d752-11e3-a409-b499ba5650c0', max_length=255, primary_key=True)),
('user', self.gf('django_pgjson.fields.JsonField')(default=None)),
('created_at', self.gf('django.db.models.fields.DateTimeField')(blank=True, auto_now_add=True)),
('type', self.gf('django.db.models.fields.SmallIntegerField')()),
('is_snapshot', self.gf('django.db.models.fields.BooleanField')(default=False)),
('key', self.gf('django.db.models.fields.CharField')(blank=True, null=True, default=None, max_length=255)),
('diff', self.gf('django_pgjson.fields.JsonField')(blank=False, default=None)),
('snapshot', self.gf('django_pgjson.fields.JsonField')(blank=False, default=None)),
('values', self.gf('django_pgjson.fields.JsonField')(blank=False, default=None)),
('comment', self.gf('django.db.models.fields.TextField')(blank=True)),
('comment_html', self.gf('django.db.models.fields.TextField')(blank=True)),
))
db.send_create_signal('history', ['HistoryEntry'])
def backwards(self, orm):
# Deleting model 'HistoryEntry'
db.delete_table('history_historyentry')
models = {
'history.historyentry': {
'Meta': {'object_name': 'HistoryEntry', 'ordering': "['created_at']"},
'comment': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'comment_html': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'created_at': ('django.db.models.fields.DateTimeField', [], {'blank': 'True', 'auto_now_add': 'True'}),
'diff': ('django_pgjson.fields.JsonField', [], {'blank': 'False', 'default': 'None'}),
'id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'default': "'e3cf38a0-d752-11e3-a409-b499ba5650c0'", 'max_length': '255', 'primary_key': 'True'}),
'is_snapshot': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'key': ('django.db.models.fields.CharField', [], {'blank': 'True', 'null': 'True', 'default': 'None', 'max_length': '255'}),
'snapshot': ('django_pgjson.fields.JsonField', [], {'blank': 'False', 'default': 'None'}),
'type': ('django.db.models.fields.SmallIntegerField', [], {}),
'user': ('django_pgjson.fields.JsonField', [], {'default': 'None'}),
'values': ('django_pgjson.fields.JsonField', [], {'blank': 'False', 'default': 'None'})
}
}
complete_apps = ['history']

View File

@ -0,0 +1,161 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import uuid
import enum
from django.utils.translation import ugettext_lazy as _
from django.db import models
from django.db.models.loading import get_model
from django.utils.functional import cached_property
from django_pgjson.fields import JsonField
class HistoryType(enum.IntEnum):
change = 1
create = 2
class HistoryEntry(models.Model):
"""
Domain model that represents a history
entry storage table.
It is used for store object changes and
comments.
"""
TYPE_CHOICES = ((HistoryType.change, _("Change")),
(HistoryType.create, _("Create")))
id = models.CharField(primary_key=True, max_length=255, unique=True,
editable=False, default=lambda: str(uuid.uuid1()))
user = JsonField(blank=True, default=None, null=True)
created_at = models.DateTimeField(auto_now_add=True)
type = models.SmallIntegerField(choices=TYPE_CHOICES)
is_snapshot = models.BooleanField(default=False)
key = models.CharField(max_length=255, null=True, default=None, blank=True)
# Stores the last diff
diff = JsonField(null=True, default=None)
# Stores the last complete frozen object snapshot
snapshot = JsonField(null=True, default=None)
# Stores a values of all identifiers used in
values = JsonField(null=True, default=None)
# Stores a comment
comment = models.TextField(blank=True)
comment_html = models.TextField(blank=True)
@cached_property
def is_comment(self):
return self.type == HistoryType.comment
@cached_property
def owner(self):
pk = self.user["pk"]
model = get_model("users", "User")
return model.objects.get(pk=pk)
@cached_property
def values_diff(self):
result = {}
users_keys = ["assigned_to", "owner"]
def resolve_value(field, key):
data = self.values[field]
key = str(key)
if key not in data:
return None
return data[key]
for key in self.diff:
value = None
if key in users_keys:
value = [resolve_value("users", x) for x in self.diff[key]]
elif key == "watchers":
value = [[resolve_value("users", x) for x in self.diff[key][0]],
[resolve_value("users", x) for x in self.diff[key][1]]]
elif key == "points":
points = {}
pointsold = self.diff["points"][0]
pointsnew = self.diff["points"][1]
# pointsold = pointsnew
if pointsold is None:
for role_id, point_id in pointsnew.items():
role_name = resolve_value("roles", role_id)
points[role_name] = [None, resolve_value("points", point_id)]
else:
for role_id, point_id in pointsnew.items():
role_name = resolve_value("roles", role_id)
oldpoint_id = pointsold.get(role_id, None)
points[role_name] = [resolve_value("points", oldpoint_id),
resolve_value("points", point_id)]
# Process that removes points entries with
# duplicate value.
for role in dict(points):
values = points[role]
if values[1] == values[0]:
del points[role]
if points:
value = points
elif key == "attachments":
attachments = {
"new": [],
"changed": [],
"deleted": [],
}
oldattachs = {x["id"]:x for x in self.diff["attachments"][0]}
newattachs = {x["id"]:x for x in self.diff["attachments"][1]}
for aid in set(tuple(oldattachs.keys()) + tuple(newattachs.keys())):
if aid in oldattachs and aid in newattachs:
if oldattachs[aid] != newattachs[aid]:
attachments["changed"].append([oldattachs[aid],newattachs[aid]])
elif aid in oldattachs and aid not in newattachs:
attachments["deleted"].append(oldattachs[aid])
elif aid not in oldattachs and aid in newattachs:
attachments["new"].append(newattachs[aid])
if attachments["new"] or attachments["changed"] or attachments["deleted"]:
value = attachments
elif key in self.values:
value = [resolve_value(key, x) for x in self.diff[key]]
else:
value = self.diff[key]
if not value:
continue
result[key] = value
return result
class Meta:
ordering = ["created_at"]

View File

@ -0,0 +1,20 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from taiga.base.permissions import Permission
class HistoryPermission(Permission):
def has_object_permission(self, request, view, obj):
# TODO: change this.
return True

View File

@ -0,0 +1,32 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from rest_framework import serializers
from taiga.base.serializers import JsonField
from . import models
class HistoryEntrySerializer(serializers.ModelSerializer):
diff = JsonField()
snapshot = JsonField()
values = JsonField()
values_diff = JsonField()
user = JsonField()
class Meta:
model = models.HistoryEntry

View File

@ -0,0 +1,293 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
This module contains a main domain logic for object history management.
This is possible example:
from taiga.projects import history
class ViewSet(restfw.ViewSet):
def create(request):
object = get_some_object()
history.freeze(object)
# Do something...
history.persist_history(object, user=request.user)
"""
from collections import namedtuple
from functools import partial, wraps, lru_cache
from copy import deepcopy
from django.conf import settings
from django.db.models.loading import get_model
from django.db import transaction as tx
from django.core.paginator import Paginator, InvalidPage
from django.contrib.contenttypes.models import ContentType
from .models import HistoryType
from taiga.mdrender.service import render as mdrender
# Type that represents a freezed object
FrozenObj = namedtuple("FrozenObj", ["key", "snapshot"])
FrozenDiff = namedtuple("FrozenDiff", ["key", "diff", "snapshot"])
# Dict containing registred contentypes with their freeze implementation.
_freeze_impl_map = {}
# Dict containing registred containing with their values implementation.
_values_impl_map = {}
def make_key_from_model_object(obj:object) -> str:
"""
Create unique key from model instance.
"""
tn = get_typename_for_model_class(obj.__class__)
return "{0}:{1}".format(tn, obj.pk)
def get_typename_for_model_class(model:object) -> str:
"""
Get typename for model instance.
"""
ct = ContentType.objects.get_for_model(model)
return "{0}.{1}".format(ct.app_label, ct.model)
def register_values_implementation(fn=None, *, typename:str=None):
"""
Register values implementation for specified typename.
This function can be used as decorator.
"""
if fn is None:
return partial(register_values_implementation, typename=typename)
if typename is None:
raise RuntimeError("typename must be specied")
@wraps(fn)
def _wrapper(*args, **kwargs):
return fn(*args, **kwargs)
_values_impl_map[typename] = _wrapper
return _wrapper
def register_freeze_implementation(fn=None, *, typename:str=None):
"""
Register freeze implementation for specified typename.
This function can be used as decorator.
"""
if fn is None:
return partial(register_freeze_implementation, typename=typename)
if typename is None:
raise RuntimeError("typename must be specied")
@wraps(fn)
def _wrapper(*args, **kwargs):
return fn(*args, **kwargs)
_freeze_impl_map[typename] = _wrapper
return _wrapper
# Low level api
def freeze_model_instance(obj:object) -> FrozenObj:
"""
Creates a new frozen object from model instance.
The freeze process consists on converting model
instances to hashable plain python objects and
wrapped into FrozenObj.
"""
typename = get_typename_for_model_class(obj.__class__)
if typename not in _freeze_impl_map:
raise RuntimeError("No implementation found for {}".format(typename))
key = make_key_from_model_object(obj)
impl_fn = _freeze_impl_map[typename]
return FrozenObj(key, impl_fn(obj))
def make_diff(oldobj:FrozenObj, newobj:FrozenObj) -> FrozenDiff:
"""
Compute a diff between two frozen objects.
"""
assert isinstance(newobj, FrozenObj), "newobj parameter should be instance of FrozenObj"
if oldobj is None:
return FrozenDiff(newobj.key, {}, newobj.snapshot)
first = oldobj.snapshot
second = newobj.snapshot
diff = {}
not_found_value = None
# Check all keys in first dict
for key in first:
if key not in second:
diff[key] = (first[key], not_found_value)
elif first[key] != second[key]:
diff[key] = (first[key], second[key])
# Check all keys in second dict to find missing
for key in second:
if key not in first:
diff[key] = (not_found_value, second[key])
return FrozenDiff(newobj.key, diff, newobj.snapshot)
def make_diff_values(typename:str, fdiff:FrozenDiff) -> dict:
"""
Given a typename and diff, build a values dict for it.
"""
if typename not in _values_impl_map:
raise RuntimeError("No implementation found for {}".format(typename))
impl_fn = _values_impl_map[typename]
return impl_fn(fdiff.diff)
def _rebuild_snapshot_from_diffs(keysnapshot, partials):
result = deepcopy(keysnapshot)
for part in partials:
for key, value in part.diff.items():
result[key] = value[1]
return result
def get_last_snapshot_for_key(key:str) -> FrozenObj:
entry_model = get_model("history", "HistoryEntry")
# Search last snapshot
qs = (entry_model.objects
.filter(key=key, is_snapshot=True)
.order_by("-created_at"))
keysnapshot = qs.first()
if keysnapshot is None:
return None, True
# Get all partial snapshots
entries = tuple(entry_model.objects
.filter(key=key, is_snapshot=False)
.filter(created_at__gte=keysnapshot.created_at)
.order_by("created_at"))
snapshot = _rebuild_snapshot_from_diffs(keysnapshot.snapshot, entries)
max_partial_diffs = getattr(settings, "MAX_PARTIAL_DIFFS", 60)
if len(entries) >= max_partial_diffs:
return FrozenObj(keysnapshot.key, snapshot), True
return FrozenObj(keysnapshot.key, snapshot), False
# Public api
@tx.atomic
def take_snapshot(obj:object, *, comment:str="", user=None):
"""
Given any model instance with registred content type,
create new history entry of "change" type.
This raises exception in case of object wasn't
previously freezed.
"""
key = make_key_from_model_object(obj)
typename = get_typename_for_model_class(obj.__class__)
new_fobj = freeze_model_instance(obj)
old_fobj, need_snapshot = get_last_snapshot_for_key(key)
fdiff = make_diff(old_fobj, new_fobj)
fvals = make_diff_values(typename, fdiff)
# If diff and comment are empty, do
# not create empty history entry
if not fdiff.diff and not comment and old_fobj != None:
return None
entry_type = HistoryType.change if old_fobj else HistoryType.create
entry_model = get_model("history", "HistoryEntry")
user_id = None if user is None else user.id
user_name = "" if user is None else user.get_full_name()
kwargs = {
"user": {"pk": user_id, "name": user_name},
"type": entry_type,
"key": key,
"diff": fdiff.diff,
"snapshot": fdiff.snapshot if need_snapshot else None,
"is_snapshot": need_snapshot,
"comment": comment,
"comment_html": mdrender(obj.project, comment),
"values": fvals,
}
return entry_model.objects.create(**kwargs)
# High level query api
def get_history_queryset_by_model_instance(obj:object, types=(HistoryType.change,)):
"""
Get one page of history for specified object.
"""
key = make_key_from_model_object(obj)
history_entry_model = get_model("history", "HistoryEntry")
qs = history_entry_model.objects.filter(key=key, type__in=types)
return qs.order_by("-created_at")
# Freeze implementatitions
from .freeze_impl import milestone_freezer
from .freeze_impl import userstory_freezer
from .freeze_impl import issue_freezer
from .freeze_impl import task_freezer
from .freeze_impl import wiki_freezer
register_freeze_implementation(milestone_freezer, typename="milestones.milestone")
register_freeze_implementation(userstory_freezer, typename="userstories.userstory")
register_freeze_implementation(issue_freezer, typename="issues.issue")
register_freeze_implementation(task_freezer, typename="tasks.task")
register_freeze_implementation(wiki_freezer, typename="wiki.wiki")
from .freeze_impl import milestone_values
from .freeze_impl import userstory_values
from .freeze_impl import issue_values
from .freeze_impl import task_values
from .freeze_impl import wiki_values
register_values_implementation(milestone_values, typename="milestones.milestone")
register_values_implementation(userstory_values, typename="userstories.userstory")
register_values_implementation(issue_values, typename="issues.issue")
register_values_implementation(task_values, typename="tasks.task")
register_values_implementation(wiki_values, typename="wiki.wiki")

View File

@ -0,0 +1,92 @@
<dl>
{% for field_name, values in changed_fields.items() %}
<dt style="background: #669933; padding: 5px 15px; color: #fff">
<b>{{ verbose_name(object, field_name) }}</b>
</dt>
{# POINTS #}
{% if field_name == "points" %}
{% for role, points in values.items() %}
<dd style="background: #b2cc99; padding: 5px 15px; color: #fff">
<b>{{ role }}</b>
</dd>
<dd style="background: #eee; padding: 5px 15px; color: #444">
<b>to:</b> <i>{{ points.1|linebreaksbr }}</i>
</dd>
<dd style="padding: 5px 15px; color: #bbb">
<b>from:</b> <i>{{ points.0|linebreaksbr }}</i>
</dd>
{% endfor %}
{# ATTACHMENTS #}
{% elif field_name == "attachments" %}
{% if values.new %}
<dd style="background: #b2cc99; padding: 5px 15px; color: #fff">
<b>{{ _("Added") }}</b>
</dd>
{% for att in values['new']%}
<dd style="background: #eee; padding: 5px 15px; color: #444">
<i>{{ att.filename|linebreaksbr }}</i>
</dd>
{% endfor %}
{% endif %}
{% if values.changed %}
<dd style="background: #b2cc99; padding: 5px 15px; color: #fff">
<b>{{ _("Changed") }}</b>
</dd>
{% for att in values['changed'] %}
<dd style="background: #eee; padding: 5px 15px; color: #444">
<i>{{ att.1.filename|linebreaksbr }}</i>
</dd>
{% endfor %}
{% endif %}
{% if values.deleted %}
<dd style="background: #b2cc99; padding: 5px 15px; color: #fff">
<b>{{ _("Deleted") }}</b>
</dd>
{% for att in values['deleted']%}
<dd style="padding: 5px 15px; color: #bbb">
<i>{{ att.filename|linebreaksbr }}</i>
</dd>
{% endfor %}
{% endif %}
{# TAGS AND WATCHERS #}
{% elif field_name in ["tags", "watchers"] %}
<dd style="background: #eee; padding: 5px 15px; color: #444">
<b>to:</b> <i>{{ ', '.join(values.1)|linebreaksbr }}</i>
</dd>
{% if values.0 != None or values.0 != [] %}
<dd style="padding: 5px 15px; color: #bbb">
<b>from:</b> <i>{{ ', '.join(values.0)|linebreaksbr }}</i>
</dd>
{% endif %}
{# * #}
{% else %}
{% if values.1 != None or values.1 != "" %}
<dd style="background: #eee; padding: 5px 15px; color: #444">
<b>to:</b> <i>{{ values.1|linebreaksbr }}</i>
</dd>
{% endif %}
{% if values.0 != None or values.0 != "" %}
<dd style="padding: 5px 15px; color: #bbb">
<b>from:</b> <i>{{ values.0|linebreaksbr }}</i>
</dd>
{% endif %}
{% endif %}
{% endfor %}
</dl>

View File

@ -0,0 +1,45 @@
{% for field_name, values in changed_fields.items() %}
- {{ verbose_name(object, field_name) }}:
{# POINTS #}
{% if field_name == "points" %}
{% for role, points in values.items() %}
* {{ role }} to: {{ points.1|linebreaksbr }} from: {{ points.0|linebreaksbr }}
{% endfor %}
{# ATTACHMENTS #}
{% elif field_name == "attachments" %}
{% if values.new %}
* {{ _("Added") }}:
{% for att in values['new']%}
- {{ att.filename|linebreaksbr }}
{% endfor %}
{% endif %}
{% if values.changed %}
* {{ _("Changed") }}
{% for att in values['changed'] %}
- {{ att.1.filename|linebreaksbr }}
{% endfor %}
{% endif %}
{% if values.deleted %}
* {{ _("Deleted") }}
{% for att in values['deleted']%}
- {{ att.filename|linebreaksbr }}
{% endfor %}
{% endif %}
{# TAGS AND WATCHERS #}
{% elif field_name in ["tags", "watchers"] %}
* to: {{ ', '.join(values.1)|linebreaksbr }}
{% if values.0 != None or values.0 != [] %}
* from: {{ ', '.join(values.0)|linebreaksbr }}
{% endif %}
{# * #}
{% else %}
{% if values.1 != None or values.1 != "" %}
* to: {{ values.1|linebreaksbr }}
{% endif %}
{% if values.0 != None or values.0 != "" %}
* from: {{ values.0|linebreaksbr }}
{% endif %}
{% endif %}
{% endfor %}

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,27 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django_jinja.base import Library
register = Library()
@register.global_function
def verbose_name(obj:object, field_name:str) -> str:
try:
return obj._meta.get_field(field_name).verbose_name
except Exception:
return field_name

View File

@ -0,0 +1,207 @@
# Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
# Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
# Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django import test
from django.core.urlresolvers import reverse
from django.core.files.uploadedfile import SimpleUploadedFile
from django.contrib.contenttypes.models import ContentType
from django.db.models.loading import get_model
from taiga.users.tests import create_user
from taiga.projects.tests import create_project
from taiga.projects.userstories.tests import create_userstory
from . import services as history
from . import models
class HistoryApiViewsTest(test.TestCase):
fixtures = ["initial_domains.json", "initial_project_templates.json"]
def setUp(self):
self.user1 = create_user(1) # Project owner
self.project1 = create_project(1, self.user1)
def test_resolve_urls(self):
self.assertEqual(reverse("userstory-history-detail", args=[1]), "/api/v1/history/userstory/1")
def test_list_history_entries(self):
userstory1 = create_userstory(1, self.user1, self.project1)
userstory1.subject = "test1"
userstory1.save()
history.take_snapshot(userstory1)
userstory1.subject = "test2"
userstory1.save()
history.take_snapshot(userstory1)
response = self.client.login(username=self.user1.username,
password=self.user1.username)
url = reverse("userstory-history-detail", args=[userstory1.pk])
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
class HistoryServicesTest(test.TestCase):
fixtures = ["initial_domains.json", "initial_project_templates.json"]
def setUp(self):
self.user1 = create_user(1) # Project owner
self.project1 = create_project(1, self.user1)
# def test_freeze_userstory(self):
# userstory1 = create_userstory(1, self.user1, self.project1)
# fobj = history.freeze_model_instance(userstory1)
# self.assertEqual(fobj.key, "userstories.userstory:{}".format(userstory1.id))
# self.assertIn("status", fobj.snapshot)
def test_freeze_wrong_object(self):
some_object = object()
with self.assertRaises(Exception):
history.freeze_model_instance(some_object)
def test_diff(self):
userstory1 = create_userstory(1, self.user1, self.project1)
userstory1.subject = "test1"
userstory1.save()
fobj1 = history.freeze_model_instance(userstory1)
userstory1.subject = "test2"
userstory1.save()
fobj2 = history.freeze_model_instance(userstory1)
fdiff = history.make_diff(fobj1, fobj2)
self.assertEqual(fdiff.diff, {"subject": ('test1', 'test2')})
def test_snapshot(self):
userstory1 = create_userstory(1, self.user1, self.project1)
userstory1.subject = "test1"
userstory1.save()
hentry = history.take_snapshot(userstory1)
userstory1.subject = "test2"
userstory1.save()
self.assertEqual(hentry.key, "userstories.userstory:{}".format(userstory1.id))
self.assertEqual(models.HistoryEntry.objects.count(), 1)
history.take_snapshot(userstory1)
self.assertEqual(models.HistoryEntry.objects.count(), 2)
def test_comment(self):
userstory1 = create_userstory(1, self.user1, self.project1)
self.assertEqual(models.HistoryEntry.objects.count(), 0)
hentry = history.take_snapshot(userstory1, comment="Sample comment")
self.assertEqual(models.HistoryEntry.objects.count(), 1)
self.assertEqual(hentry.comment, "Sample comment")
def test_userstory_points(self):
userstory1 = create_userstory(1, self.user1, self.project1)
hentry = history.take_snapshot(userstory1)
self.assertEqual({}, hentry.values_diff)
rpmodel_cls = get_model("userstories", "RolePoints")
pmodel_cls = get_model("projects", "Points")
rolepoints = rpmodel_cls.objects.filter(user_story=userstory1)[0]
points = pmodel_cls.objects.get(project=userstory1.project, value=15)
rolepoints.points = points
rolepoints.save()
hentry = history.take_snapshot(userstory1)
self.assertIn("points", hentry.values_diff)
self.assertIn("UX", hentry.values_diff["points"])
self.assertEqual(hentry.values_diff["points"]["UX"], ["?", "15"])
def test_userstory_attachments(self):
userstory1 = create_userstory(1, self.user1, self.project1)
hentry = history.take_snapshot(userstory1)
self.assertEqual({}, hentry.values_diff)
# Create attachment file
attachment_modelcls = get_model("attachments", "Attachment")
content_type = ContentType.objects.get_for_model(userstory1.__class__)
temporary_file = SimpleUploadedFile("text.txt", b"sample content")
attachment = attachment_modelcls.objects.create(project=userstory1.project,
content_type=content_type,
content_object=userstory1,
object_id=userstory1.id,
owner=self.user1,
attached_file=temporary_file)
hentry = history.take_snapshot(userstory1)
self.assertIn("attachments", hentry.values_diff)
def test_values(self):
userstory1 = create_userstory(1, self.user1, self.project1)
userstory1.subject = "test1"
userstory1.save()
hentry = history.take_snapshot(userstory1)
userstory1.subject = "test2"
userstory1.assigned_to = self.user1
userstory1.save()
hentry = history.take_snapshot(userstory1)
self.assertIn("users", hentry.values)
self.assertEqual(len(hentry.values), 1)
self.assertIn("assigned_to", hentry.values_diff)
self.assertEqual(hentry.values_diff["assigned_to"], [None, "Foo1 Bar1"])
def test_partial_snapshots(self):
userstory1 = create_userstory(1, self.user1, self.project1)
hentry = history.take_snapshot(userstory1)
userstory1.subject = "test1"
userstory1.save()
hentry = history.take_snapshot(userstory1)
userstory1.subject = "test2"
userstory1.save()
hentry = history.take_snapshot(userstory1)
userstory1.subject = "test3"
userstory1.save()
hentry = history.take_snapshot(userstory1)
userstory1.subject = "test4"
userstory1.save()
hentry = history.take_snapshot(userstory1)
userstory1.subject = "test5"
userstory1.save()
hentry = history.take_snapshot(userstory1)
self.assertEqual(models.HistoryEntry.objects.count(), 6)
self.assertEqual(models.HistoryEntry.objects.filter(is_snapshot=True).count(), 1)
self.assertEqual(models.HistoryEntry.objects.filter(is_snapshot=False).count(), 5)

View File

@ -16,35 +16,31 @@
from taiga.base import routers
from taiga.auth.api import AuthViewSet
from taiga.users.api import UsersViewSet, PermissionsViewSet
from taiga.base.searches.api import SearchViewSet
from taiga.base.resolver.api import ResolverViewSet
from taiga.projects.api import (ProjectViewSet, MembershipViewSet, InvitationViewSet,
UserStoryStatusViewSet, PointsViewSet, TaskStatusViewSet,
IssueStatusViewSet, IssueTypeViewSet, PriorityViewSet,
SeverityViewSet, ProjectAdminViewSet, RolesViewSet)
from taiga.domains.api import DomainViewSet, DomainMembersViewSet
from taiga.projects.milestones.api import MilestoneViewSet
from taiga.projects.userstories.api import UserStoryViewSet, UserStoryAttachmentViewSet
from taiga.projects.tasks.api import TaskViewSet, TaskAttachmentViewSet
from taiga.projects.issues.api import IssueViewSet, IssueAttachmentViewSet
from taiga.projects.wiki.api import WikiViewSet, WikiAttachmentViewSet
router = routers.DefaultRouter(trailing_slash=False)
# taiga.users
from taiga.users.api import UsersViewSet
from taiga.users.api import PermissionsViewSet
from taiga.auth.api import AuthViewSet
router.register(r"users", UsersViewSet, base_name="users")
router.register(r"permissions", PermissionsViewSet, base_name="permissions")
router.register(r"auth", AuthViewSet, base_name="auth")
# Resolver & Search
from taiga.base.searches.api import SearchViewSet
from taiga.base.resolver.api import ResolverViewSet
router.register(r"resolver", ResolverViewSet, base_name="resolver")
router.register(r"search", SearchViewSet, base_name="search")
# Domains
from taiga.domains.api import DomainViewSet
from taiga.domains.api import DomainMembersViewSet
from taiga.projects.api import ProjectAdminViewSet
router.register(r"sites", DomainViewSet, base_name="sites")
router.register(r"site-members", DomainMembersViewSet, base_name="site-members")
router.register(r"site-projects", ProjectAdminViewSet, base_name="site-projects")
@ -78,6 +74,20 @@ router.register(r"priorities", PriorityViewSet, base_name="priorities")
router.register(r"severities",SeverityViewSet , base_name="severities")
# History & Components
from taiga.projects.history.api import UserStoryHistory
from taiga.projects.history.api import TaskHistory
from taiga.projects.history.api import IssueHistory
from taiga.projects.history.api import WikiHistory
router.register(r"history/userstory", UserStoryHistory, base_name="userstory-history")
router.register(r"history/task", TaskHistory, base_name="task-history")
router.register(r"history/issue", IssueHistory, base_name="issue-history")
router.register(r"history/wiki", WikiHistory, base_name="wiki-history")
# Project components
from taiga.projects.milestones.api import MilestoneViewSet
from taiga.projects.userstories.api import UserStoryViewSet