#!/usr/bin/python3

import os
import sys
import time
import json
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from jwcrypto import jwk, jws
from jwcrypto.common import base64url_encode


# ACME directory URL of Let's Encrypt; the staging URL is kept here
# commented out so it can be swapped in by hand.
#api_url = 'https://acme-staging-v02.api.letsencrypt.org/directory'
api_url = 'https://acme-v02.api.letsencrypt.org/directory'

# Common parent directory of the two paths below.
_acmeclient_root = '/home/acmeclient/acmeclient-root'
# JSON file the API client key (a JWK) is loaded from at startup.
api_client_key_file = _acmeclient_root + '/api-client-key.json'
# Prefix (note the trailing slash) put in front of a challenge token to
# build the name of the challenge answer file.
answers_prefix = _acmeclient_root + '/challenge/'


def log(*args, **kwargs):
  """Print like the built-in print(), always flushing the output stream.

  Takes the same positional and keyword arguments as print(), except
  flush, which is already supplied here.
  """
  print(*args, flush = True, **kwargs)


class Session:
  def __init__(self, url, key, csr, answers_prefix, out_cert_file):
    self.url = str(url)
    self.key = key
    self.csr = csr
    self.answers_prefix = str(answers_prefix)
    self.out_cert_file = str(out_cert_file)


  # common method used in many steps to send unsigned requests
  # returns: requests.Response
  def get_request(self, url, timeout = 30):
    """Send an unsigned GET request and return the response.

    url: address to fetch.
    timeout: seconds requests waits for the server; without a timeout a
      stalled connection would block the whole run forever.
    returns: requests.Response whose status code is 200.
    raises: Exception if the status code is not 200 (status, headers and
      body are logged first); requests raises its own exceptions on
      network errors and on timeout.
    """
    log('  get ' + url)
    r = requests.get(url, timeout = timeout)
    if r.status_code != 200:
      log('response status code: ' + str(r.status_code))
      log('response headers')
      log(r.headers)
      log('response body')
      log(r.text)
      raise Exception('unexpected server answer')
    return r


  # common method used in many steps to send signed requests
  # in: self.nonce, self.key, self.kid
  # out: self.nonce
  # returns: requests.Response
  def post_signed_request(self, url, claims, timeout = 30):
    """Send claims as a JWS signed with self.key and return the response.

    The protected header carries 'alg' (RS256), the current nonce and the
    target url. While self.kid is None the public key is embedded as
    'jwk'; afterwards the request is identified by 'kid'.

    in: self.nonce, self.key, self.kid
    out: self.nonce (taken from the Replay-Nonce response header)
    url: address to post to.
    claims: JSON-serializable payload.
    timeout: seconds requests waits for the server; without a timeout a
      stalled connection would block the whole run forever.
    returns: requests.Response whose status code is 200 or 201.
    raises: Exception on any other status code (status, headers and body
      are logged first); requests raises its own exceptions on network
      errors and on timeout.
    """
    log('  post signed ' + url)

    header = {
      'alg': 'RS256',
      'nonce': self.nonce,
      'url': url }
    if self.kid is None:
      header['jwk'] = json.loads(self.key.export_public())
    else:
      header['kid'] = self.kid

    signer = jws.JWS(json.dumps(claims))
    signer.add_signature(self.key, protected = json.dumps(header))
    data = signer.serialize()

    headers = { 'Content-Type': 'application/jose+json' }
    r = requests.post(url, headers = headers, data = data, timeout = timeout)
    if r.status_code not in (200, 201):
      log('response status code ' + str(r.status_code))
      log('response headers')
      log(r.headers)
      log('response body')
      log(r.text)
      raise Exception('unexpected server answer')
    # every successful answer carries the nonce for the next signed request
    self.nonce = str(r.headers['Replay-Nonce'])
    log('  nonce: ' + self.nonce)
    return r


  # step 1
  # uses: get_request
  # in: self.url
  # out: self.url_newnonce, self.url_newaccount, self.url_neworder
  def fetch_directory(self):
    """Download the ACME directory and remember the endpoint URLs.

    uses: get_request
    in: self.url
    out: self.url_newnonce, self.url_newaccount, self.url_neworder
    """
    log('fetch directory')
    directory = self.get_request(self.url).json()

    # read all three endpoints first, report them afterwards
    self.url_newnonce = str(directory['newNonce'])
    self.url_newaccount = str(directory['newAccount'])
    self.url_neworder = str(directory['newOrder'])

    log('  url for new nonce: ' + self.url_newnonce)
    log('  url for new account: ' + self.url_newaccount)
    log('  url for new order: ' + self.url_neworder)


  # step 2
  # in: self.url_newnonce
  # out: self.nonce
  def fetch_nonce(self):
    """Request the first nonce with a HEAD request.

    in: self.url_newnonce
    out: self.nonce (taken from the Replay-Nonce response header)
    raises: Exception if the status code is not 200 (status, headers and
      body are logged first); requests raises its own exceptions on
      network errors and on timeout.
    """
    log('fetch nonce')
    log('  head ' + self.url_newnonce)
    # timeout: a stalled connection must not block the run forever
    r = requests.head(self.url_newnonce, timeout = 30)
    if r.status_code != 200:
      # status_code is an int, it has to be converted before concatenation
      log('response status code: ' + str(r.status_code))
      log('response headers')
      log(r.headers)
      log('response body')
      log(r.text)
      raise Exception('unexpected server answer')
    self.nonce = str(r.headers['Replay-Nonce'])
    log('  nonce: ' + self.nonce)


  # step 3
  # uses: post_signed_request (also see 'in' and 'out' there)
  # in: self.url_newaccount
  # out: self.kid
  def fetch_account(self):
    """Post to the new-account URL and remember the account URL as kid.

    uses: post_signed_request (also see 'in' and 'out' there)
    in: self.url_newaccount
    out: self.kid (Location header of the response)
    """
    log('fetch account')
    # no kid yet: post_signed_request then embeds the public key instead
    self.kid = None
    response = self.post_signed_request(
      self.url_newaccount,
      { 'termsOfServiceAgreed': True })
    self.kid = str(response.headers['Location'])
    log('  kid: ' + self.kid)


  # step 4
  # uses: post_signed_request (also see 'in' and 'out' there)
  # in: self.csr, self.url_neworder
  # out: self.url_order, self.url_authorizations, self.url_finalize
  def create_order(self):
    """Create an order for every DNS name found in the CSR.

    Names are collected from the subject common name (if the CSR has one)
    and from the DNS entries of the subjectAltName extension (if present).

    uses: post_signed_request (also see 'in' and 'out' there)
    in: self.csr, self.url_neworder
    out: self.url_order, self.url_authorizations, self.url_finalize
    raises: Exception if the CSR contains no name at all.
    """
    log('create order')

    names = set()

    # the common name is optional: a CSR may list its names in SAN only
    cn_attributes = self.csr.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
    if cn_attributes:
      common_name = str(cn_attributes[0].value)
      log('  csr common name: ' + common_name)
      names.add(common_name)

    try:
      extension = self.csr.extensions.get_extension_for_oid(x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
    except x509.ExtensionNotFound:
      pass
    else:
      names.update(str(x) for x in extension.value.get_values_for_type(x509.DNSName))

    if not names:
      raise Exception('no names found in CSR')

    # sorted: keeps the identifiers (and the log) in a stable order
    identifiers = []
    for name in sorted(names):
      identifiers.append({
        'type': 'dns',
        'value': name })
      log('  csr name: ' + name)
    claims = { 'identifiers': identifiers }

    r = self.post_signed_request(self.url_neworder, claims)

    self.url_order = str(r.headers['Location'])

    order = r.json()
    self.url_authorizations = []
    for x in order['authorizations']:
      self.url_authorizations.append(str(x))
      log('  url for authorization: ' + str(x))
    self.url_finalize = str(order['finalize'])
    log('  url for finalize: ' + self.url_finalize)


  # step 5.1
  # uses: get_request
  # returns: url_chall (str), token (str)
  def fetch_authorization(self, url_authorization):
    """Fetch an authorization and pick its http-01 challenge.

    uses: get_request
    url_authorization: URL of the authorization to fetch.
    returns: url_chall (str), token (str)
    raises: Exception if there is no http-01 challenge, or if the token
      is empty, longer than 256 chars or contains chars other than
      A-Z a-z 0-9 - _
    """
    log('fetch authorization')
    authorization = self.get_request(url_authorization).json()
    log('  identifier type: ' + str(authorization['identifier']['type']))
    log('  identifier value: ' + str(authorization['identifier']['value']))

    url_chall = None
    token = None
    for x in authorization['challenges']:
      challenge = str(x['type'])
      if challenge == 'http-01':
        log('  suitable challenge: ' + challenge)
        url_chall = str(x['url'])
        token = str(x['token'])
        break
    # explicit raise instead of assert: asserts are dropped by 'python -O'
    if url_chall is None or token is None:
      raise Exception('no http-01 challenge in authorization')

    log('  url for challenge: ' + url_chall)
    log('  challenge token: ' + token)

    # the token comes from the server and is used to build a file name
    # (see prepare_challenge_answer), so it is checked strictly
    log('  verify token')
    if not token:
      raise Exception('wrong token, it is empty')
    if len(token) > 256:
      raise Exception('token too long, maximum length is 256')
    valid_chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
    if any(char not in valid_chars for char in token):
      raise Exception('wrong token, allowed following chars only: ' + valid_chars)

    return url_chall, token


  # step 5.2
  # in: self.answers_prefix
  # returns: temporary filename to remove after authorization
  def prepare_challenge_answer(self, token):
    """Write the answer for a challenge token to a file.

    The answer is the token, a dot and the SHA-256 thumbprint of
    self.key; the file name is self.answers_prefix followed by the token.

    in: self.key, self.answers_prefix
    token: challenge token (see fetch_authorization).
    returns: temporary filename to remove after authorization
    """
    log('prepare challenge answer')
    # use the key and prefix given to this session, not module globals
    answer = token + '.' + str(self.key.thumbprint(jwk.hashes.SHA256()))
    log('  answer: ' + answer)
    filename = self.answers_prefix + token
    log('  write answer to file: ' + filename)
    with open(filename, 'w') as f:
      f.write( answer )
    return filename


  # step 5.3
  # uses: post_signed_request (also see 'in' and 'out' there)
  def notify_challenge_ready(self, url_chall):
    """Tell the server that the answer for a challenge is in place.

    uses: post_signed_request (also see 'in' and 'out' there)
    url_chall: URL of the challenge; an empty JSON object is posted to it.
    """
    log('notify that challenge is ready')
    empty_claims = {}
    self.post_signed_request(url_chall, empty_claims)


  # step 5.4
  # uses: get_request
  def wait_authorization(self, url_authorization):
    """Poll an authorization until its status becomes 'valid'.

    Checks up to 10 times, sleeping 5 seconds before every check.

    uses: get_request
    url_authorization: URL of the authorization to poll.
    raises: Exception if the status is neither 'valid' nor 'pending'
      (the response body is logged first), or if it is still 'pending'
      after the last check.
    """
    for _ in range(0, 10):
      log('wait 5 seconds')
      time.sleep(5)
      log('check authorization')
      r = self.get_request(url_authorization)
      status = str(r.json()['status'])
      log('  authorization status: ' + status)
      if status == 'valid':
        log('authorization success')
        return
      # explicit raise instead of assert: asserts are dropped by
      # 'python -O' and carry no information about what went wrong
      if status != 'pending':
        log('response body')
        log(r.text)
        raise Exception('unexpected authorization status: ' + status)
    raise Exception('authorization was not happened')


  # step 5.5
  # in: self.answers_prefix
  def remove_challenge_answer(self, filename):
    """Delete a challenge answer file.

    filename: path returned by prepare_challenge_answer.
    """
    message = 'remove challenge answer file: ' + filename
    log(message)
    os.remove(filename)


  # step 5 all
  # uses: get_request, post_signed_request (also see 'in' and 'out' there)
  # in: self.url_authorizations
  def process_authorizations(self):
    """Pass every authorization of the order, one after another.

    For each authorization: fetch it, write the challenge answer file,
    notify the server, wait for the result and remove the answer file.

    uses: get_request, post_signed_request (also see 'in' and 'out' there)
    in: self.url_authorizations
    raises: whatever the individual steps raise; the answer file is
      removed even when notifying or waiting fails.
    """
    log('process authorizations')
    for url_authorization in self.url_authorizations:
      url_chall, token = self.fetch_authorization(url_authorization)
      tmpfile = self.prepare_challenge_answer(token)
      try:
        self.notify_challenge_ready(url_chall)
        self.wait_authorization(url_authorization)
      finally:
        # do not leave answer files behind when the authorization fails
        self.remove_challenge_answer(tmpfile)
    log('all authorizations success')


  # step 6
  # uses: get_request
  # in: self.url_order
  def wait_order_ready(self):
    """Poll the order until its status becomes 'ready'.

    Checks up to 10 times, sleeping 5 seconds before every check.

    uses: get_request
    in: self.url_order
    raises: Exception if the status is neither 'ready' nor 'pending'
      (the response body is logged first), or if it is still 'pending'
      after the last check.
    """
    for _ in range(0, 10):
      log('wait 5 seconds')
      time.sleep(5)
      log('check order status')
      r = self.get_request(self.url_order)
      status = str(r.json()['status'])
      log('  order status: ' + status)
      if status == 'ready':
        log('order is ready')
        return
      # explicit raise instead of assert: asserts are dropped by
      # 'python -O' and carry no information about what went wrong
      if status != 'pending':
        log('response body')
        log(r.text)
        raise Exception('unexpected order status: ' + status)
    raise Exception('order was not became ready')


  # step 7
  # uses: post_signed_request (also see 'in' and 'out' there)
  # in: self.csr, self.url_finalize
  def finalize_order(self):
    """Send the CSR to the finalize URL of the order.

    The CSR is serialized as DER and base64url-encoded into the 'csr'
    claim.

    uses: post_signed_request (also see 'in' and 'out' there)
    in: self.csr, self.url_finalize
    """
    log('finalize order (send CSR)')
    der_bytes = self.csr.public_bytes(Encoding.DER)
    csr_data = base64url_encode(der_bytes)
    log('  csr data: ' + csr_data)
    self.post_signed_request(self.url_finalize, { 'csr': csr_data })


  # step 8
  # uses: get_request
  # in: self.url_order
  # out: self.url_cert
  def wait_order_valid(self):
    """Poll the order until its status becomes 'valid'.

    Checks up to 10 times, sleeping 5 seconds before every check.

    uses: get_request
    in: self.url_order
    out: self.url_cert ('certificate' field of the valid order)
    raises: Exception if the status is neither 'valid' nor 'processing'
      (the response body is logged first), or if it is still 'processing'
      after the last check.
    """
    for _ in range(0, 10):
      log('wait 5 seconds')
      time.sleep(5)
      log('check order status')
      r = self.get_request(self.url_order)
      order = r.json()
      status = str(order['status'])
      log('  order status: ' + status)
      if status == 'valid':
        self.url_cert = str(order['certificate'])
        log('  url for cert: ' + self.url_cert)
        log('order success')
        return
      # explicit raise instead of assert: asserts are dropped by
      # 'python -O' and carry no information about what went wrong
      if status != 'processing':
        log('response body')
        log(r.text)
        raise Exception('unexpected order status: ' + status)
    raise Exception('order was not became valid')


  # step 9
  # uses: get_request
  # in: self.url_cert
  def fetch_certificate(self):
    """Download the certificate and write it to self.out_cert_file.

    uses: get_request
    in: self.url_cert, self.out_cert_file
    raises: Exception if the downloaded text is longer than 10000000
      characters.
    """
    log('fetch certificate')
    r = self.get_request(self.url_cert)
    cert = r.text

    if len(cert) > 10000000:
      raise Exception('certificate too long, max length is 10000000')

    log('downloaded certificate:')
    log(cert)
    # write to the path given to this session, not to a module global
    log('write certificate to file: ' + self.out_cert_file)
    with open(self.out_cert_file, 'w') as f:
      f.write(cert)


  # all steps
  def run(self):
    """Execute all steps of the session in order, from directory fetch
    to certificate download. Any exception of a step stops the run."""
    log('run')
    steps = (
      self.fetch_directory,
      self.fetch_nonce,
      self.fetch_account,
      self.create_order,
      self.process_authorizations,
      self.wait_order_ready,
      self.finalize_order,
      self.wait_order_valid,
      self.fetch_certificate,
    )
    for step in steps:
      step()
    log('done')



# Exactly two arguments are expected; anything else (including a lone
# '--help') prints the usage text.
if len(sys.argv) != 3:
  log('Usage: ' + sys.argv[0] + ' /path/to/certificate/sign/request.csr /path/to/out/certificate.crt')
  # asking for help is a success, wrong arguments are an error;
  # sys.exit instead of quit(): quit() is a helper added by the site module
  sys.exit(0 if sys.argv[1:] == ['--help'] else 2)


# input: certificate signing request (PEM), output: certificate file
csr_file = str(sys.argv[1])
out_cert_file = str(sys.argv[2])

log('hello')

# the API client key is stored as a JSON document describing a JWK
log('load api client key from file: ' + api_client_key_file)
with open(api_client_key_file, 'r') as f:
  key = jwk.JWK( **json.loads(f.read()) )
log('  loaded fingerprint: ' + str(key.thumbprint()))


log('load CSR from file: ' + csr_file)
with open(csr_file, 'rb') as f:
  csr = x509.load_pem_x509_csr(f.read(), default_backend())

# run all steps; any failure propagates as an exception
session = Session(api_url, key, csr, answers_prefix, out_cert_file)
session.run()