add simple view
This commit is contained in:
parent
0e762f8520
commit
2fc1b17f43
@ -104,3 +104,17 @@ COMPRESS_PRECOMPILERS = (
|
|||||||
)
|
)
|
||||||
|
|
||||||
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
|
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
|
||||||
|
|
||||||
|
LOGGING = {
|
||||||
|
"version": 1,
|
||||||
|
"disable_existing_loggers": False,
|
||||||
|
"handlers": {
|
||||||
|
"console": {
|
||||||
|
"class": "logging.StreamHandler",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"root": {
|
||||||
|
"handlers": ["console"],
|
||||||
|
"level": "WARNING",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
0
app/lib/__init__.py
Normal file
0
app/lib/__init__.py
Normal file
105
app/lib/anonymize_ip.py
Normal file
105
app/lib/anonymize_ip.py
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
# https://raw.githubusercontent.com/samuelmeuli/anonymize-ip/7f4faedf64643bd3de3607ab8905c1b1a3f8841a/anonymizeip/anonymize_ip.py
|
||||||
|
|
||||||
|
from ipaddress import ip_address
|
||||||
|
|
||||||
|
|
||||||
|
def anonymize_ip(
|
||||||
|
address,
|
||||||
|
ipv4_mask="255.255.255.0",
|
||||||
|
ipv6_mask="ffff:ffff:ffff:ffff:0000:0000:0000:0000"
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Anonymize the provided IPv4 or IPv6 address by setting parts of the
|
||||||
|
address to 0
|
||||||
|
|
||||||
|
:param str|int address: IP address to be anonymized
|
||||||
|
:param str ipv4_mask: Mask that defines which parts of an IPv4 address are
|
||||||
|
set to 0 (default: "255.255.255.0")
|
||||||
|
:param str ipv6_mask: Mask that defines which parts of an IPv6 address are
|
||||||
|
set to 0 (default: "ffff:ffff:ffff:ffff:0000:0000:0000:0000")
|
||||||
|
:return: Anonymized IP address
|
||||||
|
:rtype: str
|
||||||
|
"""
|
||||||
|
|
||||||
|
# IP address to be anonymized
|
||||||
|
address_packed = ip_address(address).packed
|
||||||
|
address_len = len(address_packed)
|
||||||
|
|
||||||
|
if address_len == 4:
|
||||||
|
# IPv4
|
||||||
|
ipv4_mask_packed = ip_address(ipv4_mask).packed
|
||||||
|
__validate_ipv4_mask(ipv4_mask_packed)
|
||||||
|
return __apply_mask(address_packed, ipv4_mask_packed, 4)
|
||||||
|
elif address_len == 16:
|
||||||
|
# IPv6
|
||||||
|
ipv6_mask_packed = ip_address(ipv6_mask).packed
|
||||||
|
__validate_ipv6_mask(ipv6_mask_packed)
|
||||||
|
return __apply_mask(address_packed, ipv6_mask_packed, 16)
|
||||||
|
else:
|
||||||
|
# Invalid address
|
||||||
|
raise ValueError("Address does not consist of 4 (IPv4) or 16 (IPv6) "
|
||||||
|
"octets")
|
||||||
|
|
||||||
|
|
||||||
|
def __apply_mask(address_packed, mask_packed, nr_bytes):
|
||||||
|
"""
|
||||||
|
Perform a bitwise AND operation on all corresponding bytes between the
|
||||||
|
mask and the provided address. Mask parts set to 0 will become 0 in the
|
||||||
|
anonymized IP address as well
|
||||||
|
|
||||||
|
:param bytes address_packed: Binary representation of the IP address to
|
||||||
|
be anonymized
|
||||||
|
:param bytes mask_packed: Binary representation of the corresponding IP
|
||||||
|
address mask
|
||||||
|
:param int nr_bytes: Number of bytes in the address (4 for IPv4, 16 for
|
||||||
|
IPv6)
|
||||||
|
:return: Anonymized IP address
|
||||||
|
:rtype: str
|
||||||
|
"""
|
||||||
|
|
||||||
|
anon_packed = bytearray()
|
||||||
|
for i in range(0, nr_bytes):
|
||||||
|
anon_packed.append(mask_packed[i] & address_packed[i])
|
||||||
|
return str(ip_address(bytes(anon_packed)))
|
||||||
|
|
||||||
|
|
||||||
|
def __validate_ipv4_mask(mask_packed):
|
||||||
|
# Test that mask only contains valid numbers
|
||||||
|
for byte in mask_packed:
|
||||||
|
if byte != 0 and byte != 255:
|
||||||
|
raise ValueError("ipv4_mask must only contain numbers 0 or 255")
|
||||||
|
|
||||||
|
# Test that IP address does not get anonymized completely
|
||||||
|
if mask_packed == b'\x00\x00\x00\x00':
|
||||||
|
raise ValueError("ipv4_mask cannot be set to \"0.0.0.0\" (all "
|
||||||
|
"anonymized addresses will be 0.0.0.0)")
|
||||||
|
|
||||||
|
# Test that IP address is changed by anonymization
|
||||||
|
if mask_packed == b'\xff\xff\xff\xff':
|
||||||
|
raise ValueError("ipv4_mask cannot be set to \"255.255.255.255\" "
|
||||||
|
"(addresses will not be anonymized)")
|
||||||
|
|
||||||
|
|
||||||
|
def __validate_ipv6_mask(mask_packed):
|
||||||
|
# Test that mask only contains valid numbers
|
||||||
|
for byte in mask_packed:
|
||||||
|
if byte != 0 and byte != 255:
|
||||||
|
raise ValueError("ipv6_mask must only contain numbers 0 or ffff")
|
||||||
|
|
||||||
|
# Test that IP address does not get anonymized completely
|
||||||
|
if (
|
||||||
|
mask_packed ==
|
||||||
|
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
):
|
||||||
|
raise ValueError("ipv6_mask cannot be set to "
|
||||||
|
"\"0000:0000:0000:0000:0000:0000:0000:0000\" (all "
|
||||||
|
"anonymized addresses will be 0.0.0.0)")
|
||||||
|
|
||||||
|
# Test that IP address is changed by anonymization
|
||||||
|
if (
|
||||||
|
mask_packed ==
|
||||||
|
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
|
||||||
|
):
|
||||||
|
raise ValueError("ipv6_mask cannot be set to "
|
||||||
|
"\"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff\" "
|
||||||
|
"(addresses will not be anonymized)")
|
@ -77,12 +77,18 @@
|
|||||||
<form class="pure-form pure-form-aligned" action="{% url "index" %}" method="post">
|
<form class="pure-form pure-form-aligned" action="{% url "index" %}" method="post">
|
||||||
<fieldset>
|
<fieldset>
|
||||||
{% csrf_token %}
|
{% csrf_token %}
|
||||||
{% comment %}
|
|
||||||
<span class="pure-form-message message-error">{{ .Err }}</span>
|
{% if error_message %}
|
||||||
|
<span class="pure-form-message message-error">{{ error_message }}</span>
|
||||||
|
{% elif request.GET.success %}
|
||||||
<span class="pure-form-message message-success">
|
<span class="pure-form-message message-success">
|
||||||
|
{% if request.GET.already_subscribed %}
|
||||||
|
Looks like you were already subscribed. We share your excitement!
|
||||||
|
{% else %}
|
||||||
You are now subscribed!
|
You are now subscribed!
|
||||||
|
{% endif %}
|
||||||
</span>
|
</span>
|
||||||
{% endcomment %}
|
{% endif %}
|
||||||
|
|
||||||
<input type="email" class="subscribe-email" name="email" id="email" placeholder="your@email.com" required />
|
<input type="email" class="subscribe-email" name="email" id="email" placeholder="your@email.com" required />
|
||||||
|
|
||||||
|
@ -1,3 +1,21 @@
|
|||||||
from django.test import TestCase
|
from django.test import TestCase, Client
|
||||||
|
|
||||||
|
from django.urls import reverse
|
||||||
|
|
||||||
|
def SignupViewTests(TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.client = Client()
|
||||||
|
|
||||||
|
def test_index(self):
|
||||||
|
resp = self.client.get(reverse("index"))
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
self.assertContains(resp, "11sync is a privacy-respecting way")
|
||||||
|
|
||||||
|
def test_ok_signup(self):
|
||||||
|
resp = self.client.post(reverse("index"), {"email": "foo@example.com"},
|
||||||
|
follow = True,
|
||||||
|
HTTP_USER_AGENT = "foo-agent",
|
||||||
|
REMOTE_ADDR = "127.0.0.2")
|
||||||
|
self.assertEqual(resp.redirect_chain, ('http://testserver/?success', 302))
|
||||||
|
self.assertContains(resp, "You are now subscribed!")
|
||||||
|
|
||||||
# Create your tests here.
|
|
||||||
|
@ -1,4 +1,48 @@
|
|||||||
from django.shortcuts import render
|
import logging
|
||||||
|
|
||||||
|
from django.shortcuts import render, redirect
|
||||||
|
from django.db import IntegrityError
|
||||||
|
|
||||||
|
from lib.anonymize_ip import anonymize_ip
|
||||||
|
|
||||||
|
from .models import Signup
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def index(request):
|
def index(request):
|
||||||
|
if request.method == "POST":
|
||||||
|
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
|
||||||
|
if x_forwarded_for:
|
||||||
|
ip = x_forwarded_for.split(',')[0]
|
||||||
|
else:
|
||||||
|
ip = request.META.get('REMOTE_ADDR')
|
||||||
|
anonymous_ip = anonymize_ip(ip)
|
||||||
|
|
||||||
|
e = Email(
|
||||||
|
email = request.POST.get("email"),
|
||||||
|
anonymized_ip = anonymous_ip,
|
||||||
|
user_agent = request.META["HTTP_USER_AGENT"],
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
e.clean_fields()
|
||||||
|
except ValidationError as e:
|
||||||
|
return e.render(request, "signup/index.html",
|
||||||
|
{"error_message": e.message})
|
||||||
|
|
||||||
|
logging.info("registering email={}".format(request.POST.get("email")))
|
||||||
|
try:
|
||||||
|
e.save()
|
||||||
|
except IntegrityError:
|
||||||
|
# email already registered, presumably
|
||||||
|
return redirect("index") + "?already_subscribed"
|
||||||
|
except DatabaseError:
|
||||||
|
logger.exception("database error when registering an email")
|
||||||
|
err = "Sorry, database error. Please come back later."
|
||||||
|
return render(request, "signup/index.html",
|
||||||
|
{"error_message": err})
|
||||||
|
return redirect("index") + "?success"
|
||||||
|
|
||||||
return render(request, "signup/index.html", {})
|
return render(request, "signup/index.html", {})
|
||||||
|
Loading…
Reference in New Issue
Block a user