Compare commits
1 Commits
main
...
608dc99398
Author | SHA1 | Date | |
---|---|---|---|
608dc99398
|
@ -1,248 +0,0 @@
|
|||||||
import base64
|
|
||||||
import cbor2
|
|
||||||
from cryptography import x509
|
|
||||||
from cryptography.hazmat.backends import default_backend
|
|
||||||
from cryptography.hazmat.primitives.asymmetric import ec
|
|
||||||
from cryptography.hazmat.primitives.asymmetric.utils import \
|
|
||||||
encode_dss_signature
|
|
||||||
from dataclasses import dataclass
|
|
||||||
import datetime
|
|
||||||
from enum import Enum
|
|
||||||
import os
|
|
||||||
from textwrap import wrap
|
|
||||||
from typing import Optional
|
|
||||||
import zlib
|
|
||||||
|
|
||||||
from . import base45
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class Result:
|
|
||||||
disease: str # tg, always 840539006
|
|
||||||
country: str # co
|
|
||||||
certificate_issuer: str # is
|
|
||||||
certificate_identifier: str # ci
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class Vaccination(Result):
|
|
||||||
dose_number: int # dn
|
|
||||||
total_series_of_doses: int # sd
|
|
||||||
date_of_vaccination: str # dt
|
|
||||||
vaccine: Optional[str] = None # vp
|
|
||||||
medical_product: Optional[str] = None # mp
|
|
||||||
manufacturer: Optional[str] = None # ma
|
|
||||||
|
|
||||||
def check(self) -> bool:
|
|
||||||
# Toutes les doses ont bien été injectées
|
|
||||||
valid = self.dose_number >= self.total_series_of_doses
|
|
||||||
# Vérification de la date
|
|
||||||
# TODO: Pour Astrazeneca, c'est 28 jours
|
|
||||||
date = datetime.date.fromisoformat(self.date_of_vaccination)
|
|
||||||
today = datetime.date.today()
|
|
||||||
delta = today - date
|
|
||||||
valid = valid and delta.days >= 7
|
|
||||||
|
|
||||||
return valid
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def deserialize(payload) -> "Vaccination":
|
|
||||||
translation = {
|
|
||||||
'tg': 'disease',
|
|
||||||
'vp': 'vaccine',
|
|
||||||
'mp': 'medical_product',
|
|
||||||
'ma': 'manufacturer',
|
|
||||||
'dn': 'dose_number',
|
|
||||||
'sd': 'total_series_of_doses',
|
|
||||||
'dt': 'date_of_vaccination',
|
|
||||||
'co': 'country',
|
|
||||||
'is': 'certificate_issuer',
|
|
||||||
'ci': 'certificate_identifier',
|
|
||||||
}
|
|
||||||
return Vaccination(**{translation[k]: v for k, v in payload.items()})
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class Test(Result):
|
|
||||||
class TestResult(Enum):
|
|
||||||
POSITIVE = '260373001'
|
|
||||||
NEGATIVE = '260415000'
|
|
||||||
|
|
||||||
date: str # sc
|
|
||||||
result: TestResult # tr
|
|
||||||
type_of_test: Optional[str] = None # tt
|
|
||||||
test_name: Optional[str] = None # nm
|
|
||||||
manufacturer: Optional[str] = None # ma
|
|
||||||
result_date: Optional[str] = None # dr
|
|
||||||
center: Optional[str] = None # tc
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def deserialize(payload) -> "Test":
|
|
||||||
translation = {
|
|
||||||
'tg': 'disease',
|
|
||||||
'tt': 'type_of_test',
|
|
||||||
'nm': 'test_name',
|
|
||||||
'ma': 'manufacturer',
|
|
||||||
'sc': 'date',
|
|
||||||
'dt': 'result_date',
|
|
||||||
'tr': 'result',
|
|
||||||
'tc': 'center',
|
|
||||||
'co': 'country',
|
|
||||||
'is': 'certificate_issuer',
|
|
||||||
'ci': 'certificate_identifier',
|
|
||||||
}
|
|
||||||
return Test(**{translation[k]: v for k, v in payload.items()})
|
|
||||||
|
|
||||||
def check(self) -> bool:
|
|
||||||
valid = self.result == Test.TestResult.NEGATIVE
|
|
||||||
test_date = datetime.datetime.fromisoformat(self.date)
|
|
||||||
tzinfo = test_date.tzinfo
|
|
||||||
delta = datetime.datetime.now(tzinfo) - test_date
|
|
||||||
valid = valid and delta.days < 3 # 72h de validité
|
|
||||||
|
|
||||||
return valid
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class RecoveryStatement(Result):
|
|
||||||
positive_test_date: str # fr
|
|
||||||
valid_from: str # df
|
|
||||||
valid_until: str # du
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def deserialize(payload) -> "RecoveryStatement":
|
|
||||||
translation = {
|
|
||||||
'tg': 'disease',
|
|
||||||
'fr': 'postive_test_date',
|
|
||||||
'df': 'valid_from',
|
|
||||||
'du': 'valid_until',
|
|
||||||
'co': 'country',
|
|
||||||
'is': 'certificate_issuer',
|
|
||||||
'ci': 'certificate_identifier',
|
|
||||||
}
|
|
||||||
return RecoveryStatement(**{translation[k]: v
|
|
||||||
for k, v in payload.items()})
|
|
||||||
|
|
||||||
def check(self) -> bool:
|
|
||||||
valid_from = datetime.datetime.fromisoformat(self.valid_from)
|
|
||||||
valid_unti = datetime.datetime.fromisoformat(self.valid_until)
|
|
||||||
tzinfo = valid_from.tzinfo
|
|
||||||
now = datetime.datetime.now(tzinfo)
|
|
||||||
return valid_from <= now <= valid_until
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class GreenCertificate:
|
|
||||||
header: dict
|
|
||||||
payload: dict
|
|
||||||
signature: str
|
|
||||||
|
|
||||||
family_name: str # nam.fn
|
|
||||||
given_name: str # nam.gn
|
|
||||||
date_of_birth: str # nam.dob
|
|
||||||
version: str # ver
|
|
||||||
vaccination: Optional[Vaccination] = None # v
|
|
||||||
test: Optional[Test] = None # t
|
|
||||||
recovery_statement: Optional[RecoveryStatement] = None # r
|
|
||||||
|
|
||||||
@property
|
|
||||||
def result(self) -> Result:
|
|
||||||
if self.vaccination is not None:
|
|
||||||
return self.vaccination
|
|
||||||
elif self.test is not None:
|
|
||||||
return self.test
|
|
||||||
elif self.recovery_statement is not None:
|
|
||||||
return self.recovery_statement
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def load(qrcode) -> "GreenCertificate":
|
|
||||||
if not qrcode.startswith('HC1:'):
|
|
||||||
raise ValueError("QR code invalide.")
|
|
||||||
|
|
||||||
plain_input = qrcode.replace('HC1:', '')
|
|
||||||
|
|
||||||
# QR Code en base 45
|
|
||||||
compressed_cose = base45.b45decode(plain_input)
|
|
||||||
|
|
||||||
# Décompression ZIP si nécessaire
|
|
||||||
cose = compressed_cose
|
|
||||||
if compressed_cose[0] == 0x78:
|
|
||||||
fb = compressed_cose[1]
|
|
||||||
if fb == 0x01 or fb == 0x5E or fb == 0x9C or fb == 0xDA:
|
|
||||||
cose = zlib.decompress(compressed_cose)
|
|
||||||
|
|
||||||
# Chargement de l'objet CBOR
|
|
||||||
obj = cbor2.loads(cose)
|
|
||||||
header = cbor2.loads(obj.value[0])
|
|
||||||
payload = cbor2.loads(obj.value[2])
|
|
||||||
signature = obj.value[3]
|
|
||||||
|
|
||||||
return GreenCertificate.deserialize(header, payload, signature)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def deserialize(header, payload, signature) -> "GreenCertificate":
|
|
||||||
# Information utile
|
|
||||||
p = payload[-260][1]
|
|
||||||
|
|
||||||
certificate = GreenCertificate(
|
|
||||||
header=header,
|
|
||||||
payload=payload,
|
|
||||||
signature=signature,
|
|
||||||
family_name=p['nam']['fn'],
|
|
||||||
given_name=p['nam']['gn'],
|
|
||||||
date_of_birth=p['dob'],
|
|
||||||
version=p['ver'],
|
|
||||||
)
|
|
||||||
|
|
||||||
if 'v' in p:
|
|
||||||
certificate.vaccination = Vaccination.deserialize(p['v'][0])
|
|
||||||
elif 't' in p:
|
|
||||||
certificate.test = Test.deserialize(p['t'][0])
|
|
||||||
elif 'r' in p:
|
|
||||||
certificate.recovery_statement = \
|
|
||||||
RecoveryStatement.deserialize(p['r'][0])
|
|
||||||
|
|
||||||
return certificate
|
|
||||||
|
|
||||||
def check_signature(self) -> bool:
|
|
||||||
kid = base64.b64encode(self.header[4]).decode()
|
|
||||||
cert_name = kid.replace('/', '_')
|
|
||||||
|
|
||||||
base_dir = os.path.dirname(__file__)
|
|
||||||
cert_path = os.path.join(base_dir, 'certs', f"{cert_name}.pem")
|
|
||||||
if not os.path.isfile(cert_path):
|
|
||||||
print(f"Le certificat {kid} n'a pas été trouvé.")
|
|
||||||
print("Utilisez l'option --dontcheck pour sauter "
|
|
||||||
"la vérification.")
|
|
||||||
print("Si vous disposez du certificat, installez-le dans",
|
|
||||||
cert_path)
|
|
||||||
exit(1)
|
|
||||||
with open(os.path.join(base_dir, 'certs', f"{cert_name}.pem")) as f:
|
|
||||||
cert_asc = f.read()
|
|
||||||
|
|
||||||
cert = x509.load_pem_x509_certificate(cert_asc.encode(),
|
|
||||||
default_backend())
|
|
||||||
|
|
||||||
public_key = cert.public_key()
|
|
||||||
|
|
||||||
# Calcul de la bonne signature et des données signées
|
|
||||||
data = cbor2.dumps(
|
|
||||||
["Signature1", cbor2.dumps(self.header),
|
|
||||||
bytes(), cbor2.dumps(self.payload)]
|
|
||||||
)
|
|
||||||
|
|
||||||
l = len(self.signature)
|
|
||||||
r = int.from_bytes(self.signature[:l // 2], 'big')
|
|
||||||
s = int.from_bytes(self.signature[l // 2:], 'big')
|
|
||||||
signature = encode_dss_signature(r, s)
|
|
||||||
try:
|
|
||||||
# Vérification de la signature
|
|
||||||
public_key.verify(signature, data,
|
|
||||||
ec.ECDSA(cert.signature_hash_algorithm))
|
|
||||||
return True
|
|
||||||
except:
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def check(self) -> bool:
|
|
||||||
return self.result.check()
|
|
@ -1,11 +1,20 @@
|
|||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import base64
|
||||||
|
import cbor2
|
||||||
|
from cryptography import x509
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
from cryptography.hazmat.primitives.asymmetric.utils import \
|
||||||
|
encode_dss_signature
|
||||||
|
import datetime
|
||||||
import json
|
import json
|
||||||
import logging
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
from textwrap import wrap
|
||||||
|
import zlib
|
||||||
|
|
||||||
from .models import GreenCertificate
|
from . import base45
|
||||||
|
|
||||||
|
|
||||||
def read_qrcode(file) -> str:
|
def read_qrcode(file) -> str:
|
||||||
@ -14,7 +23,7 @@ def read_qrcode(file) -> str:
|
|||||||
Peut être combiné avec zbar pour directement lire le contenu du QRCode.
|
Peut être combiné avec zbar pour directement lire le contenu du QRCode.
|
||||||
"""
|
"""
|
||||||
with file:
|
with file:
|
||||||
content = file.readline()
|
content = file.read()
|
||||||
content = content.replace('\n', '')
|
content = content.replace('\n', '')
|
||||||
return content
|
return content
|
||||||
|
|
||||||
@ -31,34 +40,131 @@ def analyse_qrcode(qrcode: str, additional_info: bool = False,
|
|||||||
Renvoie la validité du pass sous forme de booléen.
|
Renvoie la validité du pass sous forme de booléen.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
certificate = GreenCertificate.load(qrcode)
|
if not qrcode.startswith('HC1:'):
|
||||||
|
raise ValueError("QR code invalide.")
|
||||||
|
|
||||||
|
plain_input = qrcode.replace('HC1:', '')
|
||||||
|
|
||||||
|
# QR Code en base 45
|
||||||
|
compressed_cose = base45.b45decode(plain_input)
|
||||||
|
|
||||||
|
# Décompression ZIP si nécessaire
|
||||||
|
cose = compressed_cose
|
||||||
|
if compressed_cose[0] == 0x78:
|
||||||
|
fb = compressed_cose[1]
|
||||||
|
if fb == 0x01 or fb == 0x5E or fb == 0x9C or fb == 0xDA:
|
||||||
|
cose = zlib.decompress(compressed_cose)
|
||||||
|
|
||||||
|
# Chargement de l'objet CBOR
|
||||||
|
obj = cbor2.loads(cose)
|
||||||
|
header = cbor2.loads(obj.value[0])
|
||||||
|
payload = cbor2.loads(obj.value[2])
|
||||||
|
signature = obj.value[3]
|
||||||
|
|
||||||
if check_signature:
|
if check_signature:
|
||||||
# Récupération du certificat utilisé
|
# Récupération du certificat utilisé
|
||||||
valid = certificate.check_signature()
|
kid = base64.b64encode(header[4]).decode()
|
||||||
|
cert_name = kid.replace('/', '_')
|
||||||
|
|
||||||
|
base_dir = os.path.dirname(__file__)
|
||||||
|
cert_path = os.path.join(base_dir, 'certs', f"{cert_name}.pem")
|
||||||
|
if not os.path.isfile(cert_path):
|
||||||
|
print(f"Le certificat {kid} n'a pas été trouvé.")
|
||||||
|
print("Utilisez l'option --dontcheck pour sauter "
|
||||||
|
"la vérification.")
|
||||||
|
print("Si vous disposez du certificat, installez-le dans",
|
||||||
|
cert_path)
|
||||||
|
exit(1)
|
||||||
|
with open(os.path.join(base_dir, 'certs', f"{cert_name}.pem")) as f:
|
||||||
|
cert_asc = f.read()
|
||||||
|
|
||||||
|
cert = x509.load_pem_x509_certificate(cert_asc.encode())
|
||||||
|
|
||||||
|
public_key = cert.public_key()
|
||||||
|
|
||||||
|
# Calcul de la bonne signature et des données signées
|
||||||
|
data = cbor2.dumps(
|
||||||
|
["Signature1", cbor2.dumps(header),
|
||||||
|
bytes(), cbor2.dumps(payload)]
|
||||||
|
)
|
||||||
|
|
||||||
|
l = len(signature)
|
||||||
|
r = int.from_bytes(signature[:l // 2], 'big')
|
||||||
|
s = int.from_bytes(signature[l // 2:], 'big')
|
||||||
|
signature = encode_dss_signature(r, s)
|
||||||
|
try:
|
||||||
|
# Vérification de la signature
|
||||||
|
public_key.verify(signature, data,
|
||||||
|
ec.ECDSA(cert.signature_hash_algorithm))
|
||||||
|
valid = True
|
||||||
|
except:
|
||||||
|
valid = False
|
||||||
else:
|
else:
|
||||||
valid = True
|
valid = True
|
||||||
logging.warning("Attention : la signature du"
|
print("Attention : la signature du QR code n'a pas été vérifiée.")
|
||||||
"QR code n'a pas été vérifiée.")
|
|
||||||
|
# Information utile
|
||||||
|
p = payload[-260][1]
|
||||||
|
|
||||||
|
if 'v' in p:
|
||||||
|
# Les vaccins sont valides 7 jours après la dernière dose
|
||||||
|
vaccin = p['v'][0]
|
||||||
|
# Toutes les doses sont requises
|
||||||
|
valid = valid and vaccin['dn'] == vaccin['sd']
|
||||||
|
# Vérification de la date
|
||||||
|
date = datetime.date.fromisoformat(vaccin['dt'])
|
||||||
|
today = datetime.date.today()
|
||||||
|
delta = today - date
|
||||||
|
valid = valid and delta.days >= 7
|
||||||
|
elif 't' in p:
|
||||||
|
# Les tests négatifs sont valables moins de 72h
|
||||||
|
test = p['t'][0]
|
||||||
|
assert test['tr'] in ['260373001', '260415000']
|
||||||
|
|
||||||
|
if test['tr'] == 260373001:
|
||||||
|
# Test positif
|
||||||
|
valid = False
|
||||||
|
|
||||||
|
test_date = datetime.datetime.fromisoformat(test['sc'])
|
||||||
|
tzinfo = test_date.tzinfo
|
||||||
|
delta = datetime.datetime.now(tzinfo) - test_date
|
||||||
|
valid = valid and delta.days < 3 # 3 jours de validité
|
||||||
|
elif 'r' in p:
|
||||||
|
# les tests positifs entre 11 et 182 jours après
|
||||||
|
test = p['r'][0]
|
||||||
|
valid_from = datetime.datetime.fromisoformat(test['df'])
|
||||||
|
valid_until = datetime.datetime.fromisoformat(test['du'])
|
||||||
|
tzinfo = valid_from.tzinfo
|
||||||
|
valid = valid and \
|
||||||
|
valid_from <= datetime.datetime.now(tzinfo) <= valid_until
|
||||||
|
else:
|
||||||
|
print("Type de passe inconnu.")
|
||||||
|
|
||||||
valid = valid and certificate.check()
|
|
||||||
|
|
||||||
if valid:
|
if valid:
|
||||||
logging.info("Pass sanitaire valide")
|
print("Pass sanitaire valide")
|
||||||
|
|
||||||
|
|
||||||
logging.debug("Nom : " + certificate.family_name)
|
print("Nom :", p['nam']['fn'])
|
||||||
logging.debug("Prénom : " + certificate.given_name)
|
print("Prénom :", p['nam']['gn'])
|
||||||
logging.debug("Date de naissance : " + certificate.date_of_birth)
|
print("Date de naissance :", p['dob'])
|
||||||
|
|
||||||
if additional_info:
|
if additional_info:
|
||||||
# TODO Meilleur affichage
|
# TODO Meilleur affichage
|
||||||
logging.debug("Informations supplémentaires : "
|
if 'v' in p:
|
||||||
+ json.dumps(certificate.result.__dict__, indent=2))
|
# Vaccination
|
||||||
|
print("Informations de vaccination :",
|
||||||
|
json.dumps(p['v'][0], indent=2))
|
||||||
|
elif 't' in p:
|
||||||
|
print("Informations de test :",
|
||||||
|
json.dumps(p['t'][0], indent=2))
|
||||||
|
elif 'r' in p:
|
||||||
|
print("Informations de test :",
|
||||||
|
json.dumps(p['r'][0], indent=2))
|
||||||
else:
|
else:
|
||||||
logging.info("Pass sanitaire invalide")
|
print("Pass sanitaire invalide")
|
||||||
|
|
||||||
return valid, certificate
|
return valid
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@ -71,16 +177,8 @@ def main():
|
|||||||
help="Affiche toutes les informations.")
|
help="Affiche toutes les informations.")
|
||||||
parser.add_argument('--dontcheck', action='store_true',
|
parser.add_argument('--dontcheck', action='store_true',
|
||||||
help="Ne pas vérifier la signature.")
|
help="Ne pas vérifier la signature.")
|
||||||
parser.add_argument('--logging', '-l', default=logging.DEBUG,
|
|
||||||
choices=['DEBUG', 'INFO', 'WARNING',
|
|
||||||
'ERROR', 'CRITICAL'],
|
|
||||||
help="Niveau de verbosité.")
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
logging.basicConfig(level=args.logging)
|
|
||||||
|
|
||||||
qrcode = read_qrcode(args.file)
|
qrcode = read_qrcode(args.file)
|
||||||
valid = analyse_qrcode(qrcode, args.full, not args.dontcheck)
|
exit(analyse_qrcode(qrcode, args.full, not args.dontcheck))
|
||||||
|
|
||||||
exit(0 if valid else 2)
|
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
[metadata]
|
[metadata]
|
||||||
name = pscheck
|
name = pscheck
|
||||||
version = 1.1.1
|
|
||||||
long_description = file: README.rst
|
long_description = file: README.rst
|
||||||
long_description_content_type = text/markdown
|
long_description_content_type = text/markdown
|
||||||
description = Vérificateur de pass sanitaire
|
description = Vérificateur de pass sanitaire
|
||||||
@ -24,9 +23,6 @@ install_requires =
|
|||||||
cryptography>=3.3.2
|
cryptography>=3.3.2
|
||||||
python_requires = >=3.6
|
python_requires = >=3.6
|
||||||
|
|
||||||
[options.package_data]
|
|
||||||
pscheck = certs/*.pem
|
|
||||||
|
|
||||||
[options.entry_points]
|
[options.entry_points]
|
||||||
console_scripts =
|
console_scripts =
|
||||||
pscheck = pscheck.pscheck:main
|
pscheck = pscheck.pscheck:main
|
||||||
|
Reference in New Issue
Block a user