#!/usr/bin/python3
import sys
import time
import json
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from jwcrypto import jwk, jws
from jwcrypto.common import base64url_encode
api_url = 'https://acme-staging-v02.api.letsencrypt.org/directory'
#api_url = 'https://acme-v02.api.letsencrypt.org/directory'
api_client_key_file = '/home/bw/work/dev/acmeclient-data/api-client-key.json'
answers_prefix = '/var/www/html/.well-known/acme-challenge/'
class Session:
def __init__(self, url, key, csr, answers_prefix, out_cert_file):
self.url = str(url)
self.key = key
self.csr = csr
self.answers_prefix = str(answers_prefix)
self.out_cert_file = str(out_cert_file)
# common method uses in many steps to send unsigned requests
# returns: requests.Response
def get_request(self, url):
print(' get ' + url)
r = requests.get(url)
if r.status_code != 200:
print('response status code: ' + str(r.status_code))
print('response headers')
print(r.headers)
print('response body')
print(r.text)
raise Exception('unexpected server answer')
return r
# common method uses in many steps to send signed requests
# in: self.nonce, self.key, self.kid
# out: self.nonce
# returns: requests.Response
def post_signed_request(self, url, claims):
print(' post signed ' + url)
header = {
'alg': 'RS256',
'nonce': self.nonce,
'url': url }
if self.kid is None:
header['jwk'] = json.loads(self.key.export_public())
else:
header['kid'] = self.kid
signer = jws.JWS(json.dumps(claims))
signer.add_signature(self.key, protected = json.dumps(header))
data = signer.serialize()
headers = { 'Content-Type': 'application/jose+json' }
r = requests.post(url, headers = headers, data = data)
if r.status_code != 200 and r.status_code != 201:
print('response status code ' + str(r.status_code))
print('response headers')
print(r.headers)
print('response body')
print(r.text)
raise Exception('unexpected server answer')
self.nonce = str(r.headers['Replay-Nonce'])
print(' nonce: ' + self.nonce)
return r
# step 1
# uses: get_request
# in: self.url
# out: self.url_newnonce, self.url_newaccount, self.url_neworder
def fetch_directory(self):
print('fetch directory')
r = self.get_request(self.url)
json = r.json()
self.url_newnonce = str(json['newNonce'])
self.url_newaccount = str(json['newAccount'])
self.url_neworder = str(json['newOrder'])
print(' url for new nonce: ' + self.url_newnonce)
print(' url for new account: ' + self.url_newaccount)
print(' url for new order: ' + self.url_neworder)
# step 2
# in: self.url_newnonce
# out: self.nonce
def fetch_nonce(self):
print('fetch nonce')
print(' head ' + self.url_newnonce)
r = requests.head(self.url_newnonce)
if r.status_code != 200:
print('response status code: ' + r.status_code)
print('response headers')
print(r.headers)
print('response body')
print(r.text)
raise Exception('unexpected server answer')
self.nonce = str(r.headers['Replay-Nonce'])
print(' nonce: ' + self.nonce)
# step 3
# uses: post_signed_request (also see 'in' and 'out' there)
# in: self.url_newaccount
# out: self.kid
def fetch_account(self):
print('fetch account')
self.kid = None
claims = { 'termsOfServiceAgreed': True }
r = self.post_signed_request(self.url_newaccount, claims)
self.kid = str(r.headers['Location'])
print(' kid: ' + self.kid)
# step 4
# uses: post_signed_request (also see 'in' and 'out' there)
# in: self.csr, self.url_neworder
# out: self.url_order, self.url_authorizations, self.url_finalize
def create_order(self):
print('create order')
common_name = str( self.csr.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value )
print(' csr common name: ' + common_name)
names = { common_name }
try:
extension = self.csr.extensions.get_extension_for_oid(x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
for x in extension.value.get_values_for_type(x509.DNSName):
names.add(str(x))
except x509.ExtensionNotFound:
pass
identifiers = []
for x in names:
identifiers.append({
'type': 'dns',
'value': x })
print(' csr name: ' + x)
claims = { 'identifiers': identifiers }
r = self.post_signed_request(self.url_neworder, claims)
self.url_order = str(r.headers['Location'])
json = r.json()
self.url_authorizations = []
for x in json['authorizations']:
self.url_authorizations.append(str(x))
print(' url for authorization: ' + str(x))
self.url_finalize = str(json['finalize'])
print(' url for finalize: ' + self.url_finalize)
# step 5.1
# uses: get_request
# returns: url_chall (str), token (str)
def fetch_authorization(self, url_authorization):
print('fetch authorization')
r = self.get_request(url_authorization)
json = r.json()
print(' identifier type: ' + str(json['identifier']['type']))
print(' identifier value: ' + str(json['identifier']['value']))
url_chall = None
token = None
for x in json['challenges']:
challenge = str(x['type'])
if challenge == 'http-01':
print(' suitable challenge: ' + challenge)
url_chall = str(x['url'])
token = str(x['token'])
break
assert(not url_chall is None)
assert(not token is None)
print(' url for challenge: ' + url_chall)
print(' url for challenge: ' + url_chall)
print(' challenge token: ' + token)
print(' verify token')
if len(token) > 256:
raise Exception('token too long, maximum length is 256')
valid_chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
for char in token:
if not char in valid_chars:
raise Exception('wrong token, allowed following chars only: ' + valid_chars)
return url_chall, token
# step 5.2
# in: self.answers_prefix
def prepare_challenge_answer(self, token):
print('prepare challenge answer')
answer = token + '.' + str(key.thumbprint(jwk.hashes.SHA256()))
print(' answer: ' + answer)
filename = answers_prefix + token
print(' write answer to file: ' + filename)
with open(filename, 'w') as f:
f.write( answer )
# step 5.3
# uses: post_signed_request (also see 'in' and 'out' there)
def notify_challenge_ready(self, url_chall):
print('notify that challenge is ready')
self.post_signed_request(url_chall, dict())
# step 5.4
# uses: get_request
def wait_authorization(self, url_authorization):
for i in range(0, 10):
print('wait 5 seconds')
time.sleep(5)
print('check authorization')
r = self.get_request(url_authorization)
json = r.json()
status = str(json['status'])
print(' authorization status: ' + status)
if status == 'valid':
print('authorization success')
return
assert(status == 'pending')
raise Exception('authorization was not happened')
# step 5 all
# uses: get_request, post_signed_request (also see 'in' and 'out' there)
# in: self.url_authorizations
def process_authorizations(self):
print('process authorizations')
for url_authorization in self.url_authorizations:
url_chall, token = self.fetch_authorization(url_authorization)
self.prepare_challenge_answer(token)
self.notify_challenge_ready(url_chall)
self.wait_authorization(url_authorization)
print('all authorizations success')
# step 6
# uses: get_request
# in: self.url_order
def wait_order_ready(self):
for i in range(0, 10):
print('wait 5 seconds')
time.sleep(5)
print('check order status')
r = self.get_request(self.url_order)
json = r.json()
status = str(json['status'])
print(' order status: ' + status)
if status == 'ready':
print('order is ready')
return
assert(status == 'pending')
raise Exception('order was not became ready')
# step 7
# uses: post_signed_request (also see 'in' and 'out' there)
# in: self.csr, self.url_finalize
def finalize_order(self):
print('finalize order (send CSR)')
csr_data = base64url_encode(self.csr.public_bytes(Encoding.DER))
print(' csr data: ' + csr_data)
claims = { 'csr': csr_data }
self.post_signed_request(self.url_finalize, claims)
# step 8
# uses: get_request
# in: self.url_order
# out: self.url_cert
def wait_order_valid(self):
for i in range(0, 10):
print('wait 5 seconds')
time.sleep(5)
print('check order status')
r = self.get_request(self.url_order)
json = r.json()
status = str(json['status'])
print(' order status: ' + status)
if status == 'valid':
self.url_cert = str(json['certificate'])
print(' url for cert: ' + self.url_cert)
print('order success')
return
assert(status == 'processing')
raise Exception('order was not became valid')
# step 9
# uses: get_request
# in: self.url_cert
def fetch_certificate(self):
print('fetch certificate')
r = self.get_request(self.url_cert)
cert = r.text
if len(cert) > 10000000:
raise Exception('certificate too long, max length is 10000000')
print('downloaded certificate:')
print(cert)
print('write certificate to file: ' + out_cert_file)
with open(out_cert_file, 'w') as f:
f.write(cert)
# all steps
def run(self):
print('run')
self.fetch_directory()
self.fetch_nonce()
self.fetch_account()
self.create_order()
self.process_authorizations()
self.wait_order_ready()
self.finalize_order()
self.wait_order_valid()
self.fetch_certificate()
print('done')
if len(sys.argv) != 3 or (len(sys.argv) == 2 and sys.argv[1] == '--help'):
print('Usage: ' + sys.argv[0] + ' /path/to/certificate/sign/request.csr /path/to/out/certificate.crt')
quit()
csr_file = str(sys.argv[1])
out_cert_file = str(sys.argv[2])
print('hello')
print('load api client key from file: ' + api_client_key_file)
with open(api_client_key_file, 'r') as f:
key = jwk.JWK( **json.loads(f.read()) )
print(' loaded fingerprint: ' + str(key.thumbprint()))
print('load CSF from file: ' + csr_file)
with open(csr_file, 'rb') as f:
csr = x509.load_pem_x509_csr(f.read(), default_backend())
session = Session(api_url, key, csr, answers_prefix, out_cert_file)
session.run()