Blame sign-cert.py

7a50dc
#!/usr/bin/python3
7a50dc
cc2683
import sys
7a50dc
import time
7a50dc
import json
7a50dc
import requests
cc2683
from cryptography import x509
b6b0ba
from cryptography.hazmat.backends import default_backend
7bd19b
from cryptography.hazmat.primitives.serialization import Encoding
cc2683
from jwcrypto import jwk, jws
7bd19b
from jwcrypto.common import base64url_encode
7a50dc
7a50dc
b6b0ba
api_url             = 'https://acme-staging-v02.api.letsencrypt.org/directory'
356442
#api_url             = 'https://acme-v02.api.letsencrypt.org/directory'
cc2683
api_client_key_file = '/home/bw/work/dev/acmeclient-data/api-client-key.json'
cc2683
answers_prefix      = '/var/www/html/.well-known/acme-challenge/'
7a50dc
7a50dc
7a50dc
class Session:
b6b0ba
  def __init__(self, url, key, csr, answers_prefix, out_cert_file):
b6b0ba
    self.url = str(url)
7a50dc
    self.key = key
b6b0ba
    self.csr = csr
b6b0ba
    self.answers_prefix = str(answers_prefix)
b6b0ba
    self.out_cert_file = str(out_cert_file)
7a50dc
7a50dc
7bd19b
  # common method uses in many steps to send unsigned requests
cc2683
  # returns: requests.Response
7bd19b
  def get_request(self, url):
7bd19b
    print('  get ' + url)
7bd19b
    r = requests.get(url)
7bd19b
    if r.status_code != 200:
7bd19b
      print('response status code: ' + str(r.status_code))
cc2683
      print('response headers')
cc2683
      print(r.headers)
cc2683
      print('response body')
cc2683
      print(r.text)
cc2683
      raise Exception('unexpected server answer')
cc2683
    return r
cc2683
cc2683
7bd19b
  # common method uses in many steps to send signed requests
cc2683
  # in: self.nonce, self.key, self.kid
cc2683
  # out: self.nonce
cc2683
  # returns: requests.Response
7bd19b
  def post_signed_request(self, url, claims):
cc2683
    print('  post signed ' + url)
cc2683
    
cc2683
    header = {
cc2683
      'alg': 'RS256',
cc2683
      'nonce': self.nonce,
cc2683
      'url': url }
cc2683
    if self.kid is None:
cc2683
      header['jwk'] = json.loads(self.key.export_public())
cc2683
    else:
cc2683
      header['kid'] = self.kid
cc2683
    
cc2683
    signer = jws.JWS(json.dumps(claims))
cc2683
    signer.add_signature(self.key, protected = json.dumps(header))
cc2683
    data = signer.serialize()
cc2683
    
cc2683
    headers = { 'Content-Type': 'application/jose+json' }
cc2683
    r = requests.post(url, headers = headers, data = data)
7bd19b
    if r.status_code != 200 and r.status_code != 201:
7bd19b
      print('response status code ' + str(r.status_code))
cc2683
      print('response headers')
cc2683
      print(r.headers)
cc2683
      print('response body')
cc2683
      print(r.text)
cc2683
      raise Exception('unexpected server answer')
cc2683
    self.nonce = str(r.headers['Replay-Nonce'])
cc2683
    print('  nonce: ' + self.nonce)
cc2683
    return r
cc2683
cc2683
b6b0ba
  # step 1
7bd19b
  # uses: get_request
b6b0ba
  # in: self.url
b6b0ba
  # out: self.url_newnonce, self.url_newaccount, self.url_neworder
7a50dc
  def fetch_directory(self):
7a50dc
    print('fetch directory')
7bd19b
    r = self.get_request(self.url)
7a50dc
    json = r.json()
7a50dc
    self.url_newnonce = str(json['newNonce'])
7a50dc
    self.url_newaccount = str(json['newAccount'])
7a50dc
    self.url_neworder = str(json['newOrder'])
7a50dc
    print('  url for new nonce: ' + self.url_newnonce)
7a50dc
    print('  url for new account: ' + self.url_newaccount)
7a50dc
    print('  url for new order: ' + self.url_neworder)
7a50dc
  
b6b0ba
  
b6b0ba
  # step 2
b6b0ba
  # in: self.url_newnonce
b6b0ba
  # out: self.nonce
7a50dc
  def fetch_nonce(self):
7a50dc
    print('fetch nonce')
b6b0ba
    print('  head ' + self.url_newnonce)
b6b0ba
    r = requests.head(self.url_newnonce)
7a50dc
    if r.status_code != 200:
cc2683
      print('response status code: ' + r.status_code)
cc2683
      print('response headers')
cc2683
      print(r.headers)
cc2683
      print('response body')
cc2683
      print(r.text)
cc2683
      raise Exception('unexpected server answer')
7a50dc
    self.nonce = str(r.headers['Replay-Nonce'])
7a50dc
    print('  nonce: ' + self.nonce)
7a50dc
  
b6b0ba
  
b6b0ba
  # step 3
7bd19b
  # uses: post_signed_request (also see 'in' and 'out' there)
b6b0ba
  # in: self.url_newaccount
b6b0ba
  # out: self.kid
b6b0ba
  def fetch_account(self):
b6b0ba
    print('fetch account')
b6b0ba
    self.kid = None
b6b0ba
    claims = { 'termsOfServiceAgreed': True }
7bd19b
    r = self.post_signed_request(self.url_newaccount, claims)
b6b0ba
    self.kid = str(r.headers['Location'])
7a50dc
    print('  kid: ' + self.kid)
b6b0ba
7a50dc
  
b6b0ba
  # step 4
7bd19b
  # uses: post_signed_request (also see 'in' and 'out' there)
7bd19b
  # in: self.csr, self.url_neworder
cc2683
  # out: self.url_order, self.url_authorizations, self.url_finalize
b6b0ba
  def create_order(self):
7a50dc
    print('create order')
7bd19b
7bd19b
    common_name = str( self.csr.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value )
7bd19b
    print('  csr common name: ' + common_name)
b6b0ba
    
7bd19b
    names = { common_name }
7bd19b
    try:
7bd19b
      extension = self.csr.extensions.get_extension_for_oid(x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
7bd19b
      for x in extension.value.get_values_for_type(x509.DNSName):
7bd19b
        names.add(str(x))
7bd19b
    except x509.ExtensionNotFound:
7bd19b
        pass
7a50dc
    
7bd19b
    identifiers = []
7bd19b
    for x in names:
7bd19b
      identifiers.append({
7bd19b
        'type': 'dns',
7bd19b
        'value': x })
7bd19b
      print('  csr name: ' + x)
b6b0ba
    claims = { 'identifiers': identifiers }
7bd19b
    
7bd19b
    r = self.post_signed_request(self.url_neworder, claims)
b6b0ba
b6b0ba
    self.url_order = str(r.headers['Location'])
7a50dc
    
7a50dc
    json = r.json()
b6b0ba
    self.url_authorizations = []
cc2683
    for x in json['authorizations']:
b6b0ba
      self.url_authorizations.append(str(x))
7a50dc
      print('  url for authorization: ' + str(x))
b6b0ba
    self.url_finalize = str(json['finalize'])
b6b0ba
    print('  url for finalize: ' + self.url_finalize)
b6b0ba
  
b6b0ba
b6b0ba
  # step 5.1
7bd19b
  # uses: get_request
b6b0ba
  # returns: url_chall (str), token (str)
b6b0ba
  def fetch_authorization(self, url_authorization):
b6b0ba
    print('fetch authorization')
7bd19b
    r = self.get_request(url_authorization)
b6b0ba
    json = r.json()
b6b0ba
    print('  identifier type: ' + str(json['identifier']['type']))
b6b0ba
    print('  identifier value: ' + str(json['identifier']['value']))
b6b0ba
    
b6b0ba
    url_chall = None
b6b0ba
    token = None
cc2683
    for x in json['challenges']:
b6b0ba
      challenge = str(x['type'])
b6b0ba
      if challenge == 'http-01':
b6b0ba
        print('  suitable challenge: ' + challenge)
b6b0ba
        url_chall = str(x['url'])
b6b0ba
        token = str(x['token'])
b6b0ba
        break
b6b0ba
    assert(not url_chall is None)
b6b0ba
    assert(not token is None)
7a50dc
    
b6b0ba
    print('  url for challenge: ' + url_chall)
b6b0ba
    print('  url for challenge: ' + url_chall)
b6b0ba
    print('  challenge token: ' + token)
b6b0ba
b6b0ba
    print('  verify token')
b6b0ba
    if len(token) > 256:
b6b0ba
        raise Exception('token too long, maximum length is 256')
b6b0ba
    valid_chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
b6b0ba
    for char in token:
b6b0ba
      if not char in valid_chars:
b6b0ba
        raise Exception('wrong token, allowed following chars only: ' + valid_chars)
b6b0ba
    
b6b0ba
    return url_chall, token
b6b0ba
b6b0ba
b6b0ba
  # step 5.2
b6b0ba
  # in: self.answers_prefix
b6b0ba
  def prepare_challenge_answer(self, token):
b6b0ba
    print('prepare challenge answer')
b6b0ba
    answer = token + '.' + str(key.thumbprint(jwk.hashes.SHA256()))
b6b0ba
    print('  answer: ' + answer)
b6b0ba
    filename = answers_prefix + token
b6b0ba
    print('  write answer to file: ' + filename)
b6b0ba
    with open(filename, 'w') as f:
b6b0ba
      f.write( answer )
b6b0ba
b6b0ba
b6b0ba
  # step 5.3
7bd19b
  # uses: post_signed_request (also see 'in' and 'out' there)
b6b0ba
  def notify_challenge_ready(self, url_chall):
b6b0ba
    print('notify that challenge is ready')
7bd19b
    self.post_signed_request(url_chall, dict())
b6b0ba
b6b0ba
b6b0ba
  # step 5.4
7bd19b
  # uses: get_request
b6b0ba
  def wait_authorization(self, url_authorization):
b6b0ba
    for i in range(0, 10):
b6b0ba
      print('wait 5 seconds')
b6b0ba
      time.sleep(5)
b6b0ba
      print('check authorization')
7bd19b
      r = self.get_request(url_authorization)
b6b0ba
      json = r.json()
b6b0ba
      status = str(json['status'])
b6b0ba
      print('  authorization status: ' + status)
b6b0ba
      if status == 'valid':
b6b0ba
        print('authorization success')
b6b0ba
        return
b6b0ba
      assert(status == 'pending')
b6b0ba
    raise Exception('authorization was not happened')
b6b0ba
b6b0ba
b6b0ba
  # step 5 all
7bd19b
  # uses: get_request, post_signed_request (also see 'in' and 'out' there)
b6b0ba
  # in: self.url_authorizations
b6b0ba
  def process_authorizations(self):
b6b0ba
    print('process authorizations')
b6b0ba
    for url_authorization in self.url_authorizations:
b6b0ba
      url_chall, token = self.fetch_authorization(url_authorization)
b6b0ba
      self.prepare_challenge_answer(token)
b6b0ba
      self.notify_challenge_ready(url_chall)
b6b0ba
      self.wait_authorization(url_authorization)
b6b0ba
    print('all authorizations success')
b6b0ba
b6b0ba
b6b0ba
  # step 6
7bd19b
  # uses: get_request
b6b0ba
  # in: self.url_order
b6b0ba
  def wait_order_ready(self):
7a50dc
    for i in range(0, 10):
7a50dc
      print('wait 5 seconds')
7a50dc
      time.sleep(5)
b6b0ba
      print('check order status')
7bd19b
      r = self.get_request(self.url_order)
7a50dc
      json = r.json()
7a50dc
      status = str(json['status'])
7a50dc
      print('  order status: ' + status)
7a50dc
      if status == 'ready':
b6b0ba
        print('order is ready')
b6b0ba
        return
7a50dc
      assert(status == 'pending')
b6b0ba
    raise Exception('order was not became ready')
b6b0ba
b6b0ba
b6b0ba
  # step 7
7bd19b
  # uses: post_signed_request (also see 'in' and 'out' there)
b6b0ba
  # in: self.csr, self.url_finalize
b6b0ba
  def finalize_order(self):
7a50dc
    print('finalize order (send CSR)')
7bd19b
    csr_data = base64url_encode(self.csr.public_bytes(Encoding.DER))
7bd19b
    print('  csr data: ' + csr_data)
7a50dc
    claims = { 'csr': csr_data }
7bd19b
    self.post_signed_request(self.url_finalize, claims)
b6b0ba
  
b6b0ba
  
b6b0ba
  # step 8
7bd19b
  # uses: get_request
b6b0ba
  # in: self.url_order
cc2683
  # out: self.url_cert
b6b0ba
  def wait_order_valid(self):
7a50dc
    for i in range(0, 10):
7a50dc
      print('wait 5 seconds')
7a50dc
      time.sleep(5)
b6b0ba
      print('check order status')
7bd19b
      r = self.get_request(self.url_order)
7a50dc
      json = r.json()
7a50dc
      status = str(json['status'])
7a50dc
      print('  order status: ' + status)
7a50dc
      if status == 'valid':
7bd19b
        self.url_cert = str(json['certificate'])
7bd19b
        print('  url for cert: ' + self.url_cert)
7a50dc
        print('order success')
b6b0ba
        return
7a50dc
      assert(status == 'processing')
b6b0ba
    raise Exception('order was not became valid')
b6b0ba
b6b0ba
b6b0ba
  # step 9
7bd19b
  # uses: get_request
b6b0ba
  # in: self.url_cert
b6b0ba
  def fetch_certificate(self):
b6b0ba
    print('fetch certificate')
7bd19b
    r = self.get_request(self.url_cert)
7bd19b
    cert = r.text
b6b0ba
    
7bd19b
    if len(cert) > 10000000:
7bd19b
      raise Exception('certificate too long, max length is 10000000')
7bd19b
      
cc2683
    print('downloaded certificate:')
7bd19b
    print(cert)
7a50dc
    print('write certificate to file: ' + out_cert_file)
7a50dc
    with open(out_cert_file, 'w') as f:
7bd19b
      f.write(cert)
b6b0ba
b6b0ba
b6b0ba
  # all steps
b6b0ba
  def run(self):
b6b0ba
    print('run')
b6b0ba
    self.fetch_directory()
b6b0ba
    self.fetch_nonce()
b6b0ba
    self.fetch_account()
b6b0ba
    self.create_order()
b6b0ba
    self.process_authorizations()
b6b0ba
    self.wait_order_ready()
b6b0ba
    self.finalize_order()
b6b0ba
    self.wait_order_valid()
b6b0ba
    self.fetch_certificate()
7a50dc
    print('done')
7a50dc
7a50dc
7a50dc
cc2683
if len(sys.argv) != 3 or (len(sys.argv) == 2 and sys.argv[1] == '--help'):
cc2683
  print('Usage: ' + sys.argv[0] + ' /path/to/certificate/sign/request.csr /path/to/out/certificate.crt')
cc2683
  quit()
cc2683
cc2683
cc2683
csr_file = str(sys.argv[1])
cc2683
out_cert_file = str(sys.argv[2])
cc2683
cc2683
print('hello')
7a50dc
7a50dc
print('load api client key from file: ' + api_client_key_file)
7a50dc
with open(api_client_key_file, 'r') as f:
7a50dc
  key = jwk.JWK( **json.loads(f.read()) )
b6b0ba
print('  loaded fingerprint: ' + str(key.thumbprint()))
b6b0ba
7a50dc
b6b0ba
print('load CSF from file: ' + csr_file)
b6b0ba
with open(csr_file, 'rb') as f:
b6b0ba
  csr = x509.load_pem_x509_csr(f.read(), default_backend())
7a50dc
b6b0ba
session = Session(api_url, key, csr, answers_prefix, out_cert_file)
b6b0ba
session.run()
7a50dc