Add tests
parent
178ab9ec43
commit
39f3f82970
|
@ -17,8 +17,13 @@
|
||||||
# You should have received a copy of the GNU Affero General Public License
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import ipaddress
|
||||||
|
import socket
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
import django_sites as sites
|
import django_sites as sites
|
||||||
from django.core.urlresolvers import reverse as django_reverse
|
from django.core.urlresolvers import reverse as django_reverse
|
||||||
|
from django.utils.translation import ugettext as _
|
||||||
|
|
||||||
URL_TEMPLATE = "{scheme}://{domain}/{path}"
|
URL_TEMPLATE = "{scheme}://{domain}/{path}"
|
||||||
|
|
||||||
|
@ -43,3 +48,31 @@ def get_absolute_url(path):
|
||||||
def reverse(viewname, *args, **kwargs):
|
def reverse(viewname, *args, **kwargs):
|
||||||
"""Same behavior as django's reverse but uses django_sites to compute absolute url."""
|
"""Same behavior as django's reverse but uses django_sites to compute absolute url."""
|
||||||
return get_absolute_url(django_reverse(viewname, *args, **kwargs))
|
return get_absolute_url(django_reverse(viewname, *args, **kwargs))
|
||||||
|
|
||||||
|
|
||||||
|
class HostnameValueError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class IpAddresValueError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def validate_destination_address(url):
|
||||||
|
host = urlparse(url).hostname
|
||||||
|
port = urlparse(url).port
|
||||||
|
|
||||||
|
try:
|
||||||
|
socket_args, *others = socket.getaddrinfo(host, port)
|
||||||
|
except Exception:
|
||||||
|
raise HostnameValueError(_("Host access error"))
|
||||||
|
|
||||||
|
destination_address = socket_args[4][0]
|
||||||
|
try:
|
||||||
|
ipa = ipaddress.ip_address(destination_address)
|
||||||
|
except ValueError:
|
||||||
|
raise IpAddresValueError(_("IP Address error"))
|
||||||
|
if ipa.is_private:
|
||||||
|
raise IpAddresValueError("Private IP Address not allowed")
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
|
@ -22,7 +22,7 @@ import requests
|
||||||
from requests.exceptions import RequestException
|
from requests.exceptions import RequestException
|
||||||
|
|
||||||
from taiga.base.api.renderers import UnicodeJSONRenderer
|
from taiga.base.api.renderers import UnicodeJSONRenderer
|
||||||
from taiga.base.utils import json
|
from taiga.base.utils import json, urls
|
||||||
from taiga.base.utils.db import get_typename_for_model_instance
|
from taiga.base.utils.db import get_typename_for_model_instance
|
||||||
from taiga.celery import app
|
from taiga.celery import app
|
||||||
|
|
||||||
|
@ -30,7 +30,7 @@ from .serializers import (EpicSerializer, EpicRelatedUserStorySerializer,
|
||||||
UserStorySerializer, IssueSerializer, TaskSerializer,
|
UserStorySerializer, IssueSerializer, TaskSerializer,
|
||||||
WikiPageSerializer, MilestoneSerializer,
|
WikiPageSerializer, MilestoneSerializer,
|
||||||
HistoryEntrySerializer, UserSerializer)
|
HistoryEntrySerializer, UserSerializer)
|
||||||
from . import utils
|
|
||||||
from .models import WebhookLog
|
from .models import WebhookLog
|
||||||
|
|
||||||
|
|
||||||
|
@ -74,8 +74,8 @@ def _send_request(webhook_id, url, key, data):
|
||||||
}
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
utils.validate_destination_address(url)
|
urls.validate_destination_address(url)
|
||||||
except utils.IpaddresValueError as e:
|
except urls.IpAddresValueError as e:
|
||||||
# Error validating url
|
# Error validating url
|
||||||
webhook_log = WebhookLog.objects.create(webhook_id=webhook_id, url=url,
|
webhook_log = WebhookLog.objects.create(webhook_id=webhook_id, url=url,
|
||||||
status=0,
|
status=0,
|
||||||
|
|
|
@ -1,21 +0,0 @@
|
||||||
import ipaddress
|
|
||||||
import socket
|
|
||||||
from urllib.parse import urlparse
|
|
||||||
|
|
||||||
|
|
||||||
class IpaddresValueError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def validate_destination_address(url):
|
|
||||||
host = urlparse(url).hostname
|
|
||||||
port = urlparse(url).port
|
|
||||||
socket_args, _ = socket.getaddrinfo(host, port)
|
|
||||||
destination_address = socket_args[4][0]
|
|
||||||
|
|
||||||
try:
|
|
||||||
ipa = ipaddress.ip_address(destination_address)
|
|
||||||
except ValueError:
|
|
||||||
raise IpaddresValueError(_("IP Address error"))
|
|
||||||
if ipa.is_private:
|
|
||||||
raise IpaddresValueError("Not allowed IP Address")
|
|
|
@ -107,3 +107,35 @@ def test_to_tsquery():
|
||||||
expected = re.sub("([0-9])", r"'\1':*", expected)
|
expected = re.sub("([0-9])", r"'\1':*", expected)
|
||||||
actual = to_tsquery(input)
|
actual = to_tsquery(input)
|
||||||
assert actual == expected
|
assert actual == expected
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("url", [
|
||||||
|
"http://127.0.0.1",
|
||||||
|
"http://[::1]",
|
||||||
|
"http://192.168.0.12",
|
||||||
|
"http://10.0.0.1",
|
||||||
|
"https://172.25.0.1",
|
||||||
|
"https://10.25.23.100",
|
||||||
|
"ftp://192.168.1.100/",
|
||||||
|
"http://[::ffff:c0a8:164]/",
|
||||||
|
"scp://192.168.1.100/",
|
||||||
|
"http://www.192.168.1.100.xip.io/",
|
||||||
|
"http://test.local/",
|
||||||
|
])
|
||||||
|
def test_validate_bad_destination_address(url):
|
||||||
|
with pytest.raises(IpAddresValueError):
|
||||||
|
validate_destination_address(url)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("url", [
|
||||||
|
"http://192.167.0.12",
|
||||||
|
"http://11.0.0.1",
|
||||||
|
"https://173.25.0.1",
|
||||||
|
"https://193.24.23.100",
|
||||||
|
"ftp://173.168.1.100/",
|
||||||
|
"scp://194.168.1.100/",
|
||||||
|
"http://www.google.com/",
|
||||||
|
"http://1.1.1.1/",
|
||||||
|
])
|
||||||
|
def test_validate_good_destination_address(url):
|
||||||
|
assert validate_destination_address(url)
|
||||||
|
|
Loading…
Reference in New Issue