Blob Blame Raw
#!/usr/bin/python3

import os
import time
import json
import base64
import requests
from cryptorgaphy import x509
from jwcrypto import jwk, jwt


api_client_key_file = './api-client-key.json'
api_url = 'https://acme-staging-v02.api.letsencrypt.org/directory'
answers_dir = '/tmp/'


def post_jws_request(key, kid, nonce, url, claims):
  header = {
    'alg': 'RS256',
    'nonce': nonce,
    'url': url }
  if kid is None:
    header['jwk'] = json.loads(self.key.export_public())
  else:
    header['kid'] = kid
  token = jwt.JWT(header, claims)
  token.make_signed_token(self.key)
  headers = { 'Content-Type': 'application/jose+json' }
  data = token.serialize(False)
  return requests.post(url, headers = headers, data = data)


def is_valid_token(token):
  


class Session:
  def __init__(self, url, key, answers_dir, csr):
    self.csr = csr
    self.key = key
    self.answers_dir = answers_dir
    self.url = url
    self.url_newnonce = None
    self.url_newaccount = None
    self.url_neworder = None
    self.kid = None
    self.nonce = None


  def get_url_newnonce(self):
    if self.url_newnonce is None:
      self.fetch_directory()
    return self.url_newnonce
  
  def get_url_newaccount(self):
    if self.url_newaccount is None:
      self.fetch_directory()
    return self.url_newaccount
  
  def get_url_neworder(self):
    if self.url_neworder is None:
      self.fetch_directory()
    return self.url_neworder

  def get_nonce(self):
    if self.nonce is None:
      self.fetch_nonce()
    return self.nonce

  def get_kid(self):
    if self.kid is None:
      self.fetch_account()
    return self.kid


  def fetch_directory(self):
    url = self.url
    print('fetch directory')
    print('  get ' + url)
    r = requests.get(url)
    if r.status_code != 200:
      raise r
    json = r.json()
    self.url_newnonce = str(json['newNonce'])
    self.url_newaccount = str(json['newAccount'])
    self.url_neworder = str(json['newOrder'])
    print('  url for new nonce: ' + self.url_newnonce)
    print('  url for new account: ' + self.url_newaccount)
    print('  url for new order: ' + self.url_neworder)
  
  def fetch_nonce(self):
    url = self.get_url_newnonce()
    print('fetch nonce')
    print('  head ' + url)
    r = requests.head(url)
    if r.status_code != 200:
      raise r
    self.nonce = str(r.headers['Replay-Nonce'])
    print('  nonce: ' + self.nonce)
  
  def fetch_account(self):
    url = self.get_url_newaccount()
    key = self.key
    nonce = self.get_nonce()
    print('fetch account')
    print('  post jws ' + url)
    claims = { 'termsOfServiceAgreed': True }
    r = post_jws_request(key, None, nonce, url, claims)
    if (r.status_code != 200) and (r.status_code != 201):
      raise r
    self.nonce = str(r.headers['Replay-Nonce'])
    self.kid = str(r.headers['Location'])
    print('  nonce: ' + self.nonce)
    print('  kid: ' + self.kid)
  
  def process_order(self):
    url = self.get_url_neworder()
    key = self.key
    kid = self.get_kid()
    nonce = self.get_nonce()
    csr = self.csr
    answers_dir = self.answers_dir
    out_cert_file = self.out_cert_file
    print('process order')
    
    # step 1. create order
    
    print('create order')
    print('  post jws ' + url)
    identifiers = []
    for attribute in csr.subject:
      if attribute.oid == x509.NameOID.COMMON_NAME:
        identifiers.append({
            'type': 'dns',
            'value': str(attribute.value) })
        print('  csr name: ' + str(attribute.value))
      else:
        print('  WARNING: not supported attribute in csr: ' + str(attribute.rfc4514_string()))
      
    end = str(cert.not_valid_after.isoformat())
    
    claims = {
      'identifiers': identifiers,
      'notBefore': begin,
      'notAfter': end }
    r = post_jws_request(key, kid, nonce, url, claims)
    if r.status_code != 201:
      raise r
    
    nonce = str(r.headers['Replay-Nonce'])
    url_order = str(r.headers['Location'])
    print('  nonce: ' + self.nonce)
    print('  url for order: ' + url_order)
    
    json = r.json()
    url_authz = []
    for x in json['authorizations']
      url_authorizations.append(str(x))
      print('  url for authorization: ' + str(x))
    url_finalize = str(json['finalize'])
    url_cert = str(json['certificate'])
    print('  url for finalize: ' + url_finalize)
    print('  url for cert: ' + url_cert)

    # step 2. process authorizations

    for url in url_authz:
      # step 2.1. get challenge
      
      print('process authorization')
      print('  post jws ' + url)
      r = post_jws_request(key, kid, nonce, url, '')
      if r.status_code != 200:
        raise r
      json = r.json()
      
      print('  identifier type: ' + str(json['identifier']['type']))
      print('  identifier value: ' + str(json['identifier']['value']))
      
      url_chall = None
      token = None
      for x in json['challenges']
        challenge = str(x['type'])
        if challenge == 'http-01':
          print('  suitable challenge: ' + challenge)
          url_chall = str(x['url'])
          token = str(x['token'])
          break
      assert(not url_chall is None)
      assert(not token is None)
      
      print('  url for challenge: ' + url_chall)
      print('  url for challenge: ' + url_chall)
      print('  challenge token: ' + token)

      print('  verify token')
      if len(token) > 256:
          raise Exception('token too long, maximum length is 256')
      valid_chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
      for char in token:
        if not char in valid_chars:
          raise Exception('wrong token, allowed following chars only: ' + valid_chars)
      
      answer = token + '.' + str(key.thumbprint(jwk.hashes.SHA256()))
      print('  challenge answer: ' + answer)
      filename = answers_dir + token
      print('  write answer to file: ' + filename)
      with open(filename, 'w') as f:
        f.write( answer )
      
      # step 2.2. notify that challenge is ready

      print('notify that challenge is ready')
      print('  post jws ' + url_chall)
      r = post_jws_request(key, kid, nonce, url_chall, dict())
      if r.status_code != 200:
        raise r
      
      # step 2.3. wait validation
      
      for i in range(0, 10):
        print('wait 5 seconds')
        time.sleep(5)
        print('check authorization')
        print('  post jws ' + url)
        r = post_jws_request(key, kid, nonce, url, '')
        if r.status_code != 200:
          raise r
        json = r.json()
        status = str(json['status'])
        print('  authorization status: ' + status)
        if status == 'valid':
          print('authorization success')
          break
        assert(status == 'pending')
        
    # step 3. wait ready status of order
    
    url = url_order
    for i in range(0, 10):
      print('wait 5 seconds')
      time.sleep(5)
      print('check order')
      print('  post jws ' + url)
      r = post_jws_request(key, kid, nonce, url, '')
      if r.status_code != 200:
        raise r
      json = r.json()
      status = str(json['status'])
      print('  order status: ' + status)
      if status == 'ready':
        print('all authorizations success')
        break
      assert(status == 'pending')
        
    # step 4. send sertificate signing request
    
    print('finalize order (send CSR)')
    print('  post jws ' + url_finalize)
    csr_data = base64.urlsafe_b64encode( csr.public_bytes(Encoding.DER) ).decode().replace('=', '')
    print('  csr adta ' + csr_data)
    claims = { 'csr': csr_data }
    r = post_jws_request(key, kid, nonce, url_finalize, claims)
    if r.status_code != 200:
      raise r
    
    # step 5. wait valid status of order
    
    for i in range(0, 10):
      print('wait 5 seconds')
      time.sleep(5)
      print('check order')
      print('  post jws ' + url)
      r = post_jws_request(key, kid, nonce, url, '')
      if r.status_code != 200:
        raise r
      json = r.json()
      status = str(json['status'])
      print('  order status: ' + status)
      if status == 'valid':
        print('order success')
        break
      assert(status == 'processing')
    
    # step 5. download certificate
    
    print('download certificate')
    print('  post jws ' + url_cert)
    r = post_jws_request(key, kid, nonce, url_finalize, '')
    if r.status_code != 200:
      raise r
    cert = r.text
    
    print('downloaded certificate:')
    print(cert)
    print('write certificate to file: ' + out_cert_file)
    with open(out_cert_file, 'w') as f:
      f.write(cert)
    
    print('done')
    

print('hello')


if not os.path.isfile(api_client_key_file):
  print('api client key file not found: ' + api_client_key_file)
  print('generate')
  key = jwk.JWK(generate = 'RSA', size = 4096)
  print('generated key id: ' + str(key.key_id))
  print('generated fingerprint: ' + str(key.thumbprint()))
  print('save generated kay to file: ' + api_client_key_file)
  with open(api_client_key_file, 'w') as f:
    f.write( key.export() )

print('load api client key from file: ' + api_client_key_file)
with open(api_client_key_file, 'r') as f:
  key = jwk.JWK( **json.loads(f.read()) )
print('loaded key id: ' + str(key.key_id))
print('loaded fingerprint: ' + str(key.thumbprint()))

session = Session(key, api_url)
session.create_account()