#!/usr/bin/python3
import os
import sys
import time
import json
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from jwcrypto import jwk, jws
from jwcrypto.common import base64url_encode
# Alternative directory URL on the "acme-staging" host; swap it with the line
# below to run against staging instead.
#api_url = 'https://acme-staging-v02.api.letsencrypt.org/directory'
# ACME directory URL; passed to Session as the entry point of the whole run.
api_url = 'https://acme-v02.api.letsencrypt.org/directory'
# JSON file with the API client key; parsed into a jwk.JWK at startup and used
# to sign requests.
api_client_key_file = '/home/acmeclient/acmeclient-root/api-client-key.json'
# Prefix that a challenge token is appended to, giving the path of the
# challenge answer file.
answers_prefix = '/home/acmeclient/acmeclient-root/challenge/'
def log(*args, **kwargs):
    """Print the arguments like print(), flushing the stream by default.

    Accepts the same positional and keyword arguments as print(). Flushing is
    on unless the caller passes an explicit ``flush`` value, which is honoured
    instead of raising a duplicate-keyword TypeError.
    """
    kwargs.setdefault('flush', True)
    print(*args, **kwargs)
class Session:
    """Runs one certificate order against an ACME directory, step by step.

    Every step method reads the attributes listed under 'in' in its header
    comment and stores the ones listed under 'out'; run() calls the steps in
    the order they depend on each other.
    """

    def __init__(self, url, key, csr, answers_prefix, out_cert_file):
        self.url = str(url)
        self.key = key
        self.csr = csr
        self.answers_prefix = str(answers_prefix)
        self.out_cert_file = str(out_cert_file)
        # Filled in later by fetch_nonce() / fetch_account(); defined here so
        # the attributes exist from construction on.
        self.nonce = None
        self.kid = None

    # common helper: log status, headers and body of a response, then abort
    # raises: Exception, always
    def _fail_unexpected_response(self, r):
        log('response status code: ' + str(r.status_code))
        log('response headers')
        log(r.headers)
        log('response body')
        log(r.text)
        raise Exception('unexpected server answer')

    # common method used in many steps to send unsigned requests
    # returns: requests.Response
    # raises: Exception if the status code is not 200
    # NOTE(review): RFC 8555 (section 6.3) describes POST-as-GET for fetching
    # authorization/order/certificate resources; confirm the target server
    # still accepts the plain GET used here.
    def get_request(self, url):
        log(' get ' + url)
        r = requests.get(url)
        if r.status_code != 200:
            self._fail_unexpected_response(r)
        return r

    # common method used in many steps to send signed requests
    # in: self.nonce, self.key, self.kid
    # out: self.nonce
    # returns: requests.Response
    # raises: Exception if no nonce is available or the status code is
    #         neither 200 nor 201
    def post_signed_request(self, url, claims):
        log(' post signed ' + url)
        if self.nonce is None:
            raise Exception('no nonce available, call fetch_nonce() first')
        # 'alg' is fixed to RS256, so self.key is assumed to be an RSA key.
        header = {
            'alg': 'RS256',
            'nonce': self.nonce,
            'url': url,
        }
        # Without a known account URL the public key itself is sent ('jwk');
        # once fetch_account() has stored it, the account URL is sent ('kid').
        if self.kid is None:
            header['jwk'] = json.loads(self.key.export_public())
        else:
            header['kid'] = self.kid
        signer = jws.JWS(json.dumps(claims))
        signer.add_signature(self.key, protected=json.dumps(header))
        data = signer.serialize()
        headers = {'Content-Type': 'application/jose+json'}
        r = requests.post(url, headers=headers, data=data)
        if r.status_code not in (200, 201):
            self._fail_unexpected_response(r)
        # Each successful response carries the nonce for the next signed request.
        self.nonce = str(r.headers['Replay-Nonce'])
        log(' nonce: ' + self.nonce)
        return r

    # common helper for steps 5.4, 6 and 8: poll a resource's 'status' field
    # uses: get_request
    # Checks up to 10 times, sleeping 5 seconds before each check.
    # returns: the decoded JSON body once status == wanted, or None if the
    #          status was still `transient` after the last check
    # raises: Exception on any status other than `wanted` / `transient`
    def _poll_status(self, url, check_message, label, wanted, transient):
        for _ in range(10):
            log('wait 5 seconds')
            time.sleep(5)
            log(check_message)
            body = self.get_request(url).json()
            status = str(body['status'])
            log(' ' + label + ' status: ' + status)
            if status == wanted:
                return body
            if status != transient:
                raise Exception('unexpected ' + label + ' status: ' + status)
        return None

    # step 1
    # uses: get_request
    # in: self.url
    # out: self.url_newnonce, self.url_newaccount, self.url_neworder
    def fetch_directory(self):
        log('fetch directory')
        directory = self.get_request(self.url).json()
        self.url_newnonce = str(directory['newNonce'])
        self.url_newaccount = str(directory['newAccount'])
        self.url_neworder = str(directory['newOrder'])
        log(' url for new nonce: ' + self.url_newnonce)
        log(' url for new account: ' + self.url_newaccount)
        log(' url for new order: ' + self.url_neworder)

    # step 2
    # in: self.url_newnonce
    # out: self.nonce
    # raises: Exception if the status code is not 200
    def fetch_nonce(self):
        log('fetch nonce')
        log(' head ' + self.url_newnonce)
        r = requests.head(self.url_newnonce)
        if r.status_code != 200:
            self._fail_unexpected_response(r)
        self.nonce = str(r.headers['Replay-Nonce'])
        log(' nonce: ' + self.nonce)

    # step 3
    # uses: post_signed_request (also see 'in' and 'out' there)
    # in: self.url_newaccount
    # out: self.kid
    def fetch_account(self):
        log('fetch account')
        # Reset so that this request is signed with the embedded public key.
        self.kid = None
        claims = {'termsOfServiceAgreed': True}
        r = self.post_signed_request(self.url_newaccount, claims)
        self.kid = str(r.headers['Location'])
        log(' kid: ' + self.kid)

    # step 4
    # uses: post_signed_request (also see 'in' and 'out' there)
    # in: self.csr, self.url_neworder
    # out: self.url_order, self.url_authorizations, self.url_finalize
    # raises: Exception if the CSR names no host at all
    def create_order(self):
        log('create order')
        # Collect the common name (if any) plus all DNS subject alternative
        # names; the set removes a CN that is repeated among the SANs.
        names = set()
        cn_attributes = self.csr.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
        if cn_attributes:
            common_name = str(cn_attributes[0].value)
            log(' csr common name: ' + common_name)
            names.add(common_name)
        try:
            extension = self.csr.extensions.get_extension_for_oid(x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
        except x509.ExtensionNotFound:
            pass
        else:
            for dns_name in extension.value.get_values_for_type(x509.DNSName):
                names.add(str(dns_name))
        if not names:
            raise Exception('CSR contains neither a common name nor DNS subject alternative names')
        identifiers = []
        # Sorted only to make the request and the log output deterministic.
        for name in sorted(names):
            identifiers.append({
                'type': 'dns',
                'value': name,
            })
            log(' csr name: ' + name)
        claims = {'identifiers': identifiers}
        r = self.post_signed_request(self.url_neworder, claims)
        self.url_order = str(r.headers['Location'])
        order = r.json()
        self.url_authorizations = []
        for url_authorization in order['authorizations']:
            self.url_authorizations.append(str(url_authorization))
            log(' url for authorization: ' + str(url_authorization))
        self.url_finalize = str(order['finalize'])
        log(' url for finalize: ' + self.url_finalize)

    # step 5.1
    # uses: get_request
    # returns: url_chall (str), token (str)
    # raises: Exception if no http-01 challenge is offered or the token is
    #         too long / contains characters outside the allowed set
    def fetch_authorization(self, url_authorization):
        log('fetch authorization')
        authorization = self.get_request(url_authorization).json()
        log(' identifier type: ' + str(authorization['identifier']['type']))
        log(' identifier value: ' + str(authorization['identifier']['value']))
        url_chall = None
        token = None
        for entry in authorization['challenges']:
            challenge = str(entry['type'])
            if challenge == 'http-01':
                log(' suitable challenge: ' + challenge)
                url_chall = str(entry['url'])
                token = str(entry['token'])
                break
        if url_chall is None or token is None:
            raise Exception('no http-01 challenge offered for this authorization')
        log(' url for challenge: ' + url_chall)
        log(' challenge token: ' + token)
        log(' verify token')
        # The token later becomes part of a file name (step 5.2), so it is
        # restricted to a fixed length and character set before use.
        if len(token) > 256:
            raise Exception('token too long, maximum length is 256')
        valid_chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
        if any(char not in valid_chars for char in token):
            raise Exception('wrong token, allowed following chars only: ' + valid_chars)
        return url_chall, token

    # step 5.2
    # in: self.key, self.answers_prefix
    # returns: temporary filename to remove after authorization
    def prepare_challenge_answer(self, token):
        log('prepare challenge answer')
        answer = token + '.' + str(self.key.thumbprint(jwk.hashes.SHA256()))
        log(' answer: ' + answer)
        filename = self.answers_prefix + token
        log(' write answer to file: ' + filename)
        with open(filename, 'w') as f:
            f.write(answer)
        return filename

    # step 5.3
    # uses: post_signed_request (also see 'in' and 'out' there)
    def notify_challenge_ready(self, url_chall):
        log('notify that challenge is ready')
        self.post_signed_request(url_chall, dict())

    # step 5.4
    # uses: get_request
    # raises: Exception if the authorization does not become 'valid'
    def wait_authorization(self, url_authorization):
        body = self._poll_status(
            url_authorization, 'check authorization', 'authorization', 'valid', 'pending')
        if body is None:
            raise Exception('authorization was not happened')
        log('authorization success')

    # step 5.5
    def remove_challenge_answer(self, filename):
        log('remove challenge answer file: ' + filename)
        os.remove(filename)

    # step 5 all
    # uses: get_request, post_signed_request (also see 'in' and 'out' there)
    # in: self.url_authorizations
    def process_authorizations(self):
        log('process authorizations')
        for url_authorization in self.url_authorizations:
            url_chall, token = self.fetch_authorization(url_authorization)
            tmpfile = self.prepare_challenge_answer(token)
            try:
                self.notify_challenge_ready(url_chall)
                self.wait_authorization(url_authorization)
            finally:
                # Remove the answer file even when the authorization fails.
                self.remove_challenge_answer(tmpfile)
        log('all authorizations success')

    # step 6
    # uses: get_request
    # in: self.url_order
    # raises: Exception if the order does not become 'ready'
    def wait_order_ready(self):
        body = self._poll_status(
            self.url_order, 'check order status', 'order', 'ready', 'pending')
        if body is None:
            raise Exception('order was not became ready')
        log('order is ready')

    # step 7
    # uses: post_signed_request (also see 'in' and 'out' there)
    # in: self.csr, self.url_finalize
    def finalize_order(self):
        log('finalize order (send CSR)')
        csr_data = base64url_encode(self.csr.public_bytes(Encoding.DER))
        log(' csr data: ' + csr_data)
        claims = {'csr': csr_data}
        self.post_signed_request(self.url_finalize, claims)

    # step 8
    # uses: get_request
    # in: self.url_order
    # out: self.url_cert
    # raises: Exception if the order does not become 'valid'
    def wait_order_valid(self):
        body = self._poll_status(
            self.url_order, 'check order status', 'order', 'valid', 'processing')
        if body is None:
            raise Exception('order was not became valid')
        self.url_cert = str(body['certificate'])
        log(' url for cert: ' + self.url_cert)
        log('order success')

    # step 9
    # uses: get_request
    # in: self.url_cert, self.out_cert_file
    # raises: Exception if the downloaded text exceeds 10000000 characters
    def fetch_certificate(self):
        log('fetch certificate')
        cert = self.get_request(self.url_cert).text
        if len(cert) > 10000000:
            raise Exception('certificate too long, max length is 10000000')
        log('downloaded certificate:')
        log(cert)
        log('write certificate to file: ' + self.out_cert_file)
        with open(self.out_cert_file, 'w') as f:
            f.write(cert)

    # all steps
    def run(self):
        log('run')
        self.fetch_directory()
        self.fetch_nonce()
        self.fetch_account()
        self.create_order()
        self.process_authorizations()
        self.wait_order_ready()
        self.finalize_order()
        self.wait_order_valid()
        self.fetch_certificate()
        log('done')
# Script entry point: load the API client key and the CSR, then run a Session.
# The names assigned here (key, csr, out_cert_file, ...) remain module globals.
if __name__ == '__main__':
    # Exactly two arguments are required; anything else, including a lone
    # --help, prints the usage line and exits.
    if len(sys.argv) != 3:
        log('Usage: ' + sys.argv[0] + ' /path/to/certificate/sign/request.csr /path/to/out/certificate.crt')
        # sys.exit() rather than quit(): quit() is only installed by the site
        # module. The exit status is unchanged.
        sys.exit()
    csr_file = str(sys.argv[1])
    out_cert_file = str(sys.argv[2])
    log('hello')
    log('load api client key from file: ' + api_client_key_file)
    with open(api_client_key_file, 'r') as f:
        key = jwk.JWK(**json.loads(f.read()))
    log(' loaded fingerprint: ' + str(key.thumbprint()))
    log('load CSR from file: ' + csr_file)
    with open(csr_file, 'rb') as f:
        csr = x509.load_pem_x509_csr(f.read(), default_backend())
    session = Session(api_url, key, csr, answers_prefix, out_cert_file)
    session.run()