2022-02-24 18:12:58 +00:00
|
|
|
import gnupg
|
2022-02-06 23:29:56 +00:00
|
|
|
|
|
|
|
class DeadHand:
    """
    Create and check PGP heartbeats and validations of those heartbeats.

    A heartbeat is `message` encrypted to `fpr_recipient` and signed by
    `fpr_sender`. A validation is a heartbeat encrypted again to the
    recipient and signed by one of `fpr_validators`. All GPG operations go
    through python-gnupg (see `crypto`).
    """

    def __init__(self, message, fpr_sender, fpr_recipient, fpr_validators=None, homedir=None):
        """
        Store the configuration; no GPG call is made here.

        :param message: plaintext that a valid heartbeat must decrypt to
        :param fpr_sender: fingerprint of the key that signs heartbeats
        :param fpr_recipient: fingerprint of the key everything is encrypted to
        :param fpr_validators: fingerprints allowed to sign validations
            (``None`` means no validator is accepted)
        :param homedir: GnuPG home directory, or ``None`` for gnupg's default
        """
        self.message = message
        self.fpr_sender = fpr_sender
        self.fpr_validators = fpr_validators
        self.fpr_recipient = fpr_recipient
        self.homedir = homedir

    def crypto(self):
        """
        Return a new `gnupg.GPG` handle with utf-8 encoding, using `homedir`
        as the GnuPG home directory when one was configured.
        """
        config = {}
        if self.homedir is not None:
            config['gnupghome'] = self.homedir

        c = gnupg.GPG(**config)
        c.encoding = 'utf-8'
        return c

    @staticmethod
    def _signed_by(result, fpr):
        """
        Return True if the python-gnupg `result` carries a valid signature
        made by the key with fingerprint `fpr`.
        """
        if fpr is None or not result.valid:
            return False
        # `fingerprint` is the key that actually signed (may be a subkey),
        # `pubkey_fingerprint` is the primary key it belongs to.
        signers = (result.fingerprint, getattr(result, 'pubkey_fingerprint', None))
        return fpr in signers

    def _encrypt_signed(self, data, signer):
        """
        Encrypt `data` to the recipient, signed by `signer`, and return the
        result as text. Raises RuntimeError if gpg reports a failure.
        """
        result = self.crypto().encrypt(data, self.fpr_recipient, sign=signer, passphrase=None)
        if not result.ok:
            # Covers both encryption and signing failures: gpg produces no
            # output if either step fails.
            raise RuntimeError(f"GPG encryption failed: {result.status}")
        return str(result)

    def create_heartbeat(self):
        """
        Create an encrypted and signed heartbeat. This is what will be checked
        on the receiving side with `check_heartbeat`.

        :returns: the encrypted heartbeat as text
        :raises RuntimeError: if encryption or signing fails
        """
        return self._encrypt_signed(self.message, self.fpr_sender)

    def check_heartbeat(self, ciphertext):
        """
        Return True if `ciphertext` decrypts to `message` and carries a valid
        signature from `fpr_sender`, otherwise False.
        """
        result = self.crypto().decrypt(ciphertext)
        if not result.ok:
            return False

        # Compare like with like: raw bytes for a bytes message, decoded text
        # otherwise.
        plaintext = result.data if isinstance(self.message, bytes) else str(result)
        return plaintext == self.message and self._signed_by(result, self.fpr_sender)

    def create_validation(self, fpr, heartbeat):
        """
        Wrap a valid heartbeat in a validation signed by the key `fpr`.

        :param fpr: fingerprint of the validator key to sign with
        :param heartbeat: heartbeat text as returned by `create_heartbeat`
        :returns: the encrypted validation as text
        :raises ValueError: if `heartbeat` fails `check_heartbeat`
        :raises RuntimeError: if encryption or signing fails
        """
        if not self.check_heartbeat(heartbeat):
            raise ValueError("Invalid heartbeat!")  # TODO: specific error type

        return self._encrypt_signed(heartbeat, fpr)

    def check_validation(self, fpr, validation):
        """
        Return True if `validation` is validly signed by `fpr` and contains a
        heartbeat that passes `check_heartbeat`.

        :param fpr: fingerprint the validation is expected to be signed by
        :param validation: validation text as returned by `create_validation`
        :raises KeyError: if `fpr` is not one of the configured validators
        """
        # `fpr_validators` defaults to None; treat that as "no validators".
        if fpr not in (self.fpr_validators or ()):
            raise KeyError(f"DeadHand not configured to take validation from PGP finterprint {fpr}.")

        result = self.crypto().decrypt(validation)
        if not result.ok or not self._signed_by(result, fpr):
            return False

        return self.check_heartbeat(str(result))
|