mirror of https://github.com/watcha-fr/synapse
pull/4/merge
commit
bab2846513
@ -0,0 +1,43 @@ |
||||
from synapse.crypto.event_signing import * |
||||
from syutil.base64util import encode_base64 |
||||
|
||||
import argparse |
||||
import hashlib |
||||
import sys |
||||
import json |
||||
|
||||
class dictobj(dict): |
||||
def __init__(self, *args, **kargs): |
||||
dict.__init__(self, *args, **kargs) |
||||
self.__dict__ = self |
||||
|
||||
def get_dict(self): |
||||
return dict(self) |
||||
|
||||
|
||||
def main(): |
||||
parser = parser = argparse.ArgumentParser() |
||||
parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'), |
||||
default=sys.stdin) |
||||
args = parser.parse_args() |
||||
logging.basicConfig() |
||||
|
||||
event_json = dictobj(json.load(args.input_json)) |
||||
|
||||
algorithms = { |
||||
"sha256": hashlib.sha256, |
||||
} |
||||
|
||||
for alg_name in event_json.hashes: |
||||
if check_event_pdu_content_hash(event_json, algorithms[alg_name]): |
||||
print "PASS content hash %s" % (alg_name,) |
||||
else: |
||||
print "FAIL content hash %s" % (alg_name,) |
||||
|
||||
for algorithm in algorithms.values(): |
||||
name, h_bytes = compute_pdu_event_reference_hash(event_json, algorithm) |
||||
print "Reference hash %s: %s" % (name, encode_base64(bytes)) |
||||
|
||||
if __name__=="__main__": |
||||
main() |
||||
|
@ -0,0 +1,74 @@ |
||||
|
||||
from synapse.crypto.event_signing import verify_signed_event_pdu |
||||
from syutil.crypto.jsonsign import verify_signed_json |
||||
from syutil.crypto.signing_key import ( |
||||
decode_verify_key_bytes, write_signing_keys |
||||
) |
||||
from syutil.base64util import decode_base64 |
||||
|
||||
import urllib2 |
||||
import json |
||||
import sys |
||||
import dns.resolver |
||||
import pprint |
||||
import argparse |
||||
import logging |
||||
|
||||
def get_targets(server_name): |
||||
if ":" in server_name: |
||||
target, port = server_name.split(":") |
||||
yield (target, int(port)) |
||||
return |
||||
try: |
||||
answers = dns.resolver.query("_matrix._tcp." + server_name, "SRV") |
||||
for srv in answers: |
||||
yield (srv.target, srv.port) |
||||
except dns.resolver.NXDOMAIN: |
||||
yield (server_name, 8480) |
||||
|
||||
def get_server_keys(server_name, target, port): |
||||
url = "https://%s:%i/_matrix/key/v1" % (target, port) |
||||
keys = json.load(urllib2.urlopen(url)) |
||||
verify_keys = {} |
||||
for key_id, key_base64 in keys["verify_keys"].items(): |
||||
verify_key = decode_verify_key_bytes(key_id, decode_base64(key_base64)) |
||||
verify_signed_json(keys, server_name, verify_key) |
||||
verify_keys[key_id] = verify_key |
||||
return verify_keys |
||||
|
||||
def main(): |
||||
|
||||
parser = argparse.ArgumentParser() |
||||
parser.add_argument("signature_name") |
||||
parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'), |
||||
default=sys.stdin) |
||||
|
||||
args = parser.parse_args() |
||||
logging.basicConfig() |
||||
|
||||
server_name = args.signature_name |
||||
keys = {} |
||||
for target, port in get_targets(server_name): |
||||
try: |
||||
keys = get_server_keys(server_name, target, port) |
||||
print "Using keys from https://%s:%s/_matrix/key/v1" % (target, port) |
||||
write_signing_keys(sys.stdout, keys.values()) |
||||
break |
||||
except: |
||||
logging.exception("Error talking to %s:%s", target, port) |
||||
|
||||
json_to_check = json.load(args.input_json) |
||||
print "Checking JSON:" |
||||
for key_id in json_to_check["signatures"][args.signature_name]: |
||||
try: |
||||
key = keys[key_id] |
||||
verify_signed_json(json_to_check, args.signature_name, key) |
||||
print "PASS %s" % (key_id,) |
||||
except: |
||||
logging.exception("Check for key %s failed" % (key_id,)) |
||||
print "FAIL %s" % (key_id,) |
||||
|
||||
|
||||
if __name__ == '__main__': |
||||
main() |
||||
|
Loading…
Reference in new issue