import gnupg


class DeadHand:
    """Create and check PGP-protected "heartbeat" messages and validations.

    A heartbeat is ``message`` encrypted to ``fpr_recipient`` and signed by
    ``fpr_sender``. A validation is a heartbeat that has been encrypted to
    ``fpr_recipient`` again and signed by one of the validators.

    All cryptographic work is delegated to python-gnupg (``gnupg.GPG``).
    """

    def __init__(self, message, fpr_sender, fpr_recipient,
                 fpr_validators=None, homedir=None):
        """Store the configuration; no GPG call is made here.

        Args:
            message: Plaintext every valid heartbeat must decrypt to.
            fpr_sender: Fingerprint of the key that signs heartbeats.
            fpr_recipient: Fingerprint of the key messages are encrypted to.
            fpr_validators: Optional collection of fingerprints allowed to
                sign validations. ``None`` means no validator is accepted.
            homedir: Optional GnuPG home directory. When ``None`` the
                python-gnupg default is used.
        """
        self.message = message
        self.fpr_sender = fpr_sender
        self.fpr_validators = fpr_validators
        self.fpr_recipient = fpr_recipient
        self.homedir = homedir

    def crypto(self):
        """Return a fresh ``gnupg.GPG`` instance configured for this object."""
        config = {}
        if self.homedir is not None:
            config['gnupghome'] = self.homedir
        gpg = gnupg.GPG(**config)
        gpg.encoding = 'utf-8'
        return gpg

    @staticmethod
    def _normalize_fpr(fpr) -> str:
        """Return ``fpr`` upper-cased with all whitespace removed."""
        return ''.join(str(fpr).split()).upper()

    @staticmethod
    def _signed_by(result, fpr) -> bool:
        """Tell whether ``result`` carries a valid signature made by ``fpr``.

        ``result`` is a python-gnupg decrypt/verify result. Both the
        signing-key fingerprint and the primary-key fingerprint are accepted
        so that signatures made with a signing subkey still match.
        """
        if not fpr or not result.valid:
            return False
        expected = DeadHand._normalize_fpr(fpr)
        candidates = (
            result.fingerprint,
            # Fall back to None if this python-gnupg build lacks the attribute.
            getattr(result, 'pubkey_fingerprint', None),
        )
        return any(
            candidate is not None
            and DeadHand._normalize_fpr(candidate) == expected
            for candidate in candidates
        )

    def _encrypt_signed(self, plaintext, signer) -> str:
        """Encrypt ``plaintext`` to the recipient, signed by ``signer``.

        Returns:
            The text form of the GPG output (``str()`` of the result).

        Raises:
            RuntimeError: If GPG reports that the operation failed.
        """
        result = self.crypto().encrypt(
            plaintext,
            self.fpr_recipient,
            sign=signer,
            passphrase=None,
        )
        if not result.ok:
            # A failed call yields empty output; never hand that back as if
            # it were a usable message.
            raise RuntimeError(
                f"GPG encryption/signing failed: {result.status}"
            )
        return str(result)

    def create_heartbeat(self) -> str:
        """Create an encrypted and signed heartbeat.

        This is what will be checked on the receiving side with
        `check_heartbeat`.

        Raises:
            RuntimeError: If GPG could not encrypt or sign the message.
        """
        return self._encrypt_signed(self.message, self.fpr_sender)

    def check_heartbeat(self, ciphertext) -> bool:
        """Return True if ``ciphertext`` is a genuine heartbeat.

        The ciphertext must decrypt successfully, the plaintext must equal
        ``self.message`` and the signature must be valid and made by
        ``self.fpr_sender``.
        """
        result = self.crypto().decrypt(ciphertext)
        if not result.ok:
            return False
        return (
            str(result) == self.message
            and self._signed_by(result, self.fpr_sender)
        )

    def create_validation(self, fpr, heartbeat) -> str:
        """Wrap a checked heartbeat in a validation signed by ``fpr``.

        Args:
            fpr: Fingerprint of the validator key used for signing.
            heartbeat: Heartbeat text as produced by `create_heartbeat`.

        Raises:
            ValueError: If ``heartbeat`` does not pass `check_heartbeat`.
            RuntimeError: If GPG could not encrypt or sign the validation.
        """
        if not self.check_heartbeat(heartbeat):
            raise ValueError("Invalid heartbeat!")  # TODO: specific error type
        return self._encrypt_signed(heartbeat, fpr)

    def check_validation(self, fpr, validation) -> bool:
        """Return True if ``validation`` is signed by ``fpr`` and wraps a
        genuine heartbeat.

        Args:
            fpr: Fingerprint of the validator expected to have signed.
            validation: Validation text as produced by `create_validation`.

        Raises:
            KeyError: If ``fpr`` is not one of the configured validators.
        """
        # ``fpr_validators`` defaults to None; treat that as "no validators"
        # so the documented KeyError is raised instead of a TypeError.
        if fpr not in (self.fpr_validators or ()):
            raise KeyError(f"DeadHand not configured to take validation from PGP finterprint {fpr}.")
        result = self.crypto().decrypt(validation)
        if not result.ok or not self._signed_by(result, fpr):
            return False
        return self.check_heartbeat(str(result))