#!/usr/bin/python3
import base64
import json
import os
import time

import requests
from cryptography import x509
from cryptography.hazmat.primitives.serialization import Encoding
from jwcrypto import jwk, jwt
# Path of the JSON file holding the API client (account) key; the script
# generates the key and writes this file when it does not exist yet.
api_client_key_file = './api-client-key.json'
# Directory URL of the ACME server (Let's Encrypt staging endpoint).
api_url = 'https://acme-staging-v02.api.letsencrypt.org/directory'
# Directory into which http-01 challenge answer files are written.
answers_dir = '/tmp/'
def post_jws_request(key, kid, nonce, url, claims):
    """Sign *claims* with *key* and POST the result to *url*.

    The protected header always carries ``alg`` (RS256), ``nonce`` and
    ``url``.  When *kid* is ``None`` the public part of *key* is embedded
    as ``jwk``; otherwise *kid* is sent as ``kid``.

    Args:
        key: jwcrypto ``JWK`` used both for signing and (when *kid* is
            ``None``) for the embedded public key.
        kid: Account URL to put into the ``kid`` header, or ``None``.
        nonce: Replay nonce to put into the protected header.
        url: Target URL; also copied into the protected header.
        claims: Payload handed to ``jwt.JWT`` unchanged.

    Returns:
        The ``requests`` response object of the POST.
    """
    header = {
        'alg': 'RS256',
        'nonce': nonce,
        'url': url,
    }
    if kid is None:
        # Fixed: this is a free function, so the key comes from the `key`
        # parameter; the previous `self.key` raised NameError.
        header['jwk'] = json.loads(key.export_public())
    else:
        header['kid'] = kid
    token = jwt.JWT(header, claims)
    token.make_signed_token(key)
    headers = {'Content-Type': 'application/jose+json'}
    # serialize(False) requests the non-compact (JSON) serialization,
    # matching the application/jose+json content type.
    data = token.serialize(False)
    return requests.post(url, headers=headers, data=data)
def is_valid_token(token):
    """Return True when *token* is acceptable as a challenge token.

    A token is accepted when it is a non-empty string of at most 256
    characters drawn only from the base64url alphabet (``A-Z``, ``a-z``,
    ``0-9``, ``-`` and ``_``).  Because the alphabet excludes ``/`` and
    ``.``, an accepted token cannot escape a directory when it is used as
    a file name.

    Args:
        token: Value to check; anything that is not a ``str`` is rejected.

    Returns:
        ``True`` if the token passes every check, ``False`` otherwise.
    """
    if not isinstance(token, str):
        return False
    if not token or len(token) > 256:
        return False
    # Explicit ASCII set: str.isalnum() would also accept non-ASCII letters.
    allowed = set(
        'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
        'abcdefghijklmnopqrstuvwxyz'
        '0123456789-_'
    )
    return all(char in allowed for char in token)
class Session:
    """ACME client session bound to one account key and one CSR.

    The session lazily discovers the server's endpoint URLs, obtains replay
    nonces, registers (or looks up) the account for ``key`` and can then run
    a complete certificate order for ``csr`` using the http-01 challenge.

    Every signed request goes through :meth:`_post`, which consumes the
    cached nonce and stores the ``Replay-Nonce`` returned by the server, so
    no nonce is ever sent twice.

    Attributes set by ``__init__``: ``url`` (directory URL), ``key``
    (jwcrypto JWK), ``answers_dir`` (where challenge answers are written),
    ``csr`` (cryptography CSR object), ``out_cert_file`` (path for the
    issued certificate, or ``None``), plus the lazily filled caches
    ``url_newnonce``, ``url_newaccount``, ``url_neworder``, ``kid`` and
    ``nonce``.
    """

    # Polling policy used while waiting for the server to change the status
    # of an authorization or an order.
    POLL_ATTEMPTS = 10
    POLL_INTERVAL = 5  # seconds

    # Challenge tokens are restricted to the base64url alphabet; this also
    # guarantees a token cannot contain path separators.
    TOKEN_CHARS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
    TOKEN_MAX_LENGTH = 256

    def __init__(self, url, key, answers_dir, csr, out_cert_file=None):
        """Store the configuration; no network traffic happens here.

        Args:
            url: ACME directory URL.
            key: Account key (jwcrypto ``JWK``).
            answers_dir: Directory for http-01 challenge answer files.
            csr: Certificate signing request; only needed by
                :meth:`process_order`.
            out_cert_file: Where :meth:`process_order` writes the issued
                certificate.  ``None`` (default) means the certificate is
                only printed.  Previously the attribute was read but never
                initialised, which raised AttributeError.
        """
        self.csr = csr
        self.key = key
        self.answers_dir = answers_dir
        self.out_cert_file = out_cert_file
        self.url = url
        self.url_newnonce = None
        self.url_newaccount = None
        self.url_neworder = None
        self.kid = None
        self.nonce = None

    # ------------------------------------------------------------------
    # lazy accessors
    # ------------------------------------------------------------------

    def get_url_newnonce(self):
        """Return the newNonce URL, fetching the directory on first use."""
        if self.url_newnonce is None:
            self.fetch_directory()
        return self.url_newnonce

    def get_url_newaccount(self):
        """Return the newAccount URL, fetching the directory on first use."""
        if self.url_newaccount is None:
            self.fetch_directory()
        return self.url_newaccount

    def get_url_neworder(self):
        """Return the newOrder URL, fetching the directory on first use."""
        if self.url_neworder is None:
            self.fetch_directory()
        return self.url_neworder

    def get_nonce(self):
        """Return the cached replay nonce, requesting one if none is cached."""
        if self.nonce is None:
            self.fetch_nonce()
        return self.nonce

    def get_kid(self):
        """Return the account URL, registering the account on first use."""
        if self.kid is None:
            self.fetch_account()
        return self.kid

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _check_status(r, *expected):
        """Raise ``requests.exceptions.HTTPError`` unless the status is expected.

        The original code did ``raise r``; a response object is not an
        exception, so that produced a TypeError and hid the server's error
        document.
        """
        if r.status_code not in expected:
            raise requests.exceptions.HTTPError(
                'unexpected status %d from %s: %s'
                % (r.status_code, r.url, r.text),
                response=r)

    @classmethod
    def _verify_token(cls, token):
        """Raise ``Exception`` unless *token* is safe to use as a file name."""
        if not token:
            raise Exception('wrong token, must not be empty')
        if len(token) > cls.TOKEN_MAX_LENGTH:
            raise Exception('token too long, maximum length is 256')
        for char in token:
            if char not in cls.TOKEN_CHARS:
                raise Exception(
                    'wrong token, allowed following chars only: '
                    + cls.TOKEN_CHARS)

    def _post(self, url, claims, use_kid=True):
        """Send one signed POST and keep the nonce cache up to date.

        Args:
            url: Target URL.
            claims: Payload; ``''`` is what this file uses for read-only
                requests and ``{}`` for "challenge is ready".
            use_kid: When false the request is signed with the embedded
                public key instead of the account URL (account creation).

        Returns:
            The response object; the caller checks the status code.
        """
        kid = self.get_kid() if use_kid else None
        nonce = self.get_nonce()
        # A nonce is single-use: drop it before sending so a failed request
        # can never leave a spent nonce in the cache.
        self.nonce = None
        r = post_jws_request(self.key, kid, nonce, url, claims)
        fresh = r.headers.get('Replay-Nonce')
        if fresh is not None:
            self.nonce = str(fresh)
        return r

    def _poll(self, url, what, done, waiting):
        """Poll *url* until its ``status`` field equals *done*.

        Sleeps ``POLL_INTERVAL`` seconds before each of at most
        ``POLL_ATTEMPTS`` checks.

        Returns:
            The decoded JSON body of the response that reported *done*.

        Raises:
            Exception: the status is neither *done* nor *waiting*, or it
                did not reach *done* within the allowed attempts.  (The
                original used ``assert`` and silently continued after the
                last attempt.)
        """
        for _ in range(self.POLL_ATTEMPTS):
            print('wait %d seconds' % self.POLL_INTERVAL)
            time.sleep(self.POLL_INTERVAL)
            print('check ' + what)
            print(' post jws ' + url)
            r = self._post(url, '')
            self._check_status(r, 200)
            body = r.json()
            status = str(body['status'])
            print(' ' + what + ' status: ' + status)
            if status == done:
                return body
            if status != waiting:
                raise Exception(
                    'unexpected ' + what + ' status: ' + status)
        raise Exception(
            what + ' did not reach status ' + done + ' in time')

    # ------------------------------------------------------------------
    # discovery and account
    # ------------------------------------------------------------------

    def fetch_directory(self):
        """GET the directory and cache the three endpoint URLs it lists."""
        url = self.url
        print('fetch directory')
        print(' get ' + url)
        r = requests.get(url)
        self._check_status(r, 200)
        directory = r.json()
        self.url_newnonce = str(directory['newNonce'])
        self.url_newaccount = str(directory['newAccount'])
        self.url_neworder = str(directory['newOrder'])
        print(' url for new nonce: ' + self.url_newnonce)
        print(' url for new account: ' + self.url_newaccount)
        print(' url for new order: ' + self.url_neworder)

    def fetch_nonce(self):
        """HEAD the newNonce URL and cache the returned ``Replay-Nonce``."""
        url = self.get_url_newnonce()
        print('fetch nonce')
        print(' head ' + url)
        r = requests.head(url)
        self._check_status(r, 200)
        self.nonce = str(r.headers['Replay-Nonce'])
        print(' nonce: ' + self.nonce)

    def fetch_account(self):
        """Register (or look up) the account and cache its URL as ``kid``."""
        url = self.get_url_newaccount()
        print('fetch account')
        print(' post jws ' + url)
        claims = {'termsOfServiceAgreed': True}
        r = self._post(url, claims, use_kid=False)
        self._check_status(r, 200, 201)
        self.kid = str(r.headers['Location'])
        print(' nonce: ' + str(self.nonce))
        print(' kid: ' + self.kid)

    # ------------------------------------------------------------------
    # order processing
    # ------------------------------------------------------------------

    def process_order(self):
        """Run a complete order for ``self.csr`` and fetch the certificate.

        Steps: create the order, satisfy every authorization through the
        http-01 challenge, wait until the order is ready, send the CSR,
        wait until the order is valid, download the certificate.  The
        certificate is printed and, when ``out_cert_file`` is set, written
        to that file.

        Raises:
            requests.exceptions.HTTPError: the server answered with an
                unexpected status code.
            Exception: no http-01 challenge is offered, a token is
                malformed, or a status did not change as expected.
        """
        # Resolve these first so their log output precedes the order log.
        url_neworder = self.get_url_neworder()
        self.get_kid()
        self.get_nonce()
        print('process order')

        # step 1. create order
        url_order, url_authz, url_finalize = self._create_order(url_neworder)

        # step 2. process authorizations
        for url in url_authz:
            self._process_authorization(url)

        # step 3. wait ready status of order
        self._poll(url_order, 'order', 'ready', 'pending')
        print('all authorizations success')

        # step 4. send certificate signing request
        print('finalize order (send CSR)')
        print(' post jws ' + url_finalize)
        der = self.csr.public_bytes(Encoding.DER)
        # Padding characters are stripped from the base64url text.
        csr_data = base64.urlsafe_b64encode(der).decode().rstrip('=')
        print(' csr adta ' + csr_data)
        r = self._post(url_finalize, {'csr': csr_data})
        self._check_status(r, 200)

        # step 5. wait valid status of order
        order = self._poll(url_order, 'order', 'valid', 'processing')
        print('order success')
        # The certificate URL is read from the order only now; the original
        # read it from the freshly created order.
        url_cert = str(order['certificate'])
        print(' url for cert: ' + url_cert)

        # step 6. download certificate
        print('download certificate')
        print(' post jws ' + url_cert)
        # Fixed: the original posted to url_finalize here.
        r = self._post(url_cert, '')
        self._check_status(r, 200)
        cert = r.text
        print('downloaded certificate:')
        print(cert)
        if self.out_cert_file is not None:
            print('write certificate to file: ' + self.out_cert_file)
            with open(self.out_cert_file, 'w') as f:
                f.write(cert)
        print('done')

    def _create_order(self, url):
        """Create the order; return (order URL, authorization URLs, finalize URL)."""
        print('create order')
        print(' post jws ' + url)
        identifiers = []
        # NOTE: only common-name attributes of the CSR subject are used as
        # identifiers; other subject attributes are reported and skipped.
        for attribute in self.csr.subject:
            if attribute.oid == x509.NameOID.COMMON_NAME:
                identifiers.append({
                    'type': 'dns',
                    'value': str(attribute.value),
                })
                print(' csr name: ' + str(attribute.value))
            else:
                print(' WARNING: not supported attribute in csr: '
                      + str(attribute.rfc4514_string()))
        # notBefore/notAfter are optional order fields; the original filled
        # them from undefined names (`begin`, `cert`), so they are omitted.
        claims = {'identifiers': identifiers}
        r = self._post(url, claims)
        self._check_status(r, 201)
        url_order = str(r.headers['Location'])
        print(' nonce: ' + str(self.nonce))
        print(' url for order: ' + url_order)
        order = r.json()
        url_authz = [str(x) for x in order['authorizations']]
        for x in url_authz:
            print(' url for authorization: ' + x)
        url_finalize = str(order['finalize'])
        print(' url for finalize: ' + url_finalize)
        return url_order, url_authz, url_finalize

    def _process_authorization(self, url):
        """Answer the http-01 challenge of one authorization and wait for it."""
        # step 2.1. get challenge
        print('process authorization')
        print(' post jws ' + url)
        r = self._post(url, '')
        self._check_status(r, 200)
        authz = r.json()
        print(' identifier type: ' + str(authz['identifier']['type']))
        print(' identifier value: ' + str(authz['identifier']['value']))
        if authz.get('status') == 'valid':
            # Nothing to prove for an authorization the server already
            # reports as valid.
            print('authorization success')
            return

        url_chall = None
        token = None
        for x in authz['challenges']:
            challenge = str(x['type'])
            if challenge == 'http-01':
                print(' suitable challenge: ' + challenge)
                url_chall = str(x['url'])
                token = str(x['token'])
                break
        if url_chall is None or token is None:
            raise Exception('no http-01 challenge offered for ' + url)
        print(' url for challenge: ' + url_chall)
        print(' challenge token: ' + token)

        # The token becomes a file name, so it is validated before use.
        print(' verify token')
        self._verify_token(token)
        answer = token + '.' + str(self.key.thumbprint(jwk.hashes.SHA256()))
        print(' challenge answer: ' + answer)
        # os.path.join works whether or not answers_dir ends with a slash.
        filename = os.path.join(self.answers_dir, token)
        print(' write answer to file: ' + filename)
        with open(filename, 'w') as f:
            f.write(answer)

        # step 2.2. notify that challenge is ready
        print('notify that challenge is ready')
        print(' post jws ' + url_chall)
        r = self._post(url_chall, dict())
        self._check_status(r, 200)

        # step 2.3. wait validation
        self._poll(url, 'authorization', 'valid', 'pending')
        print('authorization success')
# Script entry: make sure an API client key exists, load it, and register
# (or look up) the matching account on the ACME server.
print('hello')
if not os.path.isfile(api_client_key_file):
    print('api client key file not found: ' + api_client_key_file)
    print('generate')
    key = jwk.JWK(generate = 'RSA', size = 4096)
    print('generated key id: ' + str(key.key_id))
    print('generated fingerprint: ' + str(key.thumbprint()))
    print('save generated kay to file: ' + api_client_key_file)
    # The exported JSON contains the private key, so the file is created
    # readable and writable by the owner only.
    fd = os.open(api_client_key_file,
                 os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        f.write( key.export() )
# The key is always re-read from the file so both paths (fresh and existing
# key) continue with exactly what is stored on disk.
print('load api client key from file: ' + api_client_key_file)
with open(api_client_key_file, 'r') as f:
    key = jwk.JWK( **json.loads(f.read()) )
print('loaded key id: ' + str(key.key_id))
print('loaded fingerprint: ' + str(key.thumbprint()))
# Fixed: Session takes (url, key, answers_dir, csr); the original passed
# (key, api_url).  No CSR is loaded by this script, so None is passed --
# account registration does not read it.
session = Session(api_url, key, answers_dir, None)
# Fixed: the class has no create_account(); fetch_account() is the method
# that registers the account.
session.fetch_account()