Blame main.py

7a50dc
#!/usr/bin/python3
7a50dc
7a50dc
import os
7a50dc
import time
7a50dc
import json
7a50dc
import base64
7a50dc
import requests
7a50dc
from cryptorgaphy import x509
7a50dc
from jwcrypto import jwk, jwt
7a50dc
7a50dc
7a50dc
api_client_key_file = './api-client-key.json'
7a50dc
api_url = 'https://acme-staging-v02.api.letsencrypt.org/directory'
7a50dc
answers_dir = '/tmp/'
7a50dc
7a50dc
7a50dc
def post_jws_request(key, kid, nonce, url, claims):
7a50dc
  header = {
7a50dc
    'alg': 'RS256',
7a50dc
    'nonce': nonce,
7a50dc
    'url': url }
7a50dc
  if kid is None:
7a50dc
    header['jwk'] = json.loads(self.key.export_public())
7a50dc
  else:
7a50dc
    header['kid'] = kid
7a50dc
  token = jwt.JWT(header, claims)
7a50dc
  token.make_signed_token(self.key)
7a50dc
  headers = { 'Content-Type': 'application/jose+json' }
7a50dc
  data = token.serialize(False)
7a50dc
  return requests.post(url, headers = headers, data = data)
7a50dc
7a50dc
7a50dc
def is_valid_token(token):
7a50dc
  
7a50dc
7a50dc
7a50dc
class Session:
7a50dc
  def __init__(self, url, key, answers_dir, csr):
7a50dc
    self.csr = csr
7a50dc
    self.key = key
7a50dc
    self.answers_dir = answers_dir
7a50dc
    self.url = url
7a50dc
    self.url_newnonce = None
7a50dc
    self.url_newaccount = None
7a50dc
    self.url_neworder = None
7a50dc
    self.kid = None
7a50dc
    self.nonce = None
7a50dc
7a50dc
7a50dc
  def get_url_newnonce(self):
7a50dc
    if self.url_newnonce is None:
7a50dc
      self.fetch_directory()
7a50dc
    return self.url_newnonce
7a50dc
  
7a50dc
  def get_url_newaccount(self):
7a50dc
    if self.url_newaccount is None:
7a50dc
      self.fetch_directory()
7a50dc
    return self.url_newaccount
7a50dc
  
7a50dc
  def get_url_neworder(self):
7a50dc
    if self.url_neworder is None:
7a50dc
      self.fetch_directory()
7a50dc
    return self.url_neworder
7a50dc
7a50dc
  def get_nonce(self):
7a50dc
    if self.nonce is None:
7a50dc
      self.fetch_nonce()
7a50dc
    return self.nonce
7a50dc
7a50dc
  def get_kid(self):
7a50dc
    if self.kid is None:
7a50dc
      self.fetch_account()
7a50dc
    return self.kid
7a50dc
7a50dc
7a50dc
  def fetch_directory(self):
7a50dc
    url = self.url
7a50dc
    print('fetch directory')
7a50dc
    print('  get ' + url)
7a50dc
    r = requests.get(url)
7a50dc
    if r.status_code != 200:
7a50dc
      raise r
7a50dc
    json = r.json()
7a50dc
    self.url_newnonce = str(json['newNonce'])
7a50dc
    self.url_newaccount = str(json['newAccount'])
7a50dc
    self.url_neworder = str(json['newOrder'])
7a50dc
    print('  url for new nonce: ' + self.url_newnonce)
7a50dc
    print('  url for new account: ' + self.url_newaccount)
7a50dc
    print('  url for new order: ' + self.url_neworder)
7a50dc
  
7a50dc
  def fetch_nonce(self):
7a50dc
    url = self.get_url_newnonce()
7a50dc
    print('fetch nonce')
7a50dc
    print('  head ' + url)
7a50dc
    r = requests.head(url)
7a50dc
    if r.status_code != 200:
7a50dc
      raise r
7a50dc
    self.nonce = str(r.headers['Replay-Nonce'])
7a50dc
    print('  nonce: ' + self.nonce)
7a50dc
  
7a50dc
  def fetch_account(self):
7a50dc
    url = self.get_url_newaccount()
7a50dc
    key = self.key
7a50dc
    nonce = self.get_nonce()
7a50dc
    print('fetch account')
7a50dc
    print('  post jws ' + url)
7a50dc
    claims = { 'termsOfServiceAgreed': True }
7a50dc
    r = post_jws_request(key, None, nonce, url, claims)
7a50dc
    if (r.status_code != 200) and (r.status_code != 201):
7a50dc
      raise r
7a50dc
    self.nonce = str(r.headers['Replay-Nonce'])
7a50dc
    self.kid = str(r.headers['Location'])
7a50dc
    print('  nonce: ' + self.nonce)
7a50dc
    print('  kid: ' + self.kid)
7a50dc
  
7a50dc
  def process_order(self):
7a50dc
    url = self.get_url_neworder()
7a50dc
    key = self.key
7a50dc
    kid = self.get_kid()
7a50dc
    nonce = self.get_nonce()
7a50dc
    csr = self.csr
7a50dc
    answers_dir = self.answers_dir
7a50dc
    out_cert_file = self.out_cert_file
7a50dc
    print('process order')
7a50dc
    
7a50dc
    # step 1. create order
7a50dc
    
7a50dc
    print('create order')
7a50dc
    print('  post jws ' + url)
7a50dc
    identifiers = []
7a50dc
    for attribute in csr.subject:
7a50dc
      if attribute.oid == x509.NameOID.COMMON_NAME:
7a50dc
        identifiers.append({
7a50dc
            'type': 'dns',
7a50dc
            'value': str(attribute.value) })
7a50dc
        print('  csr name: ' + str(attribute.value))
7a50dc
      else:
7a50dc
        print('  WARNING: not supported attribute in csr: ' + str(attribute.rfc4514_string()))
7a50dc
      
7a50dc
    end = str(cert.not_valid_after.isoformat())
7a50dc
    
7a50dc
    claims = {
7a50dc
      'identifiers': identifiers,
7a50dc
      'notBefore': begin,
7a50dc
      'notAfter': end }
7a50dc
    r = post_jws_request(key, kid, nonce, url, claims)
7a50dc
    if r.status_code != 201:
7a50dc
      raise r
7a50dc
    
7a50dc
    nonce = str(r.headers['Replay-Nonce'])
7a50dc
    url_order = str(r.headers['Location'])
7a50dc
    print('  nonce: ' + self.nonce)
7a50dc
    print('  url for order: ' + url_order)
7a50dc
    
7a50dc
    json = r.json()
7a50dc
    url_authz = []
7a50dc
    for x in json['authorizations']
7a50dc
      url_authorizations.append(str(x))
7a50dc
      print('  url for authorization: ' + str(x))
7a50dc
    url_finalize = str(json['finalize'])
7a50dc
    url_cert = str(json['certificate'])
7a50dc
    print('  url for finalize: ' + url_finalize)
7a50dc
    print('  url for cert: ' + url_cert)
7a50dc
7a50dc
    # step 2. process authorizations
7a50dc
7a50dc
    for url in url_authz:
7a50dc
      # step 2.1. get challenge
7a50dc
      
7a50dc
      print('process authorization')
7a50dc
      print('  post jws ' + url)
7a50dc
      r = post_jws_request(key, kid, nonce, url, '')
7a50dc
      if r.status_code != 200:
7a50dc
        raise r
7a50dc
      json = r.json()
7a50dc
      
7a50dc
      print('  identifier type: ' + str(json['identifier']['type']))
7a50dc
      print('  identifier value: ' + str(json['identifier']['value']))
7a50dc
      
7a50dc
      url_chall = None
7a50dc
      token = None
7a50dc
      for x in json['challenges']
7a50dc
        challenge = str(x['type'])
7a50dc
        if challenge == 'http-01':
7a50dc
          print('  suitable challenge: ' + challenge)
7a50dc
          url_chall = str(x['url'])
7a50dc
          token = str(x['token'])
7a50dc
          break
7a50dc
      assert(not url_chall is None)
7a50dc
      assert(not token is None)
7a50dc
      
7a50dc
      print('  url for challenge: ' + url_chall)
7a50dc
      print('  url for challenge: ' + url_chall)
7a50dc
      print('  challenge token: ' + token)
7a50dc
7a50dc
      print('  verify token')
7a50dc
      if len(token) > 256:
7a50dc
          raise Exception('token too long, maximum length is 256')
7a50dc
      valid_chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'
7a50dc
      for char in token:
7a50dc
        if not char in valid_chars:
7a50dc
          raise Exception('wrong token, allowed following chars only: ' + valid_chars)
7a50dc
      
7a50dc
      answer = token + '.' + str(key.thumbprint(jwk.hashes.SHA256()))
7a50dc
      print('  challenge answer: ' + answer)
7a50dc
      filename = answers_dir + token
7a50dc
      print('  write answer to file: ' + filename)
7a50dc
      with open(filename, 'w') as f:
7a50dc
        f.write( answer )
7a50dc
      
7a50dc
      # step 2.2. notify that challenge is ready
7a50dc
7a50dc
      print('notify that challenge is ready')
7a50dc
      print('  post jws ' + url_chall)
7a50dc
      r = post_jws_request(key, kid, nonce, url_chall, dict())
7a50dc
      if r.status_code != 200:
7a50dc
        raise r
7a50dc
      
7a50dc
      # step 2.3. wait validation
7a50dc
      
7a50dc
      for i in range(0, 10):
7a50dc
        print('wait 5 seconds')
7a50dc
        time.sleep(5)
7a50dc
        print('check authorization')
7a50dc
        print('  post jws ' + url)
7a50dc
        r = post_jws_request(key, kid, nonce, url, '')
7a50dc
        if r.status_code != 200:
7a50dc
          raise r
7a50dc
        json = r.json()
7a50dc
        status = str(json['status'])
7a50dc
        print('  authorization status: ' + status)
7a50dc
        if status == 'valid':
7a50dc
          print('authorization success')
7a50dc
          break
7a50dc
        assert(status == 'pending')
7a50dc
        
7a50dc
    # step 3. wait ready status of order
7a50dc
    
7a50dc
    url = url_order
7a50dc
    for i in range(0, 10):
7a50dc
      print('wait 5 seconds')
7a50dc
      time.sleep(5)
7a50dc
      print('check order')
7a50dc
      print('  post jws ' + url)
7a50dc
      r = post_jws_request(key, kid, nonce, url, '')
7a50dc
      if r.status_code != 200:
7a50dc
        raise r
7a50dc
      json = r.json()
7a50dc
      status = str(json['status'])
7a50dc
      print('  order status: ' + status)
7a50dc
      if status == 'ready':
7a50dc
        print('all authorizations success')
7a50dc
        break
7a50dc
      assert(status == 'pending')
7a50dc
        
7a50dc
    # step 4. send sertificate signing request
7a50dc
    
7a50dc
    print('finalize order (send CSR)')
7a50dc
    print('  post jws ' + url_finalize)
7a50dc
    csr_data = base64.urlsafe_b64encode( csr.public_bytes(Encoding.DER) ).decode().replace('=', '')
7a50dc
    print('  csr adta ' + csr_data)
7a50dc
    claims = { 'csr': csr_data }
7a50dc
    r = post_jws_request(key, kid, nonce, url_finalize, claims)
7a50dc
    if r.status_code != 200:
7a50dc
      raise r
7a50dc
    
7a50dc
    # step 5. wait valid status of order
7a50dc
    
7a50dc
    for i in range(0, 10):
7a50dc
      print('wait 5 seconds')
7a50dc
      time.sleep(5)
7a50dc
      print('check order')
7a50dc
      print('  post jws ' + url)
7a50dc
      r = post_jws_request(key, kid, nonce, url, '')
7a50dc
      if r.status_code != 200:
7a50dc
        raise r
7a50dc
      json = r.json()
7a50dc
      status = str(json['status'])
7a50dc
      print('  order status: ' + status)
7a50dc
      if status == 'valid':
7a50dc
        print('order success')
7a50dc
        break
7a50dc
      assert(status == 'processing')
7a50dc
    
7a50dc
    # step 5. download certificate
7a50dc
    
7a50dc
    print('download certificate')
7a50dc
    print('  post jws ' + url_cert)
7a50dc
    r = post_jws_request(key, kid, nonce, url_finalize, '')
7a50dc
    if r.status_code != 200:
7a50dc
      raise r
7a50dc
    cert = r.text
7a50dc
    
7a50dc
    print('downloaded certificate:')
7a50dc
    print(cert)
7a50dc
    print('write certificate to file: ' + out_cert_file)
7a50dc
    with open(out_cert_file, 'w') as f:
7a50dc
      f.write(cert)
7a50dc
    
7a50dc
    print('done')
7a50dc
    
7a50dc
7a50dc
print('hello')
7a50dc
7a50dc
7a50dc
if not os.path.isfile(api_client_key_file):
7a50dc
  print('api client key file not found: ' + api_client_key_file)
7a50dc
  print('generate')
7a50dc
  key = jwk.JWK(generate = 'RSA', size = 4096)
7a50dc
  print('generated key id: ' + str(key.key_id))
7a50dc
  print('generated fingerprint: ' + str(key.thumbprint()))
7a50dc
  print('save generated kay to file: ' + api_client_key_file)
7a50dc
  with open(api_client_key_file, 'w') as f:
7a50dc
    f.write( key.export() )
7a50dc
7a50dc
print('load api client key from file: ' + api_client_key_file)
7a50dc
with open(api_client_key_file, 'r') as f:
7a50dc
  key = jwk.JWK( **json.loads(f.read()) )
7a50dc
print('loaded key id: ' + str(key.key_id))
7a50dc
print('loaded fingerprint: ' + str(key.thumbprint()))
7a50dc
7a50dc
session = Session(key, api_url)
7a50dc
session.create_account()
7a50dc
7a50dc