From 79efa926b5449148abb8fb2f1df74ae21d264d2a Mon Sep 17 00:00:00 2001 From: Joris van Rantwijk Date: Sun, 8 Dec 2019 15:07:19 +0100 Subject: [PATCH] First release of Digilyzer --- README.txt | 162 +++ Staat_der_Nederlanden_EV_Root_CA.pem | 32 + digilyzer.py | 1244 +++++++++++++++++++++ qrdecode.py | 1549 ++++++++++++++++++++++++++ requirements.txt | 5 + 5 files changed, 2992 insertions(+) create mode 100644 README.txt create mode 100644 Staat_der_Nederlanden_EV_Root_CA.pem create mode 100755 digilyzer.py create mode 100644 qrdecode.py create mode 100644 requirements.txt diff --git a/README.txt b/README.txt new file mode 100644 index 0000000..5a3f8d9 --- /dev/null +++ b/README.txt @@ -0,0 +1,162 @@ + +Digilyzer: Open-source alternatief voor de DigiD app +==================================================== + +Digilyzer is een open-source alternatief voor de DigiD app. +Digilyzer werkt op Windows en Linux PCs, dus niet als mobiele app. + +Op dit moment is Digilyzer experimentele software en ongeschikt voor +serieus gebruik. Maar mogelijk is het een eerste stap op weg naar +betrouwbare, free open-source DigiD software. + + +WAARSCHUWING: + + * Ondeskundig gebruik van Digilyzer kan de beveiliging van uw DigiD + in gevaar brengen. Momenteel is Digilyzer alleen geschikt voor + computerprogrammeurs, niet voor eindgebruikers. + + * Digilyzer is onofficiele, experimentele software. Het is niet gemaakt + of getest door de overheid. Digilyzer kan plotseling stoppen met + werken na updates van het officiele DigiD systeem. + + * Geef nooit uw DigiD gegevens aan onbekende software, apps of websites. + Gebruik alleen de officiele DigiD website en de officiele DigiD app + van de rijksoverheid. + + + +Voordelen van Digilyzer +----------------------- + +De source code van Digilyzer is openbaar. Hierdoor kan iedereen +controleren hoe Digilyzer werkt. Bovendien kan het programma worden +aangepast om te werken op alle soorten computers en telefoons. + +Daarentegen is de officiele DigiD app alleen beschikbaar via de app +stores van Google en Apple. De werking van de DigiD app is bovendien +geheim, waardoor niemand kan controleren welke gegevens de app doorgeeft +aan de overheid. + +Digilyzer werkt op Windows en Linux PCs, in tegenstelling tot de +officiele DigiD app die alleen werkt op mobiele apparaten. Apparaten die +je overal mee naartoe neemt terwijl de DigiD app beveiligd is met een +pincode van slechts 5 cijfers. + + + +Veiligheidsrisicos van Digilyzer +-------------------------------- + +Digilyzer heeft momenteel zoveel nadelen dat het gebruik ervan is +af te raden. Het is veiliger om alleen de officiele DigiD website en +de officiele DigiD app van de rijksoverheid te gebruiken. + +Omdat Digilyzer onofficiele software is, kun je er als gebruiker niet +op vertrouwen dat het doet wat het belooft. Het is zeer riskant om je +DigiD gegevens in te voeren in onbekende software zoals Digilyzer. +Wie toch Digilyzer wil gebruiken, moet eerst de Python code zorgvuldig +bestuderen en begrijpen. Pas daarna kun je erop vertrouwen dat Digilyzer +geen misbruik maakt van je gegevens. + +Digilyzer bewaart inloggegegens in een bestand op de computer. +(De officiele DigiD app bewaart deze gegevens op de mobiele telefoon.) +Als dit bestand in verkeerde handen valt, kunnen onbevoegden inloggen op +je DigiD account. In het algemeen zijn PCs en laptops slechter beveiligd +dan telefoons en tablets. Op PCs zijn applicaties niet goed van elkaar +gescheiden. Een onbetrouwbaar programma of website kan daardoor de hele +PC overnemen. + +De source code van Digilyzer is niet gecontroleerd door experts. +Er kunnen dus fouten in Digilyzer zitten die de beveiliging van je DigiD +in gevaar brengen. + + + +Werking van Digilyzer +--------------------- + +Digilyzer maakt verbinding met de DigiD server van de overheid op +dezelfde manier als de officiele DigiD app dat doet. De DigiD server +ziet geen verschil tussen Digilyzer en de echte DigiD app. + +Digilyzer is gebaseerd op versie 5.13.2 van de DigiD app. Als nieuwere +versies van de app op een andere manier met de server communiceren, zal +Digilyzer waarschijnlijk niet meer werken. + +Digilyzer is gemaakt in Python en gebruikt de volgende libraries: + * Python 3.7.3 + * numpy 1.16.2 + * urllib3 1.24.1 + * cryptography 2.6.1 + * Pillow 5.4.1 + * PyGObject 3.30.4 (alleen voor Linux) + +Gebruik Digilyzer alleen op een goed beveiligde computer. +Digilyzer bewaart gegevens in een bestand "digilyzer.settings" in de +home directory. In combinatie met je pincode, geeft dit bestand toegang +tot je DigiD account. Als dit bestand in verkeerde handen valt, kunnen +onbevoegden inloggen op je DigiD account. Als dit bestand kwijtraakt, +kun je niet meer inloggen met Digilyzer. Het bestand is vergelijkbaar +met de gegevens die de officiele DigiD app op je smartphone bewaart. + +Net als de officiele app, moet Digilyzer eerst geactiveerd worden +voordat je ermee kunt inloggen. Activeren gaat als volgt: + + 1. Geef het commando "python3 digilyzer.py activate". + + 2. Kies eventueel een wachtwoord om Digilyzer gegevens te beveiligen. + + 3. Vul je gebruikersnaam en wachtwoord in (dezelfde als op de DigiD + website). Deze gegevens worden naar de DigiD server gestuurd om + activatie mogelijk te maken. De officiele DigiD app doet dat + namelijk ook. + + 4. Kies een eigen pincode van 5 cijfers. + + 5. Er wordt een activatiecode gestuurd via SMS of via een brief. + + 6a. Indien SMS: Voer de activatiecode in via Digilyzer. + + 6b. Indien brief: Wacht op de brief met de activatiecode. + Geef dan het commando "python3 digilyzer.py complete". + Voer de activatiecode in via Digilyzer. + Voer je pincode in om activatie te bevestigen. + +Eenmaal geactiveerd, kun je Digilyzer gebruiken om in te loggen op +websites zoals digid.nl, de belastingdienst of zorgverzekeraars. +Inloggen (authenticatie) gaat als volgt: + + 1. Geef het commando "python3 digilyzer.py authenticate". + + 2. Digilyzer toont een "koppelcode". + Vul deze koppelcode in op de website waar je wil inloggen. + + 3. De website laat een QR code zien. + Maak een screenshot van de QR code en geef deze aan Digilyzer. + + 4. Digilyzer toont een authenticatieverzoek. + Controleer of dat klopt met de website waar je wil inloggen. + Voer je pincode in om activatie te bevestigen. + +Sommige functies van de officiele DigiD app, ontbreken in Digilyzer. +Voorbeelden van ontbrekende functies zijn: pincode veranderen, +paspoort controle via NFC, inloggen op mobiele apps. + + + +Geen ondersteuning +------------------ + +Digilyzer is onofficiele, experimentele software. Het is niet gemaakt +of getest door de overheid. Er is geen enkele garantie dat deze software +correct werkt. Bovendien kan Digilyzer plotseling stoppen met werken na +updates van het officiele DigiD systeem. + +Er is niemand beschikbaar die vragen over het gebruik van Digilyzer +kan beantwoorden. + +De DigiD helpdesk weet niets over Digilyzer en kan geen ondersteuning +bieden bij het gebruik ervan. + +-- diff --git a/Staat_der_Nederlanden_EV_Root_CA.pem b/Staat_der_Nederlanden_EV_Root_CA.pem new file mode 100644 index 0000000..315f665 --- /dev/null +++ b/Staat_der_Nederlanden_EV_Root_CA.pem @@ -0,0 +1,32 @@ +-----BEGIN CERTIFICATE----- +MIIFcDCCA1igAwIBAgIEAJiWjTANBgkqhkiG9w0BAQsFADBYMQswCQYDVQQGEwJO +TDEeMBwGA1UECgwVU3RhYXQgZGVyIE5lZGVybGFuZGVuMSkwJwYDVQQDDCBTdGFh +dCBkZXIgTmVkZXJsYW5kZW4gRVYgUm9vdCBDQTAeFw0xMDEyMDgxMTE5MjlaFw0y +MjEyMDgxMTEwMjhaMFgxCzAJBgNVBAYTAk5MMR4wHAYDVQQKDBVTdGFhdCBkZXIg +TmVkZXJsYW5kZW4xKTAnBgNVBAMMIFN0YWF0IGRlciBOZWRlcmxhbmRlbiBFViBS +b290IENBMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA48d+ifkkSzrS +M4M1LGns3Amk41GoJSt5uAg94JG6hIXGhaTK5skuU6TJJB79VWZxXSzFYGgEt9nC +UiY4iKTWO0Cmws0/zZiTs1QUWJZV1VD+hq2kY39ch/aO5ieSZxeSAgMs3NZmdO3d +Z//BYY1jTw+bbRcwJu+r0h8QoPnFfxZpgQNH7R5ojXKhTbImxrpsX23Wr9GxE46p +rfNeaXUmGD5BKyF/7otdBwadQ8QpCiv8Kj6GyzyDOvnJDdrFmeK8eEEzduG/L13l +pJhQDBXd4Pqcfzho0LKmeqfRMb1+ilgnQ7O6M5HTp5gVXJrm0w912fxBmJc+qiXb +j5IusHsMX/FjqTf5m3VpTCgmJdrV8hJwRVXj33NeN/UhbJCONVrJ0yPr08C+eKxC +KFhmpUZtcALXEPlLVPxdhkqHz3/KRawRWrUgUY0viEeXOcDPusBCAUCZSCELa6fS +/ZbV0b5GnUngC6agIk440ME8MLxwjyx1zNDFjFE7PZQIZCZhfbnDZY8UnCHQqv0X +cgOPvZuM5l5Tnrmd74K74bzickFbIZTTRTeU0d8JOV3nI6qaHcptqAqGhYqCvkIH +1vI4gnPah1vlPNOePqc7nvQDs/nxfRN0Av+7oeX6AHkcpmZBiFxgV6YuCcS6/ZrP +px9Aw7vMWgpVSzs4dlG4Y4uElBbmVvMCAwEAAaNCMEAwDwYDVR0TAQH/BAUwAwEB +/zAOBgNVHQ8BAf8EBAMCAQYwHQYDVR0OBBYEFP6rAJCYniT8qcwaivsnuL8wbqg7 +MA0GCSqGSIb3DQEBCwUAA4ICAQDPdyxuVr5Os7aEAJSrR8kN0nbHhp8dB9O2tLsI +eK9p0gtJ3jPFrK3CiAJ9Brc1AsFgyb/E6JTe1NOpEyVa/m6irn0F3H3zbPB+po3u +2dfOWBfoqSmuc0iH55vKbimhZF8ZE/euBhD/UcabTVUlT5OZEAFTdfETzsemQUHS +v4ilf0X8rLiltTMMgsT7B/Zq5SWEXwbKwYY5EdtYzXc7LMJMD16a4/CrPmEbUCTC +wPTxGfARKbalGAKb12NMcIxHowNDXLldRqANb/9Zjr7dn3LDWyvfjFvO5QxGbJKy +CqNMVEIYFRIYvdr8unRu/8G2oGTYqV9Vrp9canaW2HNnh/tNf1zuacpzEPuKqf2e +vTY4SUmH9A4U8OmHuD+nT3pajnnUk+S7aFKErGzp85hwVXIy+TSrK0m1zSBi5Dp6 +Z2Orltxtrpfs/J92VoguZs9btsmksNcFuuEnL5O7Jiqik7Ab846+HUCjuTaPPoIa +Gl6I6lD4WeKDRikL40Rc4ZW2aZCaFG+XroHPaO+Zmr615+F/+PoTRxZMzG0IQOeL +eG9QgkRQP2YGiqtDhFZKDyAthg710tvSeopLzaXoTvFeJiUBWSOgftL2fiFX1ye8 +FVdMpEbB4IMeDExNH08GGeL5qPQ6gqGyeUN51q1veieQA6TqJIc/2b3Z6fJfUEkc +7uzXLg== +-----END CERTIFICATE----- diff --git a/digilyzer.py b/digilyzer.py new file mode 100755 index 0000000..61ee3bb --- /dev/null +++ b/digilyzer.py @@ -0,0 +1,1244 @@ +#!/usr/bin/env python3 + +""" +Digilyzer - een onofficiele, open-source DigiD app + +WAARSCHUWING: Dit is experimentele software. Ondeskundig gebruik kan + de beveiliging van uw DigiD in gevaar brengen. Lees eerst README.txt. + +Gebruik: + python3 digilyzer.py {status|checkversion|activate|complete|authenticate} + [--verbose] + +Functies: + status Activatiestatus weergeven (maakt geen contact met server) + checkversion Check versie en compatibiliteit met de server + activate Activatie starten + complete Activatie afronden na ontvangst van brief met activatiecode + authenticate Inloggen op een website via DigiD + +Opties: + --verbose Print extra details over de voortgang +""" + + +import sys +import argparse +import base64 +import enum +import getpass +import json +import os +import os.path +import pathlib +import re +import secrets +import ssl +import time +import uuid +import urllib3 +import PIL.Image +import cryptography.fernet +import cryptography.hazmat.primitives.asymmetric.ec +import cryptography.hazmat.primitives.ciphers +import cryptography.hazmat.primitives.ciphers.algorithms +import cryptography.hazmat.primitives.ciphers.modes +import cryptography.hazmat.primitives.hashes +import cryptography.hazmat.primitives.kdf.pbkdf2 +import cryptography.hazmat.primitives.padding +import cryptography.hazmat.primitives.serialization +import cryptography.hazmat.backends + +import qrdecode + + +opt_verbose = False + + +def log(s): + if opt_verbose: + print("..", s) + + +class ApiError(Exception): + def __init__(self, error_message, status=""): + super().__init__(error_message) + self.status = status + + +class ApplicationError(Exception): + pass + + +if sys.platform.startswith("linux"): + # Module PIL.ImageGrab is not supported on Linux. + # Use GTK to get screenshot / clipboard. + + def get_screenshot(): + """Take a screenshot and return it as a PIL image.""" + import io + import gi + gi.require_version("Gdk", "3.0") + from gi.repository import Gdk + win = Gdk.get_default_root_window() + w = win.get_width() + h = win.get_height() + pixbuf = Gdk.pixbuf_get_from_window(win, 0, 0, w, h) + if not pixbuf: + raise ApplicationError("Kan geen screenshot maken") + (ok, pngdata) = pixbuf.save_to_bufferv("png", [], []) + if not ok: + raise ApplicationError("Kan de screenshot niet converteren naar PNG") + return PIL.Image.open(io.BytesIO(pngdata)) + + def get_clipboard_image(): + """Get an image from the clipboard.""" + import io + import gi + gi.require_version("Gtk", "3.0") + from gi.repository import Gdk + from gi.repository import Gtk + clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) + pixbuf = clipboard.wait_for_image() + if not pixbuf: + raise ApplicationError("Geen afbeelding gevonden op het clipboard") + (ok, pngdata) = pixbuf.save_to_bufferv("png", [], []) + if not ok: + raise ApplicationError("Kan de afbeelding niet converteren naar PNG") + return PIL.Image.open(io.BytesIO(pngdata)) + +else: + + def get_screenshot(): + """Take a screenshot and return it as a PIL image.""" + import PIL.ImageGrab + from ctypes import windll + user32 = windll.user32 + user32.SetProcessDPIAware() # necessary to ensure full screen grab + return PIL.ImageGrab.grab() + + def get_clipboard_image(): + """Get an image from the clipboard.""" + import PIL.ImageGrab + return PIL.ImageGrab.grabclipboard() + + +class DigidUI: + + def msg(self, s): + print(s) + + def _prompt_string(self, message): + print(message, end="") + sys.stdout.flush() + val = sys.stdin.readline() + return val.rstrip("\r\n") + + def prompt_settings_passphrase(self): + return getpass.getpass("Digilyzer wachtwoord: ") + + def prompt_username(self): + return self._prompt_string("DigiD gebruikersnaam: ") + + def prompt_password(self): + return getpass.getpass("DigiD wachtwoord: ") + + def prompt_pincode(self): + return getpass.getpass("Pincode: ") + + def prompt_activationcode(self): + return self._prompt_string("Activatiecode: ") + + def show_koppelcode(self, koppelcode): + print("Koppelcode: {}".format(koppelcode)) + + def get_qrcode(self): + print("Op de website verschijnt een QR code.") + while True: + print() + print("Kies een methode om de QR code te laden:") + print(" 1. Via het clipboard") + print(" 2. Door middel van een screenshot") + print(" 3. Afbeelding laden uit een bestand") + while True: + s = self._prompt_string("Kies 1, 2 of 3: ") + try: + v = int(s.strip()) + if v in (1, 2, 3): + break + except ValueError: + pass + print("Ongeldige keuze") + print() + try: + if v == 1: + print("Plaats een screenshot van de QR code op het clipboard,") + print("bijvoorbeeld door in de browser op Alt-PrtScr te drukken.") + s = self._prompt_string("Druk daarna op Enter ... ") + print() + img = get_clipboard_image() + if img is None: + print("FOUT: Geen afbeelding gevonden op het clipboard") + continue + elif v == 2: + print("Zorg dat de QR code zichtbaar is op het scherm.") + s = self._prompt_string("Druk dan op Enter om een screenshot te maken ... ") + print() + img = get_screenshot() + if img is None: + print("FOUT: Kan geen screenshot maken") + continue + elif v == 3: + print("Maak een screenshot van de QR code en bewaar de afbeelding") + print("als een bestand. Voer de volledige bestandsnaam in.") + s = self._prompt_string("Bestandsnaam: ") + print() + img = PIL.Image.open(s) + print("Afbeelding: {}x{} pixels".format(img.width, img.height)) + except Exception as exc: + print("FOUT:", type(exc).__name__ + ":", exc) + continue + try: + qrdata = qrdecode.decode_qrcode(img) + except Exception as exc: + print("FOUT: Geen QR code gevonden in afbeelding") + print(type(exc).__name__ + ":", exc) + continue + return qrdata.decode("ascii") + + def confirm_activation(self): + while True: + val = self._prompt_string("Activatie starten (j/n)? ") + val = val.lower() + if val == "j": + return True + if val == "n": + return False + + +class HttpClient: + + # Host name van de DigiD backend server. + DIGID_HOST = "digid.nl" + + # SHA-256 fingerprint van het TLS certificaat van de DigiD backend server. + DIGID_SSL_FINGERPRINT = ( + "5b:c0:2b:e7:e0:77:83:be:aa:b0:5d:9c:b0:74:09:79" + + ":6d:5b:ec:ae:11:b1:7b:8e:f6:0e:f1:1c:e6:5d:ba:50") + + def __init__(self): + self._fixed_headers = { + "API-Version": "2", + "App-Version": "5.14.0", + "OS-Type": "Android", + "OS-Version": "7.1.1", + "Release-Type": "Productie", + "Accept": "application/json", + "User-Agent": "okhttp/3.2.0" + } + self.persistcookie = None + ca_certs = os.path.join(os.path.dirname(__file__), + "Staat_der_Nederlanden_EV_Root_CA.pem") + self._pool = urllib3.HTTPSConnectionPool( + host=self.DIGID_HOST, + maxsize=1, + ca_certs=ca_certs, + assert_fingerprint=self.DIGID_SSL_FINGERPRINT) + + def get(self, path): + """Stuur een HTTPS GET opdracht naar de DigiD server. + + Parameters: + path: Absolute pad component in de URL. + + Resultaat: + Dict van key/value paren uit het antwoord van de server. + """ + url = "https://" + self.DIGID_HOST + path + headers = {} + headers.update(self._fixed_headers) + if self.persistcookie: + headers["Cookie"] = "_persist=" + self.persistcookie + log("HTTP GET " + url) + resp = self._pool.request("GET", url, headers=headers, timeout=10.0) + return self._decode_response(resp) + + def post(self, path, data): + """Stuur een HTTPS POST opdracht naar de DigiD server. + + Parameters: + path: Absolute pad component in de URL. + data: Dict van key/value paren om naar de server te sturen. + + Resultaat: + Dict van key/value paren uit het antwoord van de server. + """ + body = json.dumps(data).encode("ascii") + url = "https://" + self.DIGID_HOST + path + headers = {"Content-Type": "application/json"} + headers.update(self._fixed_headers) + if self.persistcookie: + headers["Cookie"] = "_persist=" + self.persistcookie + log("HTTP POST " + url) + resp = self._pool.request("POST", + url, + body=body, + headers=headers, + timeout=10.0) + return self._decode_response(resp) + + def _handle_cookies(self, cookies): + """Verwerk een "Set-Cookie" header uit een HTTP antwoord. + + Alleen de "_persist" cookie wordt herkend. De inhoud hiervan + wordt bewaard en bij volgende HTTP opdrachten teruggestuurd naar + de DigiD server. + """ + for cookie in cookies: + if cookie.startswith("_persist="): + v = cookie[9:] + p = v.find(";") + if p >= 0: + v = v[:p] + p = v.find(" ") + if p >= 0: + v = v[:p] + if len(v) > 1 and v[0] == '"' and v[-1] == '"': + v = v[1:-1] + self.persistcookie = v + + def _decode_response(self, resp): + """Verwerk het antwoord op een HTTP opdracht. + + Behandel het antwoord als een JSON structuur. + Geef als resultaat een dict van key/value paren uit de JSON structuur. + + Geef een ApiError exception als de HTTP opdracht faalt, of + als de server een "status" veld ongelijk aan "ok" teruggeeft. + """ + log("HTTP status: {} {}".format(resp.status, resp.reason)) + if resp.status < 200 or resp.status > 299: + errmsg = "HTTP status {} ({})".format(resp.status, resp.reason) + status = "http_{}".format(resp.status) + raise ApiError(errmsg, status) + log("HTTP headers: " + str(resp.headers)) + log("HTTP data: {!r}".format(resp.data)) + cookies = resp.headers.getlist("Set-Cookie") + self._handle_cookies(cookies) + val = json.loads(resp.data.decode("ascii")) + if not isinstance(val, dict): + raise ApiError("Bad JSON message") + status = str(val.get("status", "")).lower() + error = str(val.get("error", "")) + if status and (status != "ok"): + errmsg = "Got API status {!r} ({})".format(status, error) + raise ApiError(errmsg, status) + return val + + +class LoginLevel(enum.IntEnum): + UNKNOWN = 0 + BASIS = 10 + MIDDEN = 20 + SUBSTANTIEEL = 25 + HOOG = 30 + + +class DigidSettings: + """Instellingen die worden opgeslagen tussen Digilyzer sessies. + + Deze gegevens worden aangemaakt tijdens het activatie proces, + en zijn in volgende Digilyzer sessies weer nodig om in te loggen. + + Deze gegevens worden opgeslagen in het bestand "digilyzer.settings". + """ + + _FIELDS = ( + "activation_status", # Status van activatieproces. + "app_id", # ID van deze client, gekozen door server. + "instance_id", # ID van deze client, gekozen door app. + "symmetric_key", # Encryptiesleutel voor de pincode. + "mask_code", # Willekeurige code voor maskeren van pincode. + "public_key", # Publieke ECDSA sleutel. + "private_key", # Geheime ECDSA sleutel. + "login_level") # Login niveau (integer). + + def __init__(self): + self.activation_status = "not_activated" + self.app_id = "" + self.instance_id = "" + self.symmetric_key = "" + self.mask_code = "" + self.login_level = 0 + self.public_key = "" + self.private_key = "" + + def from_dict(self, d): + for field in self._FIELDS: + setattr(self, field, d.get(field)) + + def as_dict(self): + return dict((field, getattr(self, field)) for field in self._FIELDS) + + +class DigidClient: + + DEVICE_NAME = "Samsung Galaxy S6" + + def __init__(self): + self.app_session_id = None + self._http = HttpClient() + + def set_session_id(self, session_id): + self.app_session_id = session_id + + def set_persistcookie(self, persistcookie): + self._http.persistcookie = persistcookie + + def check_version(self): + # Returns: + # action: str + # update_url: str + resp = self._http.get("/apps/version") + return resp + + def basic_authenticate(self, username, password): + # Returns: + # app_session_id: str + # activation_method: str + # app_authenticator_pending: bool + # max_amount: int + data = { + "username": username, + "password": password, + "device_name": self.DEVICE_NAME + } + resp = self._http.post("/apps/auth", data) + app_session_id = resp.get("app_session_id") + if not app_session_id: + raise ApiError("BasicAuthenticate did not return 'app_session_id'") + self.app_session_id = app_session_id + return resp + + def basic_authentication_session(self, settings, smscode): + # Returns: + # user_app_id: str + # koppelcode: str + # max_amount: int + data = { + "app_session_id": self.app_session_id, + "instance_id": settings.instance_id, + "device_name": self.DEVICE_NAME, + "smscode": smscode + } + resp = self._http.post("/apps/session", data) + return resp + + def enrollment_challenge(self, settings): + # Returns: + # challenge: str + data = { + "app_session_id": self.app_session_id, + "user_app_id": settings.app_id, + "app_public_key": settings.public_key + } + resp = self._http.post("/apps/challenge", data) + return resp + + def complete_challenge(self, settings, signed_challenge): + # Returns: + # symmetric_key: str + # iv: str + data = { + "app_session_id": self.app_session_id, + "signed_challenge": signed_challenge, + "app_public_key": settings.public_key, + "hardware_support": False, + "nfc_support": False + } + resp = self._http.post("/apps/challenge_response", data) + status = resp.get("status") + return resp + + def complete_activation(self, settings, masked_pincode): + # Returns: + # authentication_level: int + data = { + "app_session_id": self.app_session_id, + "masked_pincode": masked_pincode, + "user_app_id": settings.app_id + } + resp = self._http.post("/apps/pincode", data) + return resp + + def init_letter_activation(self): + data = {"app_session_id": self.app_session_id} + resp = self._http.post("/apps/letter", data) + + def poll_letter_activation(self): + data = {"app_session_id": self.app_session_id} + resp = self._http.post("/apps/letter_poll", data) + + def create_activation_code_session(self, settings): + # Returns: + # app_session_id: str + data = {"app_session_id": None, + "user_app_id": settings.app_id, + "re_request_letter": False} + resp = self._http.post("/apps/activationcode_session", data) + app_session_id = resp.get("app_session_id") + if not app_session_id: + raise ApiError("CreateActivationCodeSession did not return 'app_session_id'") + self.app_session_id = app_session_id + + def complete_letter_activation(self, activationcode): + data = { + "app_session_id": self.app_session_id, + "activationcode": activationcode + } + resp = self._http.post("/apps/activationcode", data) + + def send_sms(self): + # Returns: + # phonenumber: str + data = {"app_session_id": self.app_session_id} + resp = self._http.post("/apps/sms", data) + return resp + + def authenticate_challenge(self, settings): + # Returns: + # challenge: str + # iv: str + # webservice: str + # action: str + # return_url: str + # authentication_level: int (formatted as string) + data = { + "app_session_id": self.app_session_id, + "user_app_id": settings.app_id, + "instance_id": settings.instance_id + } + resp = self._http.post("/apps/challenge", data) + return resp + + def authenticate(self, settings, signed_challenge, masked_pincode): + # Returns: + # authentication_level: int + data = { + "app_session_id": self.app_session_id, + "user_app_id": settings.app_id, + "app_public_key": settings.public_key, + "signed_challenge": signed_challenge, + "masked_pincode": masked_pincode, + "upgrade_app": False + } + resp = self._http.post("/apps/authenticate", data) + return resp + + +def bytes_to_hex(data): + return "".join("{:02x}".format(b) for b in data) + + +def hex_to_bytes(s): + n = len(s) + if n % 2 != 0: + raise ValueError("Expecting even-length hexadecimal string") + values = [int(s[i:i+2], 16) for i in range(0, n, 2)] + return bytes(values) + + +def make_ec_keypair(): + """Maak een publiek/geheim paar sleutels op de elliptic curve SECP256R1. + + Geef als resultaat het tuple (public_key, private_key). + Beide sleutels worden in base16 representatie opgeleverd. + """ + priv_key = cryptography.hazmat.primitives.asymmetric.ec.generate_private_key( + cryptography.hazmat.primitives.asymmetric.ec.SECP256R1, + cryptography.hazmat.backends.default_backend()) + priv_key_bytes = priv_key.private_bytes( + cryptography.hazmat.primitives.serialization.Encoding.DER, + cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8, + cryptography.hazmat.primitives.serialization.NoEncryption()) + priv_key_str = bytes_to_hex(priv_key_bytes) + pub_key = priv_key.public_key() + pub_key_bytes = pub_key.public_bytes( + cryptography.hazmat.primitives.serialization.Encoding.X962, + cryptography.hazmat.primitives.serialization.PublicFormat.UncompressedPoint) + pub_key_str = bytes_to_hex(pub_key_bytes) + return (pub_key_str, priv_key_str) + + +def ec_sign(private_key, data): + """Onderteken een bericht met ECDSA-SHA256. + + Parameters: + private_key: Geheime sleutel in het formaat van make_ec_keypair(). + data: String om te ondertekenen. + + Resultaat: + Digitale handtekening als de base16 representatie van de ECDSA + handtekening in DER codering. + """ + priv_key_bytes = hex_to_bytes(private_key) + data_bytes = data.encode("utf-8") + priv_key = cryptography.hazmat.primitives.serialization.load_der_private_key( + priv_key_bytes, + password=None, + backend=cryptography.hazmat.backends.default_backend()) + algorithm = cryptography.hazmat.primitives.asymmetric.ec.ECDSA( + cryptography.hazmat.primitives.hashes.SHA256()) + signed_data = priv_key.sign(data_bytes, algorithm) + return bytes_to_hex(signed_data) + + +def valid_pincode(pincode): + return (len(pincode) == 5) and re.match(r"^[0-9]{5}$", pincode) + + +def make_pincode_mask(): + """Genereer een willekeurige code van 5 cijfers.""" + return "{:05d}".format(secrets.randbelow(100000)) + + +def mask_pincode(mask, pincode): + """Maskeer de pincode door er een masker code bij op te tellen.""" + mask_digits = [int(c) for c in mask] + pin_digits = [int(c) for c in pincode] + assert len(mask_digits) == 5 + assert len(pin_digits) == 5 + masked_digits = [(m + p) % 10 + for (m, p) in zip(mask_digits, pin_digits)] + masked_code = "".join(str(c) for c in masked_digits) + return masked_code + + +def encrypt_pincode(symmetric_key, iv, data): + """Versleutel de gemaskeerde pincode met AES-256-CBC. + + Parameters: + symmetric_key: Base16 representatie van de AES-256 sleutel. + iv: Base16 representatie van de IV. + data: Gemaskeerde pincode om te versleutelen. + + Resultaat: + Versleutelde data in base16 representatie. + """ + data_bytes = data.encode("utf-8") + key_bytes = hex_to_bytes(symmetric_key) + iv_bytes = hex_to_bytes(iv) + if len(key_bytes) != 32: + raise ValueError("Unsupported symmetric key length") + if len(iv_bytes) != 16: + raise ValueError("Unsupported IV length") + cipher = cryptography.hazmat.primitives.ciphers.Cipher( + cryptography.hazmat.primitives.ciphers.algorithms.AES(key_bytes), + cryptography.hazmat.primitives.ciphers.modes.CBC(iv_bytes), + backend=cryptography.hazmat.backends.default_backend()) + ctx = cipher.encryptor() + padder = cryptography.hazmat.primitives.padding.PKCS7(128).padder() + padded_bytes = padder.update(data_bytes) + padder.finalize() + encrypted_bytes = ctx.update(padded_bytes) + ctx.finalize() + return bytes_to_hex(encrypted_bytes) + + +def make_koppelcode(): + """Genereer een willekeurige "koppelcode" van 4 letters.""" + code_len = 4 + allowed_chars = "BCDFGHJLMNPQRSTVWXZ" + code_chars = [] + for i in range(code_len): + r = secrets.randbelow(len(allowed_chars)) + code_chars.append(allowed_chars[r]) + return "".join(code_chars) + + +class DigidApp: + + def __init__(self): + self.settings_file = os.path.join(pathlib.Path.home(), + "digilyzer.settings") + self.settings = None + self._cli = DigidClient() + self._ui = DigidUI() + self._settings_salt = None + self._settings_key = None + + def _derive_settings_key(self, salt, passphrase): + kdf = cryptography.hazmat.primitives.kdf.pbkdf2.PBKDF2HMAC( + algorithm=cryptography.hazmat.primitives.hashes.SHA256(), + length=32, + salt=salt, + iterations=4096, + backend=cryptography.hazmat.backends.default_backend()) + key = kdf.derive(passphrase.encode("utf-8")) + self._settings_salt = salt + self._settings_key = base64.urlsafe_b64encode(key) + + def _prompt_new_passphrase(self): + msg = self._ui.msg + msg("Tijdens activatie worden geheime sleutels en codes aangemaakt.") + msg("Deze instellingen worden opgeslagen in het bestand digilyzer.settings.") + msg("U kunt de instellingen beveiligen met een wachtwoord.") + msg("") + msg("Kies een wachtwoord om de instellingen te beveiligen.") + msg("Het gaat hier NIET om uw wachtwoord voor de DigiD website.") + msg("Het wachtwoord moet opnieuw worden ingevoerd bij elk gebruik van") + msg("deze software. Als u een leeg wachtwoord invoert (Enter), worden") + msg("de instellingen onbeveiligd opgeslagen.") + msg("") + while True: + passphrase = self._ui.prompt_settings_passphrase() + if passphrase.strip() == "": + msg("") + msg("Instellingen worden onbeveiligd opgeslagen.") + self._settings_salt = None + self._settings_key = None + return + msg("Voer nogmaals hetzelfde wachtwoord in ter controle.") + passphrase2 = self._ui.prompt_settings_passphrase() + if passphrase == passphrase2: + msg("") + salt = secrets.token_bytes(16) + self._derive_settings_key(salt, passphrase) + return + msg("Niet hetzelfde wachtwoord. Probeer opnieuw.") + + def _decrypt_settings(self, raw_data): + msg = self._ui.msg + tag = b"%digilyzer1%" + if raw_data.startswith(tag): + p = raw_data.find(b"%", len(tag)) + if p < 0: + raise ApplicationError("Ongeldig bestandsformaat voor instellingen AA") + salt = base64.urlsafe_b64decode(raw_data[len(tag):p]) + cipher_data = raw_data[p+1:] + if len(salt) != 16: + raise ApplicationError("Ongeldig bestandsformaat voor instellingen BB") + if salt == self._settings_salt: + # Try stored encryption key. + try: + plain_data = fernet.decrypt(cipher_data) + return plain_data + except cryptography.fernet.InvalidToken: + pass + msg("De instellingen zijn beveiligd met een wachtwoord.") + msg("Voer het wachtwoord in om de instellingen de ontcijferen.") + msg("Het gaat hier NIET om uw wachtwoord voor de DigiD website.") + while True: + msg("") + passphrase = self._ui.prompt_settings_passphrase() + self._derive_settings_key(salt, passphrase) + fernet = cryptography.fernet.Fernet(self._settings_key) + try: + plain_data = fernet.decrypt(cipher_data) + return plain_data + except cryptography.fernet.InvalidToken: + msg("Onjuist wachtwoord.") + else: + # plain text JSON + self._settings_salt = None + self._settings_key = None + return raw_data + + def _encrypt_settings(self, plain_data): + if self._settings_key: + fernet = cryptography.fernet.Fernet(self._settings_key) + cipher_data = fernet.encrypt(plain_data) + raw_data = (b"%digilyzer1%" + + base64.urlsafe_b64encode(self._settings_salt) + + b"%" + + cipher_data) + else: + raw_data = plain_data + return raw_data + + def load_settings(self): + msg = self._ui.msg + try: + with open(self.settings_file, "rb") as f: + msg("Instellingen worden gelezen uit {}".format(self.settings_file)) + raw_data = f.read() + except FileNotFoundError: + msg("Geen instellingen gevonden in {}".format(self.settings_file)) + self.settings = None + return False + plain_data = self._decrypt_settings(raw_data) + json_data = json.loads(plain_data) + self.settings = DigidSettings() + self.settings.from_dict(json_data) + return True + + def save_settings(self): + msg = self._ui.msg + msg("Instellingen worden opgeslagen in {}".format(self.settings_file)) + json_data = self.settings.as_dict() + plain_data = json.dumps(json_data, indent=4).encode("utf-8") + raw_data = self._encrypt_settings(plain_data) + with open(os.open(self.settings_file, + os.O_WRONLY | os.O_CREAT | os.O_TRUNC, + 0o600), "wb") as f: + f.write(raw_data) + + def check_version(self): + """Stuur een CheckVersion opdracht naar de DigiD server.""" + msg = self._ui.msg + msg("DigiD server query: CheckVersion") + resp = self._cli.check_version() + action = resp.get("action", "") + if action == "active": + msg("CheckVersion - OK") + else: + msg("CheckVersion niet OK - action = {!r}".format(action)) + raise ApplicationError("Onverwacht resultaat van CheckVersion") + + def status(self): + """Lees de Digilyzer instellingen en rapporteer de activatie status.""" + + msg = self._ui.msg + self.load_settings() + + msg("") + if self.settings is None: + msg("Status: geen instellingen gevonden") + else: + msg("Status: {}".format(self.settings.activation_status)) + msg("instance_id: {}".format(self.settings.instance_id)) + msg("app_id: {}".format(self.settings.app_id)) + try: + login_level_str = LoginLevel(self.settings.login_level).name + except ValueError: + login_level_str = "???" + msg("login_level: {} ({})" + .format(self.settings.login_level, login_level_str)) + + def activate(self): + """Start het activatie proces.""" + + msg = self._ui.msg + self.check_version() + self.load_settings() + + if self.settings is None: + self.settings = DigidSettings() + msg("") + self._prompt_new_passphrase() + + if self.settings.activation_status != "not_activated": + msg("Onverwachte actievatiestatus in instellingen: {}" + .format(self.settings.activation_status)) + raise ApplicationError( + "Kan niet starten met activatie - activatie was al gestart") + + # Genereer een willekeurige UUID om te gebruiken als "instance_id". + self.settings.instance_id = str(uuid.uuid4()) + log("nieuw instance_id = {}".format(self.settings.instance_id)) + + # Genereer een ECDSA publiek/geheim paar sleutels. + (self.settings.public_key, + self.settings.private_key) = make_ec_keypair() + self.settings.mask_code = make_pincode_mask() + self.save_settings() + + msg("") + msg("Om activatie te starten zijn uw DigiD gebruikersnaam en") + msg("wachtwoord nodig. Dit zijn dezelfde gegevens die u ook gebruikt") + msg("om in te loggen op digid.nl.") + msg("") + + username = self._ui.prompt_username() + if not username: + raise ApplicationError( + "Ongeldige gebruikersnaam - activatie afgebroken") + + password = self._ui.prompt_password() + if not password: + raise ApplicationError( + "Ongeldig wachtwoord - activatie afgebroken") + + msg("") + msg("Kies een pincode van 5 cijfers.") + msg("Deze pincode moet opnieuw worden ingevoerd bij" + + " elk gebruik van deze software.") + + while True: + pincode = self._ui.prompt_pincode() + if not valid_pincode(pincode): + msg("Ongeldige pincode, probeer opnieuw.") + msg("Voer nogmaals dezelfde pincode in ter controle.") + pincode2 = self._ui.prompt_pincode() + if pincode2 == pincode: + break + msg("Pincodes zijn niet hetzelfde, probeer opnieuw.") + + msg("") + msg("DigiD server query: BasicAuthenticate") + resp = self._cli.basic_authenticate(username, password) + activation_method = resp.get("activation_method") + if resp.get("app_authenticator_pending"): + msg("WAARSCHUWING: DigiD server zegt activatie is al gestart.") + if activation_method == "standalone": + msg("Activatie methode: activatiecode via SMS") + elif activation_method == "letter": + msg("Activatie methode: activatiecode via brief") + else: + raise ApplicationError( + "Onbekende activatie methode {!r}".format(activation_method)) + + msg("") + if not self._ui.confirm_activation(): + return + + if activation_method == "standalone": + self._activate_sms() + elif activation_method == "letter": + self._activate_letter() + + msg("DigiD server query: EnrollmentChallenge") + resp = self._cli.enrollment_challenge(self.settings) + challenge = resp.get("challenge") + if not challenge: + raise ApiError("EnrollmentChallenge did not return 'challenge'") + + signed_challenge = ec_sign(self.settings.private_key, challenge) + msg("DigiD server query: CompleteChallenge") + resp = self._cli.complete_challenge(self.settings, signed_challenge) + self.settings.symmetric_key = resp.get("symmetric_key") + iv = resp.get("iv") + if not self.settings.symmetric_key: + raise ApiError("CompleteChallenge did not return 'symmetric_key'") + if not iv: + raise ApiError("CompleteChallenge did not return 'iv'") + + masked_pincode = mask_pincode(self.settings.mask_code, pincode) + masked_pincode = encrypt_pincode(self.settings.symmetric_key, + iv, + masked_pincode) + + msg("DigiD server query: CompleteActivation") + authentication_level = None + try: + resp = self._cli.complete_activation(self.settings, masked_pincode) + authentication_level = resp.get("authentication_level") + except ApiError as exc: + if (activation_method != "letter") or (exc.status != "pending"): + raise + + msg("") + if activation_method == "standalone": + if not isinstance(authentication_level, int): + raise ApiError( + "CompleteActivation did not return 'authentication_level'") + msg("Activatie geslaagd (login_level={})." + .format(authentication_level)) + msg("De 'authenticate' functie is nu beschikbaar.") + self.settings.activation_status = "activated" + self.settings.login_level = authentication_level + elif activation_method == "letter": + msg("Brief met activatiecode aangevraagd.") + msg("Gebruik de 'complete' functie na ontvangst van de brief.") + self.settings.activation_status = "pending" + self.save_settings() + + def _activate_sms(self): + """Ontvang activatiecode via SMS en bevestig deze naar de server.""" + + msg = self._ui.msg + msg("") + msg("DigiD server query: SendSMS") + resp = self._cli.send_sms() + phonenumber = resp.get("phonenumber") + + msg("") + msg("U ontvangt een activatiecode via SMS op telefoonnr {}." + .format(phonenumber)) + msg("Voer deze code in (6 tekens).") + + while True: + smscode = self._ui.prompt_activationcode() + smscode = smscode.strip() + if len(smscode) == 6: + break + msg("Ongeldige activatiecode, probeer opnieuw.") + msg("") + + msg("DigiD server query: BasicAuthenticationSession") + resp = self._cli.basic_authentication_session(self.settings, smscode) + self.settings.app_id = resp.get("user_app_id") + if not self.settings.app_id: + raise ApiError("BasicAuthenticationSession did not return 'user_app_id'") + + def _activate_letter(self): + """Vraag de server om een activatiecode per brief.""" + + msg = self._ui.msg + msg("DigiD server query: BasicAuthenticationSession") + resp = self._cli.basic_authentication_session(self.settings, None) + self.settings.app_id = resp.get("user_app_id") + if not self.settings.app_id: + raise ApiError("BasicAuthenticationSession did not return 'user_app_id'") + + msg("DigiD server query: InitLetterActivation") + self._cli.init_letter_activation() + + while True: + time.sleep(2) + msg("DigiD server query: PollLetterActivation") + try: + self._cli.poll_letter_activation() + except ApiError as exc: + if exc.status != "pending": + raise + continue + break + + def complete(self): + """Activatie afronden na ontvangst van de brief met activatiecode.""" + + msg = self._ui.msg + self.check_version() + self.load_settings() + + if self.settings is None: + raise ApplicationError( + "Kan activatie niet afronden - geen instellingen gevonden") + if self.settings.activation_status != "pending": + msg("Onverwachte actievatiestatus in instellingen: {}" + .format(self.settings.activation_status)) + if self.settings.activation_status == "activated": + raise ApplicationError( + "Kan activatie niet afronden - app is al geactiveerd") + else: + raise ApplicationError( + "Kan activatie niet afronden - activatie nog niet gestart") + + msg("") + msg("U heeft een brief ontvangen met een activatiecode (9 tekens).") + msg("Voer deze code in.") + activationcode = self._ui.prompt_activationcode() + activationcode = activationcode.strip() + if len(activationcode) != 9: + raise ApplicationError("Ongeldige activatiecode - afgebroken.") + + msg("") + msg("DigiD server query: CreateActivationCodeSession") + self._cli.create_activation_code_session(self.settings) + + self._authentication_flow(letter_activation=True) + + msg("DigiD server query: CompleteLetterActivation") + self._cli.complete_letter_activation(activationcode.upper()) + + msg("") + msg("Activatie geslaagd (login_level={})." + .format(self.settings.login_level)) + msg("De 'authenticate' functie is nu beschikbaar.") + self.settings.activation_status = "activated" + self.save_settings() + + def _authentication_flow(self, letter_activation): + """Doorloop het authenticatie proces.""" + + msg = self._ui.msg + msg("DigiD server query: AuthenticateChallenge") + resp = self._cli.authenticate_challenge(self.settings) + + challenge = resp.get("challenge") + iv = resp.get("iv") + webservice = resp.get("webservice") + action = resp.get("action") + authentication_level = resp.get("authentication_level") + + if not challenge: + raise ApiError("AuthenticateChallenge did not return 'challenge'") + if not iv: + raise ApiError("AuthenticateChallenge did not return 'iv'") + + msg("") + msg("Authenticatie verzoek:") + msg(" actie: {}".format(action)) + msg(" webservice: {}".format(webservice)) + msg(" authentication level: {}".format(authentication_level)) + + if letter_activation: + if webservice: + raise ApplicationError( + "Onverwachte 'webservice' {!r}".format(webservice)) + if action != "activation_by_letter": + raise ApplicationError( + "Onverwachte 'action' {!r}".format(action)) + else: + try: + if int(authentication_level) > self.settings.login_level: + msg("WAARSCHUWING: Gevraagd authenticatie niveau is " + + "hoger dan login level ({})." + .format(self.settings.login_level)) + except ValueError: + msg("WAARSCHUWING: Onbekend authenticatie niveau gevraagd.") + + msg("") + msg("Controleer of u bovenstaande authenticatie wilt bevestigen.") + msg("Druk anders op Ctrl-C om af te breken.") + msg("") + msg("Voer uw pincode in (5 cijfers) om authenticatie te bevestigen.") + + while True: + pincode = self._ui.prompt_pincode() + if valid_pincode(pincode): + break + msg("Ongeldige pincode, probeer opnieuw.") + msg("") + + signed_challenge = ec_sign(self.settings.private_key, challenge) + masked_pincode = mask_pincode(self.settings.mask_code, pincode) + masked_pincode = encrypt_pincode(self.settings.symmetric_key, + iv, + masked_pincode) + + msg("DigiD server query: Authenticate") + resp = self._cli.authenticate(self.settings, + signed_challenge, + masked_pincode) + + authentication_level = resp.get("authentication_level") + if isinstance(authentication_level, int): + self.settings.login_level = authentication_level + + def _parse_qrcode(self, qrcode): + """Verwerk de gescande QR code. + + Geef als resultaat een dict van key/value paren uit de code. + """ + + data = {} + if not qrcode.startswith("digid-app-auth:"): + raise ApplicationError("Ongeldige QR code") + remain = qrcode[15:] + if remain.startswith("//"): + remain = remain[2:] + frags = remain.split("&") + for frag in frags: + w = frag.split("=", 1) + if len(w) != 2: + raise ApplicationError("Ongeldige QR code") + data[w[0]] = w[1] + return data + + def authenticate(self): + """Start de authenticatie functie (om in te loggen op een website).""" + + msg = self._ui.msg + self.check_version() + self.load_settings() + + if self.settings is None: + raise ApplicationError( + "Authenticatie niet mogelijk - geen instellingen gevonden") + if self.settings.activation_status != "activated": + msg("Onverwachte actievatiestatus in instellingen: {}" + .format(self.settings.activation_status)) + raise ApplicationError( + "Authenticatie niet mogelijk - app is nog niet geactiveerd") + + koppelcode = make_koppelcode() + + msg("") + msg("Voer de koppelcode in op de website waar u wilt inloggen.") + self._ui.show_koppelcode(koppelcode) + msg("") + + qrcode = self._ui.get_qrcode() + log("QR code: " + qrcode) + + qrdata = self._parse_qrcode(qrcode) + qrhost = qrdata.get("host") + app_session_id = qrdata.get("app_session_id") + persistcookie = qrdata.get("lb") + verification_code = qrdata.get("verification_code") + qrtimestamp = qrdata.get("at") + + if qrhost and qrhost != "digid.nl": + raise ApplicationError( + "Ongeldige 'host' in QR code (host={!r})".format(qrhost)) + + if not app_session_id: + raise ApplicationError( + "Veld 'app_session_id' ontbreekt in QR code") + + if qrtimestamp: + try: + tsval = float(qrtimestamp) + except ValueError: + raise ApplicationError( + "Ongeldige timestamp ('at' veld) in QR code") + if time.time() > tsval + 15 * 60: + raise ApplicationError( + "QR code is ouder dan 15 minuten (at={})".format(tsval)) + + if not verification_code: + raise ApplicationError( + "Veld 'verification_code' ontbreekt in QR code") + if verification_code != koppelcode: + raise ApplicationError( + "Verkeerde koppelcode in QR code ({})" + .format(verification_code)) + + self._cli.set_session_id(app_session_id) + if persistcookie: + self._cli.set_persistcookie(persistcookie) + + self._authentication_flow(letter_activation=False) + + msg("") + msg("Authenticatie geslaagd.") + + +def main(): + global opt_verbose + + parser = argparse.ArgumentParser() + parser.format_help = lambda: __doc__ + "\n" + parser.format_usage = lambda: __doc__ + "\n" + + parser.add_argument("mode", type=str, nargs="?") + parser.add_argument("--verbose", action="store_true") + + args = parser.parse_args() + + if (not args.mode) or (args.mode.lower() == "help"): + parser.print_usage() + sys.exit(0) + + if args.verbose: + opt_verbose = True + + try: + if args.mode not in ("status", + "checkversion", + "activate", + "complete", + "authenticate"): + raise ApplicationError("Onbekende actie '{}'".format(args.mode)) + + app = DigidApp() + + if args.mode == "status": + app.status() + elif args.mode == "checkversion": + app.check_version() + elif args.mode == "activate": + app.activate() + elif args.mode == "complete": + app.complete() + elif args.mode == "authenticate": + app.authenticate() + + except ApplicationError as exc: + print("FOUT: {}".format(exc), file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() + diff --git a/qrdecode.py b/qrdecode.py new file mode 100644 index 0000000..6259367 --- /dev/null +++ b/qrdecode.py @@ -0,0 +1,1549 @@ +""" +Decoding QR codes from high-quality images. + +The algorithms in this module are unsophisticated and will +only work for computer generated, non-rotated, undamaged +QR codes. This module can not be used to process QR codes +from photographs or scanned images. + +Only Model 2 QR codes are supported. +""" + +import sys +import numpy as np +import PIL + + +# The Reed-Solomon codes for QR error correction are computed over +# the finite field GF(2**8) = GF(2)[a] / (a**8 + a**4 + a**3 + a**2 + 1). +# +# Elements of this field are represented as 8-bit unsigned integers, +# where each bit represents a coefficient of the polynomial with +# the least significant bit corresponding to the lowest order term. +# +# The element "a" (represented as integer value 2) is a primitive element +# of this field. +# +REED_SOLOMON_GF_POLY = 0b100011101 + + +# Construct lookup tables for exp/log: +# reed_solomon_gf_exp[k] = a**k +# reed_solomon_gf_log[a**k] = k +reed_solomon_gf_exp = 256 * [0] +reed_solomon_gf_log = 256 * [0] +v = 1 +for k in range(1, 256): + v = (v << 1) ^ ((v >> 7) * REED_SOLOMON_GF_POLY) + reed_solomon_gf_exp[k] = v + reed_solomon_gf_log[v] = k +reed_solomon_gf_exp[0] = 1 +reed_solomon_gf_log[1] = 0 +del k, v + + +class QRDecodeError(Exception): + """Raised when QR decoding fails.""" + pass + + +def debug_msg(msg): + """Print a debug message.""" + print(msg, file=sys.stderr) + + +def bits_to_word(bits): + """Convert a list or array of bits to an integer. + + Parameters: + bits: List or array of bits, starting with least-significant bit. + + Returns: + Unsigned integer value of the bits. + """ + v = 0 + p = 1 + for b in bits: + if b: + v += p + p *= 2 + return v + + +def decode_version_word(raw_word): + """Decode error correction bits in the version information. + + Parameters: + raw_word (int): 18-bit integer containing raw version information. + + Returns: + 6-bit integer containing the decoded QR code version. + + Raises: + QRDecodeError: If the version information can not be decoded. + """ + + # The version information uses a (18,6) BCH code with generator + # polynomial x**12 + x**11 + x**10 + x**9 + x**8 + x**5 + x**2 + 1. + + poly = 0b1111100100101 + + # This code only detects bit errors but does not correct them. + # TODO : implement proper error correction + + v = raw_word + while v >= 2**12: + if v & 1: + v ^= poly + v >>= 1 + + if v: + raise QRDecodeError("Data corruption in version information") + + return raw_word >> 12 + + +def decode_format_word(raw_word): + """Decode error correction bits in the format word. + + Parameters: + raw_word (int): 15-bit integer containing raw format information. + + Returns: + 5-bit integer containing the decoded format information. + + Raises: + QRDecodeError: If the format information can not be decoded. + """ + + # The format word uses a (15,5) BCH code with generator + # polynomial x**10 + x**8 + x**5 + x**4 + x**2 + x + 1. + + poly = 0b10100110111 + + # This code only detects bit errors but does not correct them. + # TODO : implement proper error correction + + v = raw_word + while v >= 2**10: + if v & 1: + v ^= poly + v >>= 1 + + if v: + raise QRDecodeError("Data corruption in format bits") + + return raw_word >> 10 + + +def quantize_image(image): + """Quantize the specified image into black and white pixels. + + Parameters: + image (PIL.Image): Input image. + + Returns: + 2D Numpy array where 0 = black, 1 = white. + """ + + # Convert to greyscale. + img_grey = image.convert(mode="L") + + # Extract pixel values. + data_grey = np.array(img_grey) + + # Quantize to black-and-white. + min_pixel = np.min(data_grey) + max_pixel = np.max(data_grey) + threshold = (min_pixel + max_pixel) // 2 + data_bw = (data_grey > threshold).astype(np.uint8) + + return data_bw + + +def scan_boundaries(img_data): + """Scan horizontally to detect color boundaries. + + Returns (boundpos, boundmap). + + boundpos is a 2D array of shape (nrow, ncol+2). + boundpos[y,k] is the X coordinate of the first pixel after the k-th + color boundary on row y. + boundpos[y,0] == 0 by definition. + boundpos[y,k] == ncol if k is larger than the number of color boundaries. + + boundmap is a 2D array of shape (nrow, ncol). + boundmap[y,x] is the number of color boundaries to the left of pixel (x,y). + """ + + (nrow, ncol) = img_data.shape + + boundpos = np.zeros((nrow, ncol + 2), dtype=np.uint32) + boundmap = np.zeros((nrow, ncol), dtype=np.uint32) + for y in range(nrow): + (edges,) = np.where(img_data[y, 1:] != img_data[y, :-1]) + boundpos[y, 0] = 0 + if len(edges) > 0: + boundpos[y, 1:1+len(edges)] = edges + 1 + boundpos[y, 1+len(edges):] = ncol + steps = np.zeros(ncol) + steps[edges+1] = 1 + boundmap[y] = np.cumsum(steps) + + return (boundpos, boundmap) + + +def check_position_detection(bounds): + """Check whether the specified range of 5 intervals has the right + proportions to correspond to a slice through a position detection pattern. + + An ideal slice through a position detection pattern consists of + 5 intervals colored B,W,B,W,B with lengths proportional to 1,1,3,1,1. + + Returns: + (center_coord, pixels_per_module) if this could be a position + detection pattern, otherwise (0, 0). + """ + + # Expected relative positions of black/white boundaries + # within the position detection pattern. + expect_bound_pos = [-3.5, -2.5, -1.5, 1.5, 2.5, 3.5] + + if (len(bounds) != 6) or (bounds[4] >= bounds[5]): + return (0, 0) + + pattern_width = float(bounds[5] - bounds[0]) + middle_width = float(bounds[3] - bounds[2]) + if (pattern_width < 7) or (middle_width < 3): + return (0, 0) + + center = float(sum(bounds)) / 6.0 + pitch = (pattern_width + middle_width) / 10.0 + + good = True + for k in range(6): + rel_bound_pos = (bounds[k] - center) / pitch + if abs(rel_bound_pos - expect_bound_pos[k]) >= 0.5: + good = False + break + + if not good: + return (0, 0) + + return (center, pitch) + + +def find_position_detection_patterns(img_data): + """Locate QR code position detection patterns. + + Parameters: + img_data (ndarray): 2D Numpy array containing black-and-white image. + + Returns: + List of tuples (x, y, dx, dy). + + Note that integer values of X/Y coordinates refer to pixel corners. + The upper-left corner of the image has coordinates (0, 0). + The center of the upper-left pixel has coordinates (0.5, 0.5). + The lower-right corner of the image has coordinates (nrow, ncol). + """ + + (nrow, ncol) = img_data.shape + + if (nrow < 7) or (ncol < 7): + return [] + + # Scan for horizontal and vertical color boundaries. + (hbounds, hmap) = scan_boundaries(img_data) + (vbounds, vmap) = scan_boundaries(img_data.transpose()) + + patterns_raw = [] + + # Scan each row to find position detection patterns. + for y in range(nrow): + # Start at the first black interval. + bx = 0 + if img_data[y, 0] != 0: + bx += 1 + # Consider each range of five intervals with colors B,W,B,W,B. + while hbounds[y, bx+4] < ncol: + # Check that this horizontal slice has the correct + # proportions for a position detection pattern. + (cx, dx) = check_position_detection(hbounds[y, bx:bx+6]) + if dx > 0: + # Check that the vertical slice also has a pattern. + x = int(cx) + by = vmap[x, y] - 2 + if img_data[y, x] == 0 and by >= 0 and by + 4 < nrow: + (cy, dy) = check_position_detection(vbounds[x, by:by+6]) + if (dy > 0) and (dx <= 2 * dy) and (dy <= 2 * dx): + # Add this location as a candidate pattern. + patterns_raw.append((cx, cy, dx, dy)) + bx += 2 + + # Discard duplicate entries. + patterns = [] + for fnd in patterns_raw: + (cx, cy, dx, dy) = fnd + dupl = False + for (tcx, tcy, tdx, tdy) in patterns: + if ((abs(tcx - cx) < 3 * max(dx, tdx)) + and (abs(tcy - cy) < 3 * max(dy, tdy))): + dupl = True + break + if not dupl: + patterns.append(fnd) + + return patterns + + +def make_finder_triplets(patterns): + """Select three position detection patterns that could + together form the finder pattern for a QR code. + + If multiple finder triplets are feasible, return them all, + starting with the highest QR code version. + + Parameters: + patterns: List of tuples describing position detection patterns. + + Returns: + List of tuples (finder_ul, finder_ur, finder_dl). + """ + + finder_triplets = [] + + # Try all candidates for the upper-left pattern. + for fnd in patterns: + (cx, cy, dx, dy) = fnd + + # Search a matching pattern with horizontal separation. + for fndh in patterns: + (hcx, hcy, hdx, hdy) = fndh + + # Check that pixel pitch is roughly compatible. + if 8 * abs(dx - hdx) > dx + hdx: + continue + if 8 * abs(dy - hdy) > dy + hdy: + continue + + # Check that Y coordinates match. + if abs(cy - hcy) > dy + hdy: + continue + + # Check that X separation is sufficient. + xsep = 2 * abs(cx - hcx) / (dx + hdx) + if xsep < 12: + continue + + # Search a matching pattern with vertical separation. + for fndv in patterns: + (vcx, vcy, vdx, vdy) = fndv + + # Check that pixel pitch is roughly compatible. + if 8 * abs(dx - vdx) > dx + vdx: + continue + if 8 * abs(dy - vdy) > dy + vdy: + continue + + # Check that X coordinates match. + if abs(cx - vcx) > dx + vdx: + continue + + # Check that X and Y separation are roughly compatible. + ysep = 2 * abs(cy - vcy) / (dy + vdy) + if ysep < 12 or ysep < 0.75 * xsep or ysep > 1.25 * xsep: + continue + + # Identify upper-right and lower-left patterns, + # depending on rotation. + if (hcx - cx) * (vcy - cy) > 0: + # not rotated or 180 degrees rotated + fnd_ur = fndh + fnd_dl = fndv + else: + # 90 degrees or 270 degrees rotated + fnd_ur = fndv + fnd_dl = fndh + + # Estimate QR code version. + qrver = (0.5 * (xsep + ysep) - 10) / 4.0 + + # Bonus point if QR code is non-rotated. + score = qrver + if hcx > cx and vcy > cy: + score += 1 + + finder_triplets.append((score, (fnd, fnd_ur, fnd_dl))) + + # Sort by decreasing score. + finder_triplets.sort(reverse=True) + + return [triplet for (score, triplet) in finder_triplets] + + +def extract_qr_version(img_data, finder_ul, finder_ur): + """Extract the QR version from the upper-right version field. + + Parameters: + img_data (ndarray): 2D array representing the quantized image. + finder_ul: Tuple representing the location of the upper-left + position detection pattern. + finder_ur: Tuple representing the location of the upper-right + position detection pattern. + + Returns: + QR code version (range 1 .. 40). + + Raises: + QRDecodeError: If the version information can not be decoded. + """ + + (ul_cx, ul_cy, ul_dx, ul_dy) = finder_ul + (ur_cx, ur_cy, ur_dx, ur_dy) = finder_ur + + # Create affine transform to specify the local QR matrix + # around the upper-right finder. + transform = np.zeros((3, 3)) + if abs(ur_cx - ul_cx) > abs(ur_cy - ul_cy): + # not rotated or 180 degrees rotated + transform[0, 0] = ur_dx * np.sign(ur_cx - ul_cx) + transform[1, 1] = ur_dy * np.sign(ur_cx - ul_cx) + else: + # 90 degrees or 270 degrees rotated + transform[1, 0] = ur_dy * np.sign(ur_cy - ul_cy) + transform[0, 1] = -ur_dx * np.sign(ur_cy - ul_cy) + + transform[0, 2] = ur_cx + transform[1, 2] = ur_cy + transform[2, 2] = 1.0 + + version_bits = [] + for i in range(18): + x = i % 3 - 7 + y = i // 3 - 3 + xp = transform[0,0] * x + transform[0,1] * y + transform[0,2] + yp = transform[1,0] * x + transform[1,1] * y + transform[1,2] + xp = int(xp) + yp = int(yp) + version_bits.append(1 - img_data[yp, xp]) + + # Convert bits to word. + version_word_raw = bits_to_word(version_bits) + + # Decode error correction bits. + qr_version = decode_version_word(version_word_raw) + + if qr_version < 1 or qr_version > 40: + raise QRDecodeError("Unsupported QR code version {}" + .format(qr_version)) + + return qr_version + + +def locate_qr_code(img_data, triplet): + """Consider the QR code defined by the specified finder triplet + and extract precise location, orientation and QR code version. + + Parameters: + img_data (ndarray): 2D array representing the quantized image. + triplet: Tuple (finder_ul, finder_ur, finder_dl). + + Returns: + Tuple (affine_transform, qr_version). + + Raises: + QRDecodeError: If no QR code was detected. + """ + + (finder_ul, finder_ur, finder_dl) = triplet + + (ul_cx, ul_cy, ul_dx, ul_dy) = finder_ul + (ur_cx, ur_cy, ur_dx, ur_dy) = finder_ur + (dl_cx, dl_cy, dl_dx, dl_dy) = finder_dl + + # Estimate the QR code version based on horizontal data. + hdist = ((2 * (ul_cx - ur_cx) / (ul_dx + ur_dx))**2 + + (2 * (ul_cy - ur_cy) / (ul_dy + ur_dy))**2)**0.5 + qrver = round((hdist - 10) / 4) + + # For QR versions higher than 6, decode the version information. + if qrver > 6: + qrver = extract_qr_version(img_data, finder_ul, finder_ur) + + # Determine nominal separation between finders. + qrsep = 10 + 4 * qrver + + # Determine module-to-pixel scaling and rotation. + transform = np.zeros((3, 3)) + transform[0, 0] = (ur_cx - ul_cx) / qrsep + transform[1, 0] = (ur_cy - ul_cy) / qrsep + transform[0, 1] = (dl_cx - ul_cx) / qrsep + transform[1, 1] = (dl_cy - ul_cy) / qrsep + + # Determine coordinates of upper-left coordinate. + transform[0, 2] = ul_cx - 3.5 * (transform[0, 0] + transform[0, 1]) + transform[1, 2] = ul_cy - 3.5 * (transform[1, 0] + transform[1, 1]) + transform[2, 2] = 1.0 + + return (transform, qrver) + + +def sample_qr_matrix(img_data, transform, qr_version): + """Sample each module in the QR matrix. + + Parameters: + img_data (ndarray): 2D array representing the quantized image. + transform (ndarray): Affine transform specifying the position, + size and orientation of the QR code. + qr_version (int): QR code version. + + Returns: + 2D square Numpy array containing the value of each module + (0 = white, 1 = black). + """ + + qrsize = 17 + 4 * qr_version + + xcoord = np.zeros((qrsize, qrsize)) + xcoord[0] = np.arange(qrsize) + 0.5 + xcoord[1:] = xcoord[0] + + ycoord = xcoord.transpose() + + xidx = transform[0,0] * xcoord + transform[0,1] * ycoord + transform[0,2] + yidx = transform[1,0] * xcoord + transform[1,1] * ycoord + transform[1,2] + + xidx = xidx.astype(np.int32) + yidx = yidx.astype(np.int32) + + # Clip coordinates to the image area. + (nrow, ncol) = img_data.shape + xidx = np.clip(xidx, 0, ncol - 1) + yidx = np.clip(yidx, 0, nrow - 1) + + matrix = img_data[yidx, xidx] + matrix = 1 - matrix + return matrix + + +def extract_format_data(matrix): + """Extract format information from the upper-left corner. + + Parameters: + matrix (ndarray): 2D array containing the QR matrix. + + Returns: + Tuple (error_correction_level, mask_pattern). + + Raises: + QRDecodeError: If the format information can not be decoded. + """ + + format_mask = 0b101010000010010 + + # Fetch format bits from matrix. + format_bits = [] + for i in range(6): + format_bits.append(matrix[i, 8]) + format_bits.append(matrix[7, 8]) + format_bits.append(matrix[8, 8]) + format_bits.append(matrix[8, 7]) + for i in range(6): + format_bits.append(matrix[8, 5-i]) + + # Convert bits to word and apply the format mask. + format_word_raw = bits_to_word(format_bits) + format_word_raw ^= format_mask + + # Decode error correction bits. + format_word = decode_format_word(format_word_raw) + + # Decode error correction level and mask pattern. + error_correction_idx = ((format_word >> 3) & 3) + mask_pattern = (format_word & 7) + + error_correction_table = "MLHQ" + error_correction_level = error_correction_table[error_correction_idx] + + return (error_correction_level, mask_pattern) + + +def make_mask_pattern(qrsize, mask_pattern): + """Generate the specified 2D XOR mask pattern. + + Parameters: + qrsize (int): Size of the QR code (number of modules along one edge). + mask_pattern (int): Mask pattern reference from format information. + + Returns: + 2D array to be XOR-ed with the matrix before extracting codewords. + """ + + xcoord = np.zeros((qrsize, qrsize), dtype=np.uint32) + xcoord[0] = np.arange(qrsize) + xcoord[1:] = xcoord[0] + ycoord = xcoord.transpose() + + if mask_pattern == 0: + mask_val = (xcoord + ycoord) % 2 + elif mask_pattern == 1: + mask_val = ycoord % 2 + elif mask_pattern == 2: + mask_val = xcoord % 3 + elif mask_pattern == 3: + mask_val = (xcoord + ycoord) % 3 + elif mask_pattern == 4: + mask_val = (ycoord // 2 + xcoord // 3) % 2 + elif mask_pattern == 5: + mask_val = (xcoord * ycoord) % 2 + (xcoord * ycoord) % 3 + elif mask_pattern == 6: + mask_val = ((xcoord * ycoord) % 2 + (xcoord * ycoord) % 3) % 2 + else: + mask_val = (xcoord + ycoord + (xcoord * ycoord) % 3) % 2 + + mask_bool = (mask_val == 0) + mask = mask_bool.astype(np.uint8) + + return mask + + +def get_alignment_pattern_locations(qr_version): + """Return a list of (x, y) locations of alignment patterns. + + Parameters: + qr_version (int): QR code version. + + Returns: + List of tuples (x, y) specifying the center location of + each alignment pattern. + """ + + coord_tbl = { + 1: [6], + 2: [6, 18], + 3: [6, 22], + 4: [6, 26], + 5: [6, 30], + 6: [6, 34], + 7: [6, 22, 38], + 8: [6, 24, 42], + 9: [6, 26, 46], + 10: [6, 28, 50], + 11: [6, 30, 54], + 12: [6, 32, 58], + 13: [6, 34, 62], + 14: [6, 26, 46, 66], + 15: [6, 26, 48, 70], + 16: [6, 26, 50, 74], + 17: [6, 30, 54, 78], + 18: [6, 30, 56, 82], + 19: [6, 30, 58, 86], + 20: [6, 34, 62, 90], + 21: [6, 28, 50, 72, 94], + 22: [6, 26, 50, 74, 98], + 23: [6, 30, 54, 78, 102], + 24: [6, 28, 54, 80, 106], + 25: [6, 32, 58, 84, 110], + 26: [6, 30, 58, 86, 114], + 27: [6, 34, 62, 90, 118], + 28: [6, 26, 50, 74, 98, 122], + 29: [6, 30, 54, 78, 102, 126], + 30: [6, 26, 52, 78, 104, 130], + 31: [6, 30, 56, 82, 108, 134], + 32: [6, 34, 60, 86, 112, 138], + 33: [6, 30, 58, 86, 114, 142], + 34: [6, 34, 62, 90, 118, 146], + 35: [6, 30, 54, 78, 102, 126, 150], + 36: [6, 24, 50, 76, 102, 128, 154], + 37: [6, 28, 54, 80, 106, 132, 158], + 38: [6, 32, 58, 84, 110, 136, 162], + 39: [6, 26, 54, 82, 110, 138, 166], + 40: [6, 30, 58, 86, 114, 142, 170] + } + + coords = coord_tbl[qr_version] + cmin = coords[0] + cmax = coords[-1] + + align_locs = [] + for y in coords: + for x in coords: + if ((x == cmin or y == cmin) + and (x in (cmin, cmax)) + and (y in (cmin, cmax))): + # Skip alignment patterns that would collide + # with position detection patterns. + pass + else: + align_locs.append((x, y)) + + return align_locs + + +def get_data_locations(qr_version): + """Return the locations of all modules representing codewords. + + The locations are sorted in the order of bit placement, starting with + the most significant bit of the codeword in the lower-right corner. + + Parameters: + qr_version (int): QR code version. + + Returns: + Array of shape (num_bits, 2) where each row describes + a module location with the X coordinate in the first column + and the Y coordinate in the second column. + """ + + qrsize = 17 + 4 * qr_version + + # Build a map that marks modules used in function patterns. + func_mask = np.zeros((qrsize, qrsize), dtype=np.uint8) + func_mask[:9, :9] = 1 # upper-left position detection pattern + func_mask[:9, -8:] = 1 # upper-right position detection pattern + func_mask[-8:, :9] = 1 # lower-left position detection pattern + func_mask[6, :] = 1 # horizontal timing pattern + func_mask[:, 6] = 1 # vertical timing pattern + + # Mark version information in the mask. + if qr_version > 6: + func_mask[:6, -11:-8] = 1 # upper-right version information + func_mask[-11:-8, :6] = 1 # lower-left version information + + # Mark alignment patterns in the mask. + align_locs = get_alignment_pattern_locations(qr_version) + for (x, y) in align_locs: + func_mask[y-2:y+3, x-2:x+3] = 1 + + # List the modules in the QR matrix, including special areas and + # function patterns but skipping the vertical timing column. + # List these modules from right to left in strips of two columns wide, + # alternating between upward and downward traversal of the strips. + + # Number of two-column strips. + nstrip = (qrsize - 1) // 2 + + # Prepare X coordinates. + xcoords = np.arange(qrsize - 1, 0, -1) # columns right-to-left + xcoords[-6:] -= 1 # skip vertical timing column + xcoords = xcoords.reshape((nstrip, 2)) # make groups of two columns + xcoords = np.repeat(xcoords, repeats=qrsize, axis=0) # repeat for each row + xcoords = xcoords.flatten() # ungroup + + # Prepare Y coordinates + ycoords = np.arange(qrsize) # list rows downward + ycoords = np.repeat(ycoords, repeats=2) # repeat for two columns per strip + ycoords = np.concatenate((ycoords[::-1], ycoords)) # upward + downward + ycoords = np.tile(ycoords, nstrip // 2) # repeat for each pair of strips + + # Out of this list of modules, select the modules that are not used + # in function patterns. + (idx,) = np.nonzero(1 - func_mask[ycoords, xcoords]) + + # Build the return array. + data_locations = np.column_stack((xcoords[idx], ycoords[idx])) + return data_locations + + +def extract_codewords(matrix, mask_pattern): + """Extract the sequence of codewords from the QR matrix. + + Parameters: + matrix (ndarray): 2D array containing the QR matrix. + mask_pattern (int): Mask pattern reference from format information. + + Returns: + Array of codewords in order of placement in the matrix. + """ + + qrsize = matrix.shape[0] + qr_version = (qrsize - 17) // 4 + + # Unmask the QR code. + mask = make_mask_pattern(qrsize, mask_pattern) + unmasked_matrix = matrix ^ mask + + # Get the locations of codewords in placement order. + data_locations = get_data_locations(qr_version) + xcoords = data_locations[:, 0] + ycoords = data_locations[:, 1] + + # Fetch codeword bits from the matrix. + data_bits = unmasked_matrix[ycoords, xcoords] + + # Split bits in groups of 8 bits per codeword. + nwords = len(data_bits) // 8 + codeword_bits = data_bits[:8*nwords].reshape((nwords, 8)) + + # Calculate value of each 8-bit codeword. + bit_values = (1 << np.arange(8))[::-1] + codewords = np.sum(codeword_bits * bit_values, axis=1) + + return codewords.astype(np.uint8) + + +def get_block_structure(qr_version, error_correction_level): + """Return the data block structure of the specified QR code type. + + Parameters: + qr_version (int): QR code version + error_correction_level (str): Error correction level (L, M, Q or H). + + Returns: + Tuple (n_codewords, n_check_words, n_blocks, max_errors). + """ + + num_codewords_table = { + 1: 26, 2: 44, 3: 70, 4: 100, + 5: 134, 6: 172, 7: 196, 8: 242, + 9: 292, 10: 346, 11: 404, 12: 466, + 13: 532, 14: 581, 15: 655, 16: 733, + 17: 815, 18: 901, 19: 991, 20: 1085, + 21: 1156, 22: 1258, 23: 1364, 24: 1474, + 25: 1588, 26: 1706, 27: 1828, 28: 1921, + 29: 2051, 30: 2185, 31: 2323, 32: 2465, + 33: 2611, 34: 2761, 35: 2876, 36: 3034, + 37: 3196, 38: 3362, 39: 3532, 40: 3706 + } + + block_structure_table = { + 1: [( 7, 1), ( 10, 1), ( 13, 1), ( 17, 1)], + 2: [( 10, 1), ( 16, 1), ( 22, 1), ( 28, 1)], + 3: [( 15, 1), ( 26, 1), ( 36, 2), ( 44, 2)], + 4: [( 20, 1), ( 36, 2), ( 52, 2), ( 64, 4)], + 5: [( 26, 1), ( 48, 2), ( 72, 4), ( 88, 4)], + 6: [( 36, 2), ( 64, 4), ( 96, 4), ( 112, 4)], + 7: [( 40, 2), ( 72, 4), ( 108, 6), ( 130, 5)], + 8: [( 48, 2), ( 88, 4), ( 132, 6), ( 156, 6)], + 9: [( 60, 2), ( 110, 5), ( 160, 8), ( 192, 8)], + 10: [( 72, 4), ( 130, 5), ( 192, 8), ( 224, 8)], + 11: [( 80, 4), ( 150, 5), ( 224, 8), ( 264, 11)], + 12: [( 96, 4), ( 176, 8), ( 260, 10), ( 308, 11)], + 13: [( 104, 4), ( 198, 9), ( 288, 12), ( 352, 16)], + 14: [( 120, 4), ( 216, 9), ( 320, 16), ( 384, 16)], + 15: [( 132, 6), ( 240, 10), ( 360, 12), ( 432, 18)], + 16: [( 144, 6), ( 280, 10), ( 408, 17), ( 480, 16)], + 17: [( 168, 6), ( 308, 11), ( 448, 16), ( 532, 19)], + 18: [( 180, 6), ( 338, 13), ( 504, 18), ( 588, 21)], + 19: [( 196, 7), ( 364, 14), ( 546, 21), ( 650, 25)], + 20: [( 224, 8), ( 416, 16), ( 600, 20), ( 700, 25)], + 21: [( 224, 8), ( 442, 17), ( 644, 23), ( 750, 25)], + 22: [( 252, 9), ( 476, 17), ( 690, 23), ( 816, 34)], + 23: [( 270, 9), ( 504, 18), ( 750, 25), ( 900, 30)], + 24: [( 300, 10), ( 560, 20), ( 810, 27), ( 960, 32)], + 25: [( 312, 12), ( 588, 21), ( 870, 29), (1050, 35)], + 26: [( 336, 12), ( 644, 23), ( 952, 34), (1110, 37)], + 27: [( 360, 12), ( 700, 25), (1020, 34), (1200, 40)], + 28: [( 390, 13), ( 728, 26), (1050, 35), (1260, 42)], + 29: [( 420, 14), ( 784, 28), (1140, 38), (1350, 45)], + 30: [( 450, 15), ( 812, 29), (1200, 40), (1440, 48)], + 31: [( 480, 16), ( 868, 31), (1290, 43), (1530, 51)], + 32: [( 510, 17), ( 924, 33), (1350, 45), (1620, 54)], + 33: [( 540, 18), ( 980, 35), (1440, 48), (1710, 57)], + 34: [( 570, 19), (1036, 37), (1530, 51), (1800, 60)], + 35: [( 570, 19), (1064, 38), (1590, 53), (1890, 63)], + 36: [( 600, 20), (1120, 40), (1680, 56), (1980, 66)], + 37: [( 630, 21), (1204, 43), (1770, 59), (2100, 70)], + 38: [( 660, 22), (1260, 45), (1860, 62), (2220, 74)], + 39: [( 720, 24), (1316, 47), (1950, 65), (2310, 77)], + 40: [( 750, 25), (1372, 49), (2040, 68), (2430, 81)] + } + + error_correction_table = {"L": 0, "M": 1, "Q": 2, "H": 3} + level_code = error_correction_table[error_correction_level] + + n_codewords = num_codewords_table[qr_version] + (n_check_words, n_blocks) = block_structure_table[qr_version][level_code] + + assert n_check_words % n_blocks == 0 + max_errors = n_check_words // n_blocks // 2 + if max_errors < 6: + max_errors -= 1 + + return (n_codewords, n_check_words, n_blocks, max_errors) + + +def rs_mul(a, b): + """Multiply two elements of GF(2**8) as used in the Reed Solomon code. + + Parameters: + a (int): Integer in range 0 .. 255. + b (int): Integer in range 0 .. 255. + + Returns: + Product in GF(2**8) as an integer in range 0 .. 255. + """ + + if a == 0 or b == 0: + return 0 + + # a * b == exp(log(a) + log(b)) + loga = reed_solomon_gf_log[a] + logb = reed_solomon_gf_log[b] + result = reed_solomon_gf_exp[(loga + logb) % 255] + + return result + + +def rs_div(a, b): + """Divide two elements of GF(2**8) as used in the Reed Solomon code. + + Parameters: + a (int): Dividend, range 0 .. 255. + b (int): Divisor, range 1 .. 255. + + Returns: + Quatient a / b in GF(2**8) as an integer in range 0 .. 255. + """ + + assert b != 0 + + if a == 0: + return 0 + + # a / b == exp(log(a) - log(b)) + loga = reed_solomon_gf_log[a] + logb = reed_solomon_gf_log[b] + result = reed_solomon_gf_exp[(255 + loga - logb) % 255] + + return result + + +def rs_eval_poly(poly, x): + """Evaluate a polynomial within GF(2**8) as used in the Reed Solomon code. + + Parameters: + poly (ndarray): Array of coefficients of the polynomial, + starting with the zero-order term. + x (int): Element to evaluate. + + Returns: + Value of the polynomial at "x" as an integer. + """ + n = len(poly) - 1 + ret = poly[n] + for i in range(n): + ret = rs_mul(ret, x) ^ poly[n-1-i] + return ret + + +def rs_berlekamp_massey(syndrome): + """Use the Berlekamp-Massey algorithm to calculate + the error locator polynomial for the specified set of syndromes. + + The error locator polynomial is a polynomial + C[L] * x**n + C[L-1] * x**(L-1) + ... + C[1] * x + C[0] + + with the following properties: + - C[0] = 1; + - the degree "n" is as low as possible; + - the convolution of C(x) with the syndrome polynomial S(x) equals zero. + + If we view the syndrome as an array S[0], S[1], ... S[N-1], + the last property of the error locator polynomial can be written + as follows: + C[0] * S[k] + C[1] * S[k-1] + ... + C[L] * S[k-L] = 0 + (for all k where L <= k < N). + + All calculations are in the GF(2**8) field of the Reed Solomon code. + + Parameters: + syndrome (list): List of syndromes. + + Returns: + List of coefficients of the error locator polynomial. + """ + + # See https://en.wikipedia.org/wiki/Berlekamp-Massey_algorithm + + n = len(syndrome) + poly = (n + 1) * [0] + poly[0] = 1 + prev_poly = list(poly) + l = 0 + prev_l = 0 + b = 1 + m = 1 + + for k in range(n): + assert prev_l + m == k + 1 - l + d = syndrome[k] + for i in range(1, l+1): + d ^= rs_mul(syndrome[k-i], poly[i]) + if d == 0: + m += 1 + else: + ratio = rs_div(d, b) + if 2 * l <= k: + for i in range(prev_l, -1, -1): + prev_poly[i+m] = poly[i+m] + poly[i+m] ^= rs_mul(prev_poly[i], ratio) + prev_poly[:m] = poly[:m] + prev_l = l + l = k + 1 - l + b = d + m = 1 + else: + assert prev_l + m <= l + for i in range(prev_l + 1): + poly[i+m] ^= rs_mul(prev_poly[i], ratio) + m += 1 + + return poly[:l+1] + + +def rs_forney(syndrome, error_locator, error_locations): + """Use Forney's algorithm to calculate the error values. + + Parameters: + syndrome (list): List of syndromes. + error_locator (list): Coefficients of the error locator polynomial. + error_locations (list): Error locations. + + Returns: + List with error value for each listed error location. + """ + + # See https://en.wikipedia.org/wiki/Forney_algorithm + + n_error = len(error_locations) + assert len(error_locator) == n_error + 1 + assert len(syndrome) >= len(error_locator) - 1 + + # Calculate the coefficients of the error evaluator polynomial. + # + # err_eval(x) = syndrome(x) * error_locator(x) (mod x**n_syndrome) + # + # Note that the definition of error_locator(x) guarantees that + # err_eval(x) will have zero-valued coefficients for all terms + # of degree >= n_error. Thus, the degree of err_eval(x) is at most + # (n_error - 1), and we only need to calculate the first n_error + # coefficients. + # + err_eval = (len(error_locator) - 1) * [0] + for k in range(len(error_locator) - 1): + for i in range(len(error_locator) - k - 1): + err_eval[k + i] ^= rs_mul(syndrome[k], error_locator[i]) + + # Calculate the coefficients of the formal derivative of + # the error locator polynomial. + errloc_deriv = (len(error_locator) - 1) * [0] + for i in range(1, len(error_locator)): + if i % 2 == 1: + errloc_deriv[i-1] = error_locator[i] + + # Calculate the error values: + # e[k] = X[k] * err_eval(1/X[k]) / errloc_deriv(1/X[k]) + # + # where X[k] = a**error_locations[k] + # + error_values = n_error * [0] + for k in range(n_error): + x = reed_solomon_gf_exp[error_locations[k]] + xinv = reed_solomon_gf_exp[255-error_locations[k]] + v_err_eval = rs_eval_poly(err_eval, xinv) + v_errloc_deriv = rs_eval_poly(errloc_deriv, xinv) + error_values[k] = rs_div(rs_mul(x, v_err_eval), v_errloc_deriv) + + return error_values + + +def rs_error_correction(data_words, check_words, max_errors, debug_level=0): + """Perform Reed-Solomon error correction on a message block. + + Parameters: + data_words (list): List of received data words. + check_words (list): List of received error correction words. + max_errors (int): Maximum number of errors to correct. + debug_level (int): Optional debug level. + + Returns: + List of corrected data words. + + Raises: + QRDecodeError: If error correction fails. + """ + + n_data_words = len(data_words) + n_check_words = len(check_words) + n_received_words = n_data_words + n_check_words + received_words = data_words + check_words + + # Sanity check on the number of correctable errors. + assert 2 * max_errors <= n_check_words + + if debug_level >= 2: + debug_msg("REED-SOLOMON: ({}, {}, r={})" + .format(n_received_words, n_data_words, max_errors)) + + # + # See also https://en.wikipedia.org/wiki/Reed-Solomon_error_correction + # + # The received words (data words followed by error check words) + # form a polynomial over GF(2**8): + # R(x) = r0 + r1 * x + r2 * x**2 + r3 * x**3 + ... + # + # Note that the first received word is the coefficient for the + # highest-powered term of the polynomial (r0 is the last received word). + # + # Calculate the error syndromes for k = 0 .. (n_check_words-1): + # syndrome[k] = R(a**k) + # + # Note that "a" (represented as integer value 2) is a primitive element + # of GF(2**8). + # + received_poly = received_words[::-1] + syndrome = n_check_words * [0] + for k in range(n_check_words): + x = reed_solomon_gf_exp[k] + syndrome[k] = rs_eval_poly(received_poly, x) + + # Quick check if all syndromes are zero. + if all([(x == 0) for x in syndrome]): + # No errors, just return the data words. + return data_words + + if debug_level >= 3: + debug_msg(" syndrome = " + str(syndrome)) + + # Determine the error locator polynomial. + error_locator = rs_berlekamp_massey(syndrome) + + # Check that the degree of the error locator polynomial does not + # exceed the maximum number of correctable errors. + n_error = len(error_locator) - 1 + if n_error > max_errors: + raise QRDecodeError("Uncorrectable errors in Reed-Solomon code") + + if debug_level >= 1: + debug_msg("REED-SOLOMON: {} errors".format(n_error)) + + # Find the roots of the error locator polynomial. + # If all roots are different AND each root equals "a**(-p[i])" where + # "p[i]" is a valid index into the received message, then + # the values "p[i]" represent the error locations. + error_locations = [] + for k in range(n_received_words): + x = reed_solomon_gf_exp[255-k] + v = rs_eval_poly(error_locator, x) + if v == 0: + error_locations.append(k) + + if debug_level >= 3: + debug_msg(" error_locations = " + str(error_locations)) + + # Check that all roots of the error locator polynomial are different + # and correspond to a valid position. + if len(error_locations) != n_error: + raise QRDecodeError("Uncorrectable errors in Reed-Solomon code") + + # Use Forney's algorithm to find the error values. + error_values = rs_forney(syndrome, error_locator, error_locations) + + # Correct errors. + # Note: location 0 is the last received word. + for i in range(n_error): + k = n_received_words - 1 - error_locations[i] + received_words[k] ^= error_values[i] + + ## Double check that the syndrome is all-zero after error correction. + ## This check is not necessary; the corrected syndrome is guaranteed + ## to be zero if Reed-Solomon decoding is correctly implemented. + ## This is a nice sanity check for debugging. + #received_poly = received_words[::-1] + #syndrome = n_check_words * [0] + #for k in range(n_check_words): + # x = reed_solomon_gf_exp[k] + # syndrome[k] = rs_eval_poly(received_poly, x) + #assert all([(x == 0) for x in syndrome]) + + # Return the corrected data words. + return received_words[:n_data_words] + + +def codeword_error_correction(codewords, + qr_version, + error_correction_level, + debug_level=0): + """Perform error correction and return only the data codewords. + + Parameters: + codewords (list): List of codewords in placement order. + qr_version (int): QR code version + error_correction_level (str): Error correction level (L, M, Q or H). + debug_level (int): Optional debug level. + + Returns: + List of error-corrected data codewords. + + Raises: + QRDecodeError: If error correction fails. + """ + + (n_codewords, n_check_words, n_blocks, max_errors + ) = get_block_structure(qr_version, error_correction_level) + + assert len(codewords) == n_codewords + + n_data_words = n_codewords - n_check_words + n_data_words_per_block = n_data_words // n_blocks + n_long_blocks = n_data_words % n_blocks + + corrected_data = [] + + for i in range(n_blocks): + + # Collect data words from codeword sequence. + k = i + n_blocks * n_data_words_per_block + data_words = codewords[i:k:n_blocks] + if i >= n_blocks - n_long_blocks: + extra_word = codewords[n_data_words-n_blocks+i] + data_words.append(extra_word) + + # Collect error correction words from codeword sequence. + check_words = codewords[n_data_words+i::n_blocks] + + # Perform Reed-Solomon error correction. + message = rs_error_correction(data_words, + check_words, + max_errors, + debug_level) + corrected_data += message + + return corrected_data + + +def get_bits_from_stream(bitstream, position, num_bits): + """Read bits from the bitstream. + + Parameters: + bitstream (list): List of 8-bit data codewords. + position (int): Index of first bit to read. + num_bits (int): Number of bits to read. + + Returns: + Integer representing the obtained bits (most-significant bit first). + + Raises: + QRDecodeError: If the requested range exceeds the bitstream length. + """ + + if position + num_bits > 8 * len(bitstream): + raise QRDecodeError("Unexpected end of bitstream") + + word_pos = position // 8 + bit_pos = position % 8 + + k = min(num_bits, 8 - bit_pos) + mask = (1 << k) - 1 + value = (bitstream[word_pos] >> (8 - bit_pos - k)) & mask + + bits_remaining = num_bits - k + + while bits_remaining >= 8: + word_pos += 1 + word = bitstream[word_pos] + value = (value << 8) | word + bits_remaining -= 8 + + if bits_remaining > 0: + word_pos += 1 + mask = (1 << bits_remaining) - 1 + lsb = 8 - bits_remaining + word = (bitstream[word_pos] >> (8 - bits_remaining)) & mask + value = (value << bits_remaining) | word + + return value + + +def decode_numeric_segment(bitstream, position, nchar): + """Decode a segment in numeric mode. + + Parameters: + bitstream (ndarray): Array of 8-bit data codewords. + position (int): Bit position within the bitstream. + nchar (int): Number of characters to decode. + + Returns: + Tuple (decoded_data, new_position). + + Raises: + QRDecodeError: If decoding fails or end of bitstream is reached. + """ + frag = bytearray(nchar) + ndone = 0 + while ndone < nchar: + k = min(nchar - ndone, 3) + nbits = 3 * k + 1 + value = get_bits_from_stream(bitstream, position, nbits) + position += nbits + if k > 2: + frag[ndone+2] = 0x30 + value % 10 + value = value // 10 + if k > 1: + frag[ndone+1] = 0x30 + value % 10 + value = value // 10 + if value > 9: + raise QRDecodeError("Invalid numeric data") + frag[ndone] = 0x30 + value + ndone += k + return (frag, position) + + +def decode_alphanumeric_segment(bitstream, position, nchar): + """Decode a segment in alphanumeric mode. + + Parameters: + bitstream (ndarray): Array of 8-bit data codewords. + position (int): Bit position within the bitstream. + nchar (int): Number of characters to decode. + + Returns: + Tuple (decoded_data, new_position). + + Raises: + QRDecodeError: If decoding fails or end of bitstream is reached. + """ + alphanum_table = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ $%*+-./:" + assert len(alphanum_table) == 45 + frag = bytearray(nchar) + ndone = 0 + while ndone < nchar: + k = min(nchar - ndone, 2) + nbits = 5 * k + 1 + value = get_bits_from_stream(bitstream, position, nbits) + position += nbits + if k > 1: + frag[ndone+1] = alphanum_table[value % 45] + value = value // 45 + if value > 44: + raise QRDecodeError("Invalid alphanumeric data") + frag[ndone] = alphanum_table[value] + ndone += k + return (frag, position) + + +def decode_8bit_segment(bitstream, position, nchar): + """Decode a segment in 8-bit mode. + + Parameters: + bitstream (ndarray): Array of 8-bit data codewords. + position (int): Bit position within the bitstream. + nchar (int): Number of characters to decode. + + Returns: + Tuple (decoded_data, new_position). + + Raises: + QRDecodeError: If decoding fails or end of bitstream is reached. + """ + frag = bytearray(nchar) + for i in range(nchar): + frag[i] = get_bits_from_stream(bitstream, position, 8) + position += 8 + return (frag, position) + + +def decode_bitstream(bitstream, qr_version): + """Decode the specified QR bitstream. + + Parameters: + bitstream (list): Array of 8-bit data codewords. + qr_version (int): QR code version. + + Returns: + Decoded data as a bytestring. + + Raises: + QRDecodeError: If decoding fails. + """ + + # Determine number of bits in character count field. + if qr_version <= 9: + character_count_bits = [0, 10, 9, 0, 8] + elif qr_version <= 26: + character_count_bits = [0, 12, 11, 0, 16] + else: + character_count_bits = [0, 14, 13, 0, 16] + + decoded_data = bytearray() + position = 0 + + # Decode segments until end of bitstream (or terminator). + while position + 4 <= 8 * len(bitstream): + + # Read mode indicator. + mode = get_bits_from_stream(bitstream, position, 4) + position += 4 + + # Stop at terminator marker. + if mode == 0: + break + + # Reject unsupported modes. + if mode not in (1, 2, 4): + if mode == 7: + raise QRDecodeError("ECI mode not supported") + if mode == 3: + raise QRDecodeError("Structured Append mode not supported") + if mode in (5, 9): + raise QRDecodeError("FNC1 mode not supported") + if mode == 8: + raise QRDecodeError("Kanji mode not supported") + raise QRDecodeError("Unsupported mode indicator 0x{:x}" + .format(mode)) + + # Read character count. + nbits = character_count_bits[mode] + nchar = get_bits_from_stream(bitstream, position, nbits) + if nchar < 0: + raise QRDecodeError("Unexpected end of bitstream") + position += nbits + + # Decode characters. + if mode == 1: + (frag, position + ) = decode_numeric_segment(bitstream, position, nchar) + elif mode == 2: + (frag, position + ) = decode_alphanumeric_segment(bitstream, position, nchar) + elif mode == 4: + (frag, position + ) = decode_8bit_segment(bitstream, position, nchar) + + decoded_data += frag + + return bytes(decoded_data) + + +def matrix_to_string(matrix): + """Format the QR matrix as a string.""" + + lines = [] + qrsize = matrix.shape[0] + for y in range(qrsize): + s = [] + for x in range(qrsize): + if matrix[y, x] == 0: + s.append(".") + elif matrix[y, x] == 1: + s.append("X") + else: + s.append("?") + lines.append(" " + " ".join(s)) + return "\n".join(lines) + + +def bitstream_to_string(bitstream): + """Format the bitstream as a string.""" + bits = [] + for word in bitstream: + for k in range(8): + bits.append((word >> (7 - k)) & 1) + return "".join(map(str, bits)) + + +def decode_qrcode(image, debug_level=0): + """Decode the QR code in the specified image. + + Parameters: + image (PIL.Image): Input image. + debug_level (int): Optional debug level (0..3). + + Returns: + Decoded data as a byte string. + + Raises: + QRDecodeError: If decoding fails. + """ + + # Convert to black-and-white. + img_data = quantize_image(image) + + # Locate position detection patterns. + patterns = find_position_detection_patterns(img_data) + + if debug_level >= 2: + debug_msg("POSITION DETECTION PATTERNS:") + for pattern in patterns: + debug_msg(" " + str(pattern)) + + if len(patterns) < 3: + npattern = len(patterns) + if npattern == 0: + raise QRDecodeError("No position detection patterns found") + raise QRDecodeError("Only {} position detection patterns found" + .format(npattern)) + + # Make groups of three compatible finders. + finder_triplets = make_finder_triplets(patterns) + + if len(finder_triplets) == 0: + raise QRDecodeError("No valid finder pattern found") + + # Try to decode according to each triplet. + first_exception = None + for triplet in finder_triplets: + + if debug_level >= 1: + debug_msg("FINDER TRIPLET:") + for fnd in triplet: + debug_msg(" " + str(fnd)) + + try: + # Extract QR code location, orientation and version. + transform, qr_version = locate_qr_code(img_data, triplet) + + if debug_level >= 2: + debug_msg("AFFINE TRANSFORM:") + debug_msg(str(transform)) + + # Sample the QR matrix. + matrix = sample_qr_matrix(img_data, transform, qr_version) + + if debug_level >= 3: + debug_msg(matrix_to_string(matrix)) + + # Extract format information. + (error_correction_level, mask_pattern + ) = extract_format_data(matrix) + + if debug_level >= 1: + debug_msg("QR VERSION: {} {} mask={}" + .format(qr_version, + error_correction_level, + mask_pattern)) + + # Extract codewords from the QR matrix. + codewords = extract_codewords(matrix, mask_pattern) + + # Unpack codeword sequence and perform error correction. + bitstream = codeword_error_correction(list(codewords), + qr_version, + error_correction_level, + debug_level) + + if debug_level >= 3: + debug_msg("BITSTREAM: " + bitstream_to_string(bitstream)) + + except QRDecodeError as exc: + # If decoding fails on the first finder triplet, + # save the exception and try decoding the remaining triplets. + # If all triplets fail, report the error from the first triplet. + if first_exception is None: + first_exception = exc + if debug_level >= 1: + debug_msg("FAILED: " + str(exc)) + continue + + # Successfully extracted a bitstream from the QR code. + # This implies we correctly located the QR code in the image, + # so from this point on it does not make sense to retry with + # different finder triplets if an error occurs. + + # Decode the bitstream. + return decode_bitstream(bitstream, qr_version) + + raise first_exception + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a00c394 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +urllib3==1.24.1 +cryptography==2.6.1 +Pillow==5.4.1 +numpy==1.16.2 +PyGObject==3.30.4 ; sys_platform=="linux"