#!/usr/bin/python3

import sys
import time
import json
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from jwcrypto import jwk, jws
from jwcrypto.common import base64url_encode


# ACME directory URL of the CA. The staging directory is active; uncomment
# the second line to switch to the production directory.
api_url = 'https://acme-staging-v02.api.letsencrypt.org/directory'
#api_url = 'https://acme-v02.api.letsencrypt.org/directory'

# JSON file the API client (account) key is loaded from as a JWK.
api_client_key_file = '/home/bw/work/dev/acmeclient-data/api-client-key.json'

# Path prefix the challenge token is appended to when an answer file is written.
answers_prefix = '/var/www/html/.well-known/acme-challenge/'


class Session:
  """One certificate order against an ACME v2 server using the http-01 challenge.

  Every step is a separate method; run() chains them in order. A step stores
  its results on the instance for the steps that follow (see the 'in' / 'out'
  notes above each method).

  NOTE(review): authorizations, the order and the certificate are fetched
  with plain unsigned GET requests. RFC 8555 section 6.3 describes
  POST-as-GET for these resources - confirm that the target server still
  accepts unsigned GET before relying on this client.
  """

  # polling behaviour shared by all wait_* steps
  poll_attempts = 10     # number of status checks before giving up
  poll_interval = 5      # seconds slept before every status check
  # seconds after which a stalled HTTP request is aborted instead of hanging
  request_timeout = 30


  # url:            ACME directory url
  # key:            account key (used via export_public() and thumbprint())
  # csr:            certificate signing request (cryptography x509 object)
  # answers_prefix: path prefix the challenge token is appended to
  # out_cert_file:  path the downloaded certificate is written to
  def __init__(self, url, key, csr, answers_prefix, out_cert_file):
    self.url = str(url)
    self.key = key
    self.csr = csr
    self.answers_prefix = str(answers_prefix)
    self.out_cert_file = str(out_cert_file)
    # set by fetch_nonce() / fetch_account(); defined here already so that
    # post_signed_request() never fails on a missing attribute
    self.nonce = None
    self.kid = None


  # common helper: dump an unexpected response and abort
  # status_label is the text printed in front of the status code
  # raises: Exception (always)
  def _raise_unexpected(self, r, status_label = 'response status code: '):
    print(status_label + str(r.status_code))
    print('response headers')
    print(r.headers)
    print('response body')
    print(r.text)
    raise Exception('unexpected server answer')


  # common helper: poll url until the 'status' field of the json answer
  # equals `wanted`
  # uses: get_request
  # returns: decoded json answer once `wanted` is seen, None when all
  #          attempts are used up
  # raises: Exception on any status other than `wanted` or `transient`
  def _poll_status(self, url, check_message, status_label, wanted, transient):
    for _ in range(self.poll_attempts):
      print('wait ' + str(self.poll_interval) + ' seconds')
      time.sleep(self.poll_interval)
      print(check_message)
      answer = self.get_request(url).json()
      status = str(answer['status'])
      print(status_label + status)
      if status == wanted:
        return answer
      # explicit check instead of assert: assert is stripped under "python -O"
      if status != transient:
        raise Exception('unexpected status: ' + status)
    return None


  # common method used in many steps to send unsigned requests
  # returns: requests.Response
  # raises: Exception when the status code is not 200
  def get_request(self, url):
    print('  get ' + url)
    r = requests.get(url, timeout = self.request_timeout)
    if r.status_code != 200:
      self._raise_unexpected(r)
    return r


  # common method used in many steps to send signed requests
  # in: self.nonce, self.key, self.kid
  # out: self.nonce
  # returns: requests.Response
  # raises: Exception when the status code is neither 200 nor 201
  def post_signed_request(self, url, claims):
    print('  post signed ' + url)

    # NOTE(review): 'RS256' is an RSA signature algorithm, so self.key is
    # presumably always an RSA key - confirm
    header = {
      'alg': 'RS256',
      'nonce': self.nonce,
      'url': url }
    # without an account url (kid) the public key itself is put into the header
    if self.kid is None:
      header['jwk'] = json.loads(self.key.export_public())
    else:
      header['kid'] = self.kid

    signer = jws.JWS(json.dumps(claims))
    signer.add_signature(self.key, protected = json.dumps(header))
    data = signer.serialize()

    headers = { 'Content-Type': 'application/jose+json' }
    r = requests.post(url, headers = headers, data = data, timeout = self.request_timeout)
    if r.status_code not in (200, 201):
      self._raise_unexpected(r, 'response status code ')
    # the answer carries the nonce for the next signed request
    self.nonce = str(r.headers['Replay-Nonce'])
    print('  nonce: ' + self.nonce)
    return r


  # step 1
  # uses: get_request
  # in: self.url
  # out: self.url_newnonce, self.url_newaccount, self.url_neworder
  def fetch_directory(self):
    print('fetch directory')
    directory = self.get_request(self.url).json()
    self.url_newnonce = str(directory['newNonce'])
    self.url_newaccount = str(directory['newAccount'])
    self.url_neworder = str(directory['newOrder'])
    print('  url for new nonce: ' + self.url_newnonce)
    print('  url for new account: ' + self.url_newaccount)
    print('  url for new order: ' + self.url_neworder)


  # step 2
  # in: self.url_newnonce
  # out: self.nonce
  # raises: Exception when the status code is not 200
  def fetch_nonce(self):
    print('fetch nonce')
    print('  head ' + self.url_newnonce)
    r = requests.head(self.url_newnonce, timeout = self.request_timeout)
    if r.status_code != 200:
      self._raise_unexpected(r)
    self.nonce = str(r.headers['Replay-Nonce'])
    print('  nonce: ' + self.nonce)


  # step 3
  # uses: post_signed_request (also see 'in' and 'out' there)
  # in: self.url_newaccount
  # out: self.kid
  def fetch_account(self):
    print('fetch account')
    # reset kid so that this request is signed with the full public key
    self.kid = None
    claims = { 'termsOfServiceAgreed': True }
    r = self.post_signed_request(self.url_newaccount, claims)
    self.kid = str(r.headers['Location'])
    print('  kid: ' + self.kid)


  # step 4
  # uses: post_signed_request (also see 'in' and 'out' there)
  # in: self.csr, self.url_neworder
  # out: self.url_order, self.url_authorizations, self.url_finalize
  # raises: Exception when the CSR names no host at all
  def create_order(self):
    print('create order')

    # collect the common name (if any) and all DNS subject alternative
    # names, without duplicates and in a stable order
    names = []
    cn_attributes = self.csr.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
    if cn_attributes:
      common_name = str(cn_attributes[0].value)
      print('  csr common name: ' + common_name)
      names.append(common_name)

    try:
      extension = self.csr.extensions.get_extension_for_oid(x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
    except x509.ExtensionNotFound:
      extension = None
    if extension is not None:
      for x in extension.value.get_values_for_type(x509.DNSName):
        if str(x) not in names:
          names.append(str(x))

    if not names:
      raise Exception('csr contains neither a common name nor a DNS alternative name')

    identifiers = []
    for x in names:
      identifiers.append({
        'type': 'dns',
        'value': x })
      print('  csr name: ' + x)
    claims = { 'identifiers': identifiers }

    r = self.post_signed_request(self.url_neworder, claims)

    self.url_order = str(r.headers['Location'])

    order = r.json()
    self.url_authorizations = []
    for x in order['authorizations']:
      self.url_authorizations.append(str(x))
      print('  url for authorization: ' + str(x))
    self.url_finalize = str(order['finalize'])
    print('  url for finalize: ' + self.url_finalize)


  # step 5.1
  # uses: get_request
  # returns: url_chall (str), token (str)
  # raises: Exception when no http-01 challenge is offered or the token is
  #         not acceptable
  def fetch_authorization(self, url_authorization):
    print('fetch authorization')
    authorization = self.get_request(url_authorization).json()
    print('  identifier type: ' + str(authorization['identifier']['type']))
    print('  identifier value: ' + str(authorization['identifier']['value']))

    url_chall = None
    token = None
    for x in authorization['challenges']:
      challenge = str(x['type'])
      if challenge == 'http-01':
        print('  suitable challenge: ' + challenge)
        url_chall = str(x['url'])
        token = str(x['token'])
        break
    if url_chall is None or token is None:
      raise Exception('no http-01 challenge offered')

    print('  url for challenge: ' + url_chall)
    print('  challenge token: ' + token)

    # the token comes from the server and later becomes part of a file name,
    # so only a non-empty, short, plain token is accepted
    print('  verify token')
    if not token:
      raise Exception('empty token')
    if len(token) > 256:
      raise Exception('token too long, maximum length is 256')
    valid_chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
    if any(char not in valid_chars for char in token):
      raise Exception('wrong token, allowed following chars only: ' + valid_chars)

    return url_chall, token


  # step 5.2
  # in: self.key, self.answers_prefix
  # writes the file <answers_prefix><token> containing <token>.<key thumbprint>
  def prepare_challenge_answer(self, token):
    print('prepare challenge answer')
    answer = token + '.' + str(self.key.thumbprint(jwk.hashes.SHA256()))
    print('  answer: ' + answer)
    filename = self.answers_prefix + token
    print('  write answer to file: ' + filename)
    with open(filename, 'w') as f:
      f.write( answer )


  # step 5.3
  # uses: post_signed_request (also see 'in' and 'out' there)
  def notify_challenge_ready(self, url_chall):
    print('notify that challenge is ready')
    self.post_signed_request(url_chall, dict())


  # step 5.4
  # uses: get_request
  # raises: Exception on an unexpected status or when the authorization
  #         does not become valid in time
  def wait_authorization(self, url_authorization):
    answer = self._poll_status(
      url_authorization,
      'check authorization',
      '  authorization status: ',
      'valid',
      'pending')
    if answer is None:
      raise Exception('authorization was not happened')
    print('authorization success')


  # step 5 all
  # uses: get_request, post_signed_request (also see 'in' and 'out' there)
  # in: self.url_authorizations
  def process_authorizations(self):
    print('process authorizations')
    for url_authorization in self.url_authorizations:
      url_chall, token = self.fetch_authorization(url_authorization)
      self.prepare_challenge_answer(token)
      self.notify_challenge_ready(url_chall)
      self.wait_authorization(url_authorization)
    print('all authorizations success')


  # step 6
  # uses: get_request
  # in: self.url_order
  # raises: Exception on an unexpected status or when the order does not
  #         become ready in time
  def wait_order_ready(self):
    answer = self._poll_status(
      self.url_order,
      'check order status',
      '  order status: ',
      'ready',
      'pending')
    if answer is None:
      raise Exception('order was not became ready')
    print('order is ready')


  # step 7
  # uses: post_signed_request (also see 'in' and 'out' there)
  # in: self.csr, self.url_finalize
  def finalize_order(self):
    print('finalize order (send CSR)')
    csr_data = base64url_encode(self.csr.public_bytes(Encoding.DER))
    print('  csr data: ' + csr_data)
    claims = { 'csr': csr_data }
    self.post_signed_request(self.url_finalize, claims)


  # step 8
  # uses: get_request
  # in: self.url_order
  # out: self.url_cert
  # raises: Exception on an unexpected status or when the order does not
  #         become valid in time
  def wait_order_valid(self):
    answer = self._poll_status(
      self.url_order,
      'check order status',
      '  order status: ',
      'valid',
      'processing')
    if answer is None:
      raise Exception('order was not became valid')
    self.url_cert = str(answer['certificate'])
    print('  url for cert: ' + self.url_cert)
    print('order success')


  # step 9
  # uses: get_request
  # in: self.url_cert, self.out_cert_file
  # raises: Exception when the downloaded certificate is implausibly large
  def fetch_certificate(self):
    print('fetch certificate')
    cert = self.get_request(self.url_cert).text

    if len(cert) > 10000000:
      raise Exception('certificate too long, max length is 10000000')

    print('downloaded certificate:')
    print(cert)
    print('write certificate to file: ' + self.out_cert_file)
    with open(self.out_cert_file, 'w') as f:
      f.write(cert)


  # all steps
  def run(self):
    print('run')
    self.fetch_directory()
    self.fetch_nonce()
    self.fetch_account()
    self.create_order()
    self.process_authorizations()
    self.wait_order_ready()
    self.finalize_order()
    self.wait_order_valid()
    self.fetch_certificate()
    print('done')



# command line: <csr file> <output certificate file>
if len(sys.argv) != 3:
  print('Usage: ' + sys.argv[0] + ' /path/to/certificate/sign/request.csr /path/to/out/certificate.crt')
  # asking for help is not an error, any other malformed call is
  sys.exit(0 if sys.argv[1:] == ['--help'] else 2)


csr_file = str(sys.argv[1])
out_cert_file = str(sys.argv[2])

print('hello')

# the account key is stored as a JSON document holding the JWK fields
print('load api client key from file: ' + api_client_key_file)
with open(api_client_key_file, 'r') as f:
  key = jwk.JWK( **json.load(f) )
print('  loaded fingerprint: ' + str(key.thumbprint()))


# the signing request is read as PEM
print('load CSR from file: ' + csr_file)
with open(csr_file, 'rb') as f:
  csr = x509.load_pem_x509_csr(f.read(), default_backend())

# run all steps; the certificate ends up in out_cert_file
session = Session(api_url, key, csr, answers_prefix, out_cert_file)
session.run()