From 2fc1b17f439d57d13800829a03111ba3d02c8a59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Motiejus=20Jak=C5=A1tys?= Date: Thu, 11 Jan 2024 23:36:15 +0200 Subject: [PATCH] add simple view --- app/e11sync/settings.py | 14 ++++ app/lib/__init__.py | 0 app/lib/anonymize_ip.py | 105 +++++++++++++++++++++++++ app/signup/templates/signup/index.html | 14 +++- app/signup/tests.py | 22 +++++- app/signup/views.py | 46 ++++++++++- 6 files changed, 194 insertions(+), 7 deletions(-) create mode 100644 app/lib/__init__.py create mode 100644 app/lib/anonymize_ip.py diff --git a/app/e11sync/settings.py b/app/e11sync/settings.py index af34a6f..468ed94 100644 --- a/app/e11sync/settings.py +++ b/app/e11sync/settings.py @@ -104,3 +104,17 @@ COMPRESS_PRECOMPILERS = ( ) DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField' + +LOGGING = { + "version": 1, + "disable_existing_loggers": False, + "handlers": { + "console": { + "class": "logging.StreamHandler", + }, + }, + "root": { + "handlers": ["console"], + "level": "WARNING", + }, +} diff --git a/app/lib/__init__.py b/app/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/lib/anonymize_ip.py b/app/lib/anonymize_ip.py new file mode 100644 index 0000000..56c5698 --- /dev/null +++ b/app/lib/anonymize_ip.py @@ -0,0 +1,105 @@ +# https://raw.githubusercontent.com/samuelmeuli/anonymize-ip/7f4faedf64643bd3de3607ab8905c1b1a3f8841a/anonymizeip/anonymize_ip.py + +from ipaddress import ip_address + + +def anonymize_ip( + address, + ipv4_mask="255.255.255.0", + ipv6_mask="ffff:ffff:ffff:ffff:0000:0000:0000:0000" +): + """ + Anonymize the provided IPv4 or IPv6 address by setting parts of the + address to 0 + + :param str|int address: IP address to be anonymized + :param str ipv4_mask: Mask that defines which parts of an IPv4 address are + set to 0 (default: "255.255.255.0") + :param str ipv6_mask: Mask that defines which parts of an IPv6 address are + set to 0 (default: "ffff:ffff:ffff:ffff:0000:0000:0000:0000") + :return: Anonymized IP address + :rtype: str + """ + + # IP address to be anonymized + address_packed = ip_address(address).packed + address_len = len(address_packed) + + if address_len == 4: + # IPv4 + ipv4_mask_packed = ip_address(ipv4_mask).packed + __validate_ipv4_mask(ipv4_mask_packed) + return __apply_mask(address_packed, ipv4_mask_packed, 4) + elif address_len == 16: + # IPv6 + ipv6_mask_packed = ip_address(ipv6_mask).packed + __validate_ipv6_mask(ipv6_mask_packed) + return __apply_mask(address_packed, ipv6_mask_packed, 16) + else: + # Invalid address + raise ValueError("Address does not consist of 4 (IPv4) or 16 (IPv6) " + "octets") + + +def __apply_mask(address_packed, mask_packed, nr_bytes): + """ + Perform a bitwise AND operation on all corresponding bytes between the + mask and the provided address. Mask parts set to 0 will become 0 in the + anonymized IP address as well + + :param bytes address_packed: Binary representation of the IP address to + be anonymized + :param bytes mask_packed: Binary representation of the corresponding IP + address mask + :param int nr_bytes: Number of bytes in the address (4 for IPv4, 16 for + IPv6) + :return: Anonymized IP address + :rtype: str + """ + + anon_packed = bytearray() + for i in range(0, nr_bytes): + anon_packed.append(mask_packed[i] & address_packed[i]) + return str(ip_address(bytes(anon_packed))) + + +def __validate_ipv4_mask(mask_packed): + # Test that mask only contains valid numbers + for byte in mask_packed: + if byte != 0 and byte != 255: + raise ValueError("ipv4_mask must only contain numbers 0 or 255") + + # Test that IP address does not get anonymized completely + if mask_packed == b'\x00\x00\x00\x00': + raise ValueError("ipv4_mask cannot be set to \"0.0.0.0\" (all " + "anonymized addresses will be 0.0.0.0)") + + # Test that IP address is changed by anonymization + if mask_packed == b'\xff\xff\xff\xff': + raise ValueError("ipv4_mask cannot be set to \"255.255.255.255\" " + "(addresses will not be anonymized)") + + +def __validate_ipv6_mask(mask_packed): + # Test that mask only contains valid numbers + for byte in mask_packed: + if byte != 0 and byte != 255: + raise ValueError("ipv6_mask must only contain numbers 0 or ffff") + + # Test that IP address does not get anonymized completely + if ( + mask_packed == + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + ): + raise ValueError("ipv6_mask cannot be set to " + "\"0000:0000:0000:0000:0000:0000:0000:0000\" (all " + "anonymized addresses will be 0.0.0.0)") + + # Test that IP address is changed by anonymization + if ( + mask_packed == + b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff' + ): + raise ValueError("ipv6_mask cannot be set to " + "\"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff\" " + "(addresses will not be anonymized)") diff --git a/app/signup/templates/signup/index.html b/app/signup/templates/signup/index.html index 634399e..9d1855e 100644 --- a/app/signup/templates/signup/index.html +++ b/app/signup/templates/signup/index.html @@ -77,12 +77,18 @@
{% csrf_token %} - {% comment %} - {{ .Err }} + + {% if error_message %} + {{ error_message }} + {% elif request.GET.success %} - You are now subscribed! + {% if request.GET.already_subscribed %} + Looks like you were already subscribed. We share your excitement! + {% else %} + You are now subscribed! + {% endif %} - {% endcomment %} + {% endif %} diff --git a/app/signup/tests.py b/app/signup/tests.py index 7ce503c..47f7df2 100644 --- a/app/signup/tests.py +++ b/app/signup/tests.py @@ -1,3 +1,21 @@ -from django.test import TestCase +from django.test import TestCase, Client + +from django.urls import reverse + +def SignupViewTests(TestCase): + def setUp(self): + self.client = Client() + + def test_index(self): + resp = self.client.get(reverse("index")) + self.assertEqual(response.status_code, 200) + self.assertContains(resp, "11sync is a privacy-respecting way") + + def test_ok_signup(self): + resp = self.client.post(reverse("index"), {"email": "foo@example.com"}, + follow = True, + HTTP_USER_AGENT = "foo-agent", + REMOTE_ADDR = "127.0.0.2") + self.assertEqual(resp.redirect_chain, ('http://testserver/?success', 302)) + self.assertContains(resp, "You are now subscribed!") -# Create your tests here. diff --git a/app/signup/views.py b/app/signup/views.py index 51fb3f8..e3c70ca 100644 --- a/app/signup/views.py +++ b/app/signup/views.py @@ -1,4 +1,48 @@ -from django.shortcuts import render +import logging + +from django.shortcuts import render, redirect +from django.db import IntegrityError + +from lib.anonymize_ip import anonymize_ip + +from .models import Signup + + +logger = logging.getLogger(__name__) + def index(request): + if request.method == "POST": + x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') + if x_forwarded_for: + ip = x_forwarded_for.split(',')[0] + else: + ip = request.META.get('REMOTE_ADDR') + anonymous_ip = anonymize_ip(ip) + + e = Email( + email = request.POST.get("email"), + anonymized_ip = anonymous_ip, + user_agent = request.META["HTTP_USER_AGENT"], + ) + + try: + e.clean_fields() + except ValidationError as e: + return e.render(request, "signup/index.html", + {"error_message": e.message}) + + logging.info("registering email={}".format(request.POST.get("email"))) + try: + e.save() + except IntegrityError: + # email already registered, presumably + return redirect("index") + "?already_subscribed" + except DatabaseError: + logger.exception("database error when registering an email") + err = "Sorry, database error. Please come back later." + return render(request, "signup/index.html", + {"error_message": err}) + return redirect("index") + "?success" + return render(request, "signup/index.html", {})