diff --git a/main.py b/main.py index d5663ec..c956892 100755 --- a/main.py +++ b/main.py @@ -6,78 +6,33 @@ import json import base64 import requests from cryptorgaphy import x509 +from cryptography.hazmat.backends import default_backend from jwcrypto import jwk, jwt +api_url = 'https://acme-staging-v02.api.letsencrypt.org/directory' api_client_key_file = './api-client-key.json' -api_url = 'https://acme-staging-v02.api.letsencrypt.org/directory' -answers_dir = '/tmp/' - - -def post_jws_request(key, kid, nonce, url, claims): - header = { - 'alg': 'RS256', - 'nonce': nonce, - 'url': url } - if kid is None: - header['jwk'] = json.loads(self.key.export_public()) - else: - header['kid'] = kid - token = jwt.JWT(header, claims) - token.make_signed_token(self.key) - headers = { 'Content-Type': 'application/jose+json' } - data = token.serialize(False) - return requests.post(url, headers = headers, data = data) - - -def is_valid_token(token): - +answers_prefix = '/tmp/' +csr_file = './cert.csr' +out_cert_file = './cert.crt' class Session: - def __init__(self, url, key, answers_dir, csr): - self.csr = csr + def __init__(self, url, key, csr, answers_prefix, out_cert_file): + self.url = str(url) self.key = key - self.answers_dir = answers_dir - self.url = url - self.url_newnonce = None - self.url_newaccount = None - self.url_neworder = None - self.kid = None - self.nonce = None - - - def get_url_newnonce(self): - if self.url_newnonce is None: - self.fetch_directory() - return self.url_newnonce - - def get_url_newaccount(self): - if self.url_newaccount is None: - self.fetch_directory() - return self.url_newaccount - - def get_url_neworder(self): - if self.url_neworder is None: - self.fetch_directory() - return self.url_neworder - - def get_nonce(self): - if self.nonce is None: - self.fetch_nonce() - return self.nonce - - def get_kid(self): - if self.kid is None: - self.fetch_account() - return self.kid + self.csr = csr + self.answers_prefix = str(answers_prefix) + self.out_cert_file = str(out_cert_file) + # step 1 + # in: self.url + # out: self.url_newnonce, self.url_newaccount, self.url_neworder def fetch_directory(self): - url = self.url print('fetch directory') - print(' get ' + url) - r = requests.get(url) + print(' get ' + self.url) + r = requests.get(self.url) if r.status_code != 200: raise r json = r.json() @@ -88,45 +43,72 @@ class Session: print(' url for new account: ' + self.url_newaccount) print(' url for new order: ' + self.url_neworder) + + # step 2 + # in: self.url_newnonce + # out: self.nonce def fetch_nonce(self): - url = self.get_url_newnonce() print('fetch nonce') - print(' head ' + url) - r = requests.head(url) + print(' head ' + self.url_newnonce) + r = requests.head(self.url_newnonce) if r.status_code != 200: raise r self.nonce = str(r.headers['Replay-Nonce']) print(' nonce: ' + self.nonce) - def fetch_account(self): - url = self.get_url_newaccount() - key = self.key - nonce = self.get_nonce() - print('fetch account') + + # common method using in all following steps to send signed requests + # in: self.nonce, self.key, self.kid + # out: self.nonce + # raises: requests.Response + # returns: requests.Response + def post_jws_request(url, claims, expected_status_code): print(' post jws ' + url) - claims = { 'termsOfServiceAgreed': True } - r = post_jws_request(key, None, nonce, url, claims) - if (r.status_code != 200) and (r.status_code != 201): + + header = { + 'alg': 'RS256', + 'nonce': self.nonce, + 'url': url } + if self.kid is None: + header['jwk'] = json.loads(self.key.export_public()) + else: + header['kid'] = self.kid + + token = jwt.JWT(header, claims) + token.make_signed_token(self.key) + data = token.serialize(False) + + headers = { 'Content-Type': 'application/jose+json' } + r = requests.post(url, headers = headers, data = data) + if r.status_code != expected_status_code: + print(' wrong status code ' + str(r.status_code)) + print(' expected ' + str(expected_status_code)) raise r self.nonce = str(r.headers['Replay-Nonce']) - self.kid = str(r.headers['Location']) print(' nonce: ' + self.nonce) + return r + + + # step 3 + # uses: post_jws_request (also see 'in', 'out' and 'raises' there) + # in: self.url_newaccount + # out: self.kid + def fetch_account(self): + print('fetch account') + self.kid = None + claims = { 'termsOfServiceAgreed': True } + r = self.post_jws_request(url, claims, 201) + self.kid = str(r.headers['Location']) print(' kid: ' + self.kid) + - def process_order(self): - url = self.get_url_neworder() - key = self.key - kid = self.get_kid() - nonce = self.get_nonce() - csr = self.csr - answers_dir = self.answers_dir - out_cert_file = self.out_cert_file - print('process order') - - # step 1. create order - + # step 4 + # uses: post_jws_request (also see 'in', 'out' and 'raises' there) + # in: self.url_neworder + # out: self.url_order, self.url_authorizations, self.url_finalize, self.url_cert + def create_order(self): print('create order') - print(' post jws ' + url) + identifiers = [] for attribute in csr.subject: if attribute.oid == x509.NameOID.COMMON_NAME: @@ -136,190 +118,218 @@ class Session: print(' csr name: ' + str(attribute.value)) else: print(' WARNING: not supported attribute in csr: ' + str(attribute.rfc4514_string())) - - end = str(cert.not_valid_after.isoformat()) - claims = { - 'identifiers': identifiers, - 'notBefore': begin, - 'notAfter': end } - r = post_jws_request(key, kid, nonce, url, claims) - if r.status_code != 201: - raise r - - nonce = str(r.headers['Replay-Nonce']) - url_order = str(r.headers['Location']) - print(' nonce: ' + self.nonce) - print(' url for order: ' + url_order) + claims = { 'identifiers': identifiers } + r = self.post_jws_request(url, claims, 201) + + self.url_order = str(r.headers['Location']) json = r.json() - url_authz = [] + self.url_authorizations = [] for x in json['authorizations'] - url_authorizations.append(str(x)) + self.url_authorizations.append(str(x)) print(' url for authorization: ' + str(x)) - url_finalize = str(json['finalize']) - url_cert = str(json['certificate']) - print(' url for finalize: ' + url_finalize) - print(' url for cert: ' + url_cert) - - # step 2. process authorizations - - for url in url_authz: - # step 2.1. get challenge - - print('process authorization') - print(' post jws ' + url) - r = post_jws_request(key, kid, nonce, url, '') - if r.status_code != 200: - raise r - json = r.json() - - print(' identifier type: ' + str(json['identifier']['type'])) - print(' identifier value: ' + str(json['identifier']['value'])) - - url_chall = None - token = None - for x in json['challenges'] - challenge = str(x['type']) - if challenge == 'http-01': - print(' suitable challenge: ' + challenge) - url_chall = str(x['url']) - token = str(x['token']) - break - assert(not url_chall is None) - assert(not token is None) - - print(' url for challenge: ' + url_chall) - print(' url for challenge: ' + url_chall) - print(' challenge token: ' + token) - - print(' verify token') - if len(token) > 256: - raise Exception('token too long, maximum length is 256') - valid_chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_' - for char in token: - if not char in valid_chars: - raise Exception('wrong token, allowed following chars only: ' + valid_chars) - - answer = token + '.' + str(key.thumbprint(jwk.hashes.SHA256())) - print(' challenge answer: ' + answer) - filename = answers_dir + token - print(' write answer to file: ' + filename) - with open(filename, 'w') as f: - f.write( answer ) - - # step 2.2. notify that challenge is ready - - print('notify that challenge is ready') - print(' post jws ' + url_chall) - r = post_jws_request(key, kid, nonce, url_chall, dict()) - if r.status_code != 200: - raise r - - # step 2.3. wait validation - - for i in range(0, 10): - print('wait 5 seconds') - time.sleep(5) - print('check authorization') - print(' post jws ' + url) - r = post_jws_request(key, kid, nonce, url, '') - if r.status_code != 200: - raise r - json = r.json() - status = str(json['status']) - print(' authorization status: ' + status) - if status == 'valid': - print('authorization success') - break - assert(status == 'pending') - - # step 3. wait ready status of order + self.url_finalize = str(json['finalize']) + self.url_cert = str(json['certificate']) + print(' url for finalize: ' + self.url_finalize) + print(' url for cert: ' + self.url_cert) + + + # step 5.1 + # uses: post_jws_request (also see 'in', 'out' and 'raises' there) + # returns: url_chall (str), token (str) + def fetch_authorization(self, url_authorization): + print('fetch authorization') + r = self.post_jws_request(url_authorization, '', 200) + json = r.json() + print(' identifier type: ' + str(json['identifier']['type'])) + print(' identifier value: ' + str(json['identifier']['value'])) + + url_chall = None + token = None + for x in json['challenges'] + challenge = str(x['type']) + if challenge == 'http-01': + print(' suitable challenge: ' + challenge) + url_chall = str(x['url']) + token = str(x['token']) + break + assert(not url_chall is None) + assert(not token is None) - url = url_order + print(' url for challenge: ' + url_chall) + print(' url for challenge: ' + url_chall) + print(' challenge token: ' + token) + + print(' verify token') + if len(token) > 256: + raise Exception('token too long, maximum length is 256') + valid_chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_' + for char in token: + if not char in valid_chars: + raise Exception('wrong token, allowed following chars only: ' + valid_chars) + + return url_chall, token + + + # step 5.2 + # in: self.answers_prefix + def prepare_challenge_answer(self, token): + print('prepare challenge answer') + answer = token + '.' + str(key.thumbprint(jwk.hashes.SHA256())) + print(' answer: ' + answer) + filename = answers_prefix + token + print(' write answer to file: ' + filename) + with open(filename, 'w') as f: + f.write( answer ) + + + # step 5.3 + # uses: post_jws_request (also see 'in', 'out' and 'raises' there) + def notify_challenge_ready(self, url_chall): + print('notify that challenge is ready') + self.post_jws_request(url_chall, dict()) + + + # step 5.4 + # uses: post_jws_request (also see 'in', 'out' and 'raises' there) + def wait_authorization(self, url_authorization): + for i in range(0, 10): + print('wait 5 seconds') + time.sleep(5) + print('check authorization') + r = self.post_jws_request(url_authorization, '', 200) + json = r.json() + status = str(json['status']) + print(' authorization status: ' + status) + if status == 'valid': + print('authorization success') + return + assert(status == 'pending') + raise Exception('authorization was not happened') + + + # step 5 all + # uses: post_jws_request (also see 'in', 'out' and 'raises' there) + # in: self.url_authorizations + def process_authorizations(self): + print('process authorizations') + for url_authorization in self.url_authorizations: + url_chall, token = self.fetch_authorization(url_authorization) + self.prepare_challenge_answer(token) + self.notify_challenge_ready(url_chall) + self.wait_authorization(url_authorization) + print('all authorizations success') + + + # step 6 + # uses: post_jws_request (also see 'in', 'out' and 'raises' there) + # in: self.url_order + def wait_order_ready(self): for i in range(0, 10): print('wait 5 seconds') time.sleep(5) - print('check order') - print(' post jws ' + url) - r = post_jws_request(key, kid, nonce, url, '') - if r.status_code != 200: - raise r + print('check order status') + r = self.post_jws_request(self.url_order, '', 200) json = r.json() status = str(json['status']) print(' order status: ' + status) if status == 'ready': - print('all authorizations success') - break + print('order is ready') + return assert(status == 'pending') - - # step 4. send sertificate signing request - + raise Exception('order was not became ready') + + + # step 7 + # uses: post_jws_request (also see 'in', 'out' and 'raises' there) + # in: self.csr, self.url_finalize + def finalize_order(self): print('finalize order (send CSR)') - print(' post jws ' + url_finalize) csr_data = base64.urlsafe_b64encode( csr.public_bytes(Encoding.DER) ).decode().replace('=', '') print(' csr adta ' + csr_data) claims = { 'csr': csr_data } - r = post_jws_request(key, kid, nonce, url_finalize, claims) + self.post_jws_request(self.url_finalize, claims) if r.status_code != 200: raise r - - # step 5. wait valid status of order - + + + # step 8 + # uses: post_jws_request (also see 'in', 'out' and 'raises' there) + # in: self.url_order + def wait_order_valid(self): for i in range(0, 10): print('wait 5 seconds') time.sleep(5) - print('check order') - print(' post jws ' + url) - r = post_jws_request(key, kid, nonce, url, '') - if r.status_code != 200: - raise r + print('check order status') + r = self.post_jws_request(self.url_order, '', 200) json = r.json() status = str(json['status']) print(' order status: ' + status) if status == 'valid': print('order success') - break + return assert(status == 'processing') - - # step 5. download certificate - - print('download certificate') - print(' post jws ' + url_cert) - r = post_jws_request(key, kid, nonce, url_finalize, '') - if r.status_code != 200: - raise r + raise Exception('order was not became valid') + + + # step 9 + # uses: post_jws_request (also see 'in', 'out' and 'raises' there) + # in: self.url_cert + def fetch_certificate(self): + print('fetch certificate') + r = self.post_jws_request(self.url_cert, '') cert = r.text - + print('downloaded certificate:') print(cert) + if len(cert) > 10000000: + raise Exception('certificate too long, maximum length is 10000000') + print('write certificate to file: ' + out_cert_file) with open(out_cert_file, 'w') as f: f.write(cert) - + + + # all steps + def run(self): + print('run') + self.fetch_directory() + self.fetch_nonce() + self.fetch_account() + self.create_order() + self.process_authorizations() + self.wait_order_ready() + self.finalize_order() + self.wait_order_valid() + self.fetch_certificate() print('done') -print('hello') +print('hello') if not os.path.isfile(api_client_key_file): print('api client key file not found: ' + api_client_key_file) print('generate') key = jwk.JWK(generate = 'RSA', size = 4096) - print('generated key id: ' + str(key.key_id)) - print('generated fingerprint: ' + str(key.thumbprint())) - print('save generated kay to file: ' + api_client_key_file) + print(' generated key id: ' + str(key.key_id)) + print(' generated fingerprint: ' + str(key.thumbprint())) + print(' save generated kay to file: ' + api_client_key_file) with open(api_client_key_file, 'w') as f: f.write( key.export() ) print('load api client key from file: ' + api_client_key_file) with open(api_client_key_file, 'r') as f: key = jwk.JWK( **json.loads(f.read()) ) -print('loaded key id: ' + str(key.key_id)) -print('loaded fingerprint: ' + str(key.thumbprint())) +print(' loaded key id: ' + str(key.key_id)) +print(' loaded fingerprint: ' + str(key.thumbprint())) + -session = Session(key, api_url) -session.create_account() +print('load CSF from file: ' + csr_file) +with open(csr_file, 'rb') as f: + csr = x509.load_pem_x509_csr(f.read(), default_backend()) +session = Session(api_url, key, csr, answers_prefix, out_cert_file) +session.run()