# Field widths in bytes: len1 is bit_size / 8 and len2 is half of that
# (bit_size / 16).
len1 = bit_size // 8
len2 = bit_size // 16
# Byte offsets into blob. The first len1 bytes hold the modulus; the next five
# fields (prime1, prime2, exponent1, exponent2, coefficient) are len2 bytes
# each and are laid out back to back, followed by the private exponent.
prime1_offset = len1
prime2_offset = prime1_offset + len2
exponent1_offset = prime2_offset + len2
exponent2_offset = exponent1_offset + len2
coefficient_offset = exponent2_offset + len2
private_exponent_offset = coefficient_offset + len2
# The public exponent comes from the parsed blob header structure rather than
# from a slice of blob.
public_exponent = blob_struct.rsapubkey.pubexp
# Every slice is byte-reversed ([::-1]) before being handed to int_from_bytes.
# NOTE(review): presumably the blob stores integers little-endian while
# int_from_bytes expects big-endian - confirm against the helper.
modulus = int_from_bytes(blob[0:prime1_offset][::-1])
prime1 = int_from_bytes(blob[prime1_offset:prime2_offset][::-1])
prime2 = int_from_bytes(blob[prime2_offset:exponent1_offset][::-1])
exponent1 = int_from_bytes(blob[exponent1_offset:exponent2_offset][::-1])
exponent2 = int_from_bytes(blob[exponent2_offset:coefficient_offset][::-1])
coefficient = int_from_bytes(blob[coefficient_offset:private_exponent_offset][::-1])
# The private exponent is the only trailing field that is len1 (not len2)
# bytes wide.
private_exponent = int_from_bytes(blob[private_exponent_offset:private_exponent_offset + len1][::-1])
public_key_info = keys.PublicKeyInfo({
'algorithm': keys.PublicKeyAlgorithm({
'algorithm': 'rsa',
'public_key': keys.RSAPublicKey({
'modulus': modulus,
'public_exponent': public_exponent,
_oid_pair = ('id', 'value')
_oid_specs = {
'1.2.3': core.Integer,
'2.3.4': core.OctetString,
class CopySeq(core.Sequence):
_fields = [
('name', core.UTF8String),
('pair', Seq),
class NestSeqAny(core.Sequence):
_fields = [
('id', core.ObjectIdentifier),
('value', core.Any),
_oid_pair = ('id', 'value')
_oid_specs = {
'3.4.5': Seq,
class NestSeqExplicit(core.Sequence):
_fields = [
('id', core.ObjectIdentifier),
('value', NamedBits),
def read_headers(self):
Reads the HTTP response headers from the socket
On error, None, otherwise a 4-element tuple:
0: A 2-element tuple of integers representing the HTTP version
1: An integer representing the HTTP response code
2: A unicode string of the HTTP response code name
3: An OrderedDict of HTTP headers with lowercase unicode key and unicode values
version = None
code = None
text = None
headers = OrderedDict()
data = self.socket.read_until(b'\r\n\r\n')
string = data.decode('iso-8859-1')
first = False
for line in string.split('\r\n'):
line = line.strip()
if first is False:
if line == '':
match = re.match(r'^HTTP/(1\.[01]) +(\d+) +(.*)$', line)
if not match:
return None
version = tuple(map(int, match.group(1).split('.')))
code = int(match.group(2))
text = match.group(3)
first = True
for j in range(0,10):
r = requests.get("http://ca.example.lan/api/")
if r.status_code != 502:
assert r.status_code == 401, "Timed out starting up the API backend"
# TODO: check that port 8080 is listening, otherwise app probably crashed
# Test CA certificate fetch
# Fetch the CA certificate over HTTP and check the response status and type.
r = requests.get("http://ca.example.lan/api/certificate")
assert r.status_code == 200
assert r.headers.get('content-type') == "application/x-x509-ca-cert"
# pem.unarmor() yields three values; only the third (the decoded bytes) is
# used to load the certificate.
header, _, certificate_der_bytes = pem.unarmor(r.text.encode("ascii"))
cert = x509.Certificate.load(certificate_der_bytes)
# Subject fields expected on the CA certificate.
subject = cert.subject.native
assert subject.get("common_name") == "Certidude at ca.example.lan"
assert subject.get("organizational_unit_name") == "Certificate Authority"
# The serial number must fall inside a fixed numeric window.
assert cert.serial_number >= 0x150000000000000000000000000000
assert cert.serial_number <= 0xfffffffffffffffffffffffffffffffffffffff
# Validity window: must already have started and must end more than 7000 days
# from now. The timestamps are made naive (tzinfo dropped) so they compare
# against the naive datetime.utcnow(). The original repeated the not_before
# check verbatim; it is asserted once here.
validity = cert["tbs_certificate"]["validity"]
assert validity["not_before"].native.replace(tzinfo=None) < datetime.utcnow()
assert validity["not_after"].native.replace(tzinfo=None) > datetime.utcnow() + timedelta(days=7000)
extensions = cert["tbs_certificate"]["extensions"].native
assert extensions[0] == OrderedDict([
('extn_id', 'basic_constraints'),
('critical', True),
('extn_value', OrderedDict([
('ca', True),
('path_len_constraint', None)]
def test_iri_with_port(self):
with open(os.path.join(fixtures_dir, 'admin.ch.crt'), 'rb') as f:
cert_bytes = f.read()
if pem.detect(cert_bytes):
_, _, cert_bytes = pem.unarmor(cert_bytes)
cert = x509.Certificate.load(cert_bytes)
[dp.native for dp in cert.crl_distribution_points],
('distribution_point', ['http://www.pki.admin.ch/crl/SSLCA01.crl']),
('reasons', None),
('crl_issuer', None)
'cn=Swiss Government SSL CA 01,'
def test_build_paths(self):
    """
    Builds paths for the mozilla.org.crt fixture with the certificate from
    digicert-sha2-secure-server-ca.crt supplied via other_certs, and checks
    that exactly one path of length 3 is produced
    """

    leaf_file = os.path.join(fixtures_dir, 'mozilla.org.crt')
    with open(leaf_file, 'rb') as leaf_handle:
        leaf_bytes = leaf_handle.read()
    # Strip PEM armor when present so that DER bytes are always loaded
    if pem.detect(leaf_bytes):
        _type, _headers, leaf_bytes = pem.unarmor(leaf_bytes)
    cert = x509.Certificate.load(leaf_bytes)

    extra_file = os.path.join(fixtures_dir, 'digicert-sha2-secure-server-ca.crt')
    with open(extra_file, 'rb') as extra_handle:
        extra_certs = [extra_handle.read()]

    registry = CertificateRegistry(other_certs=extra_certs)
    found_paths = registry.build_paths(cert)
    self.assertEqual(1, len(found_paths))

    only_path = found_paths[0]
    self.assertEqual(3, len(only_path))
def _load_cert(self, relative_path):
    """
    Loads a certificate from the fixtures directory

    :param relative_path:
        Path of the certificate file, relative to fixtures_dir

    :return:
        The result of x509.Certificate.load() on the file contents, with
        PEM armor removed first when pem.detect() reports it
    """

    cert_path = os.path.join(fixtures_dir, relative_path)
    with open(cert_path, 'rb') as handle:
        raw = handle.read()

    if pem.detect(raw):
        _type, _headers, raw = pem.unarmor(raw)

    return x509.Certificate.load(raw)
def test_build_paths_custom_ca_certs(self):
    """
    Builds paths for the mozilla.org.crt fixture with the certificate from
    digicert-sha2-secure-server-ca.crt supplied via trust_roots, and checks
    that exactly one path of length 2 is produced
    """

    leaf_file = os.path.join(fixtures_dir, 'mozilla.org.crt')
    with open(leaf_file, 'rb') as leaf_handle:
        leaf_bytes = leaf_handle.read()
    # Strip PEM armor when present so that DER bytes are always loaded
    if pem.detect(leaf_bytes):
        _type, _headers, leaf_bytes = pem.unarmor(leaf_bytes)
    cert = x509.Certificate.load(leaf_bytes)

    root_file = os.path.join(fixtures_dir, 'digicert-sha2-secure-server-ca.crt')
    with open(root_file, 'rb') as root_handle:
        custom_roots = [root_handle.read()]

    registry = CertificateRegistry(trust_roots=custom_roots)
    found_paths = registry.build_paths(cert)
    self.assertEqual(1, len(found_paths))

    only_path = found_paths[0]
    self.assertEqual(2, len(only_path))
def _load_cert_object(self, *path_components):
    """
    Loads a certificate from a file beneath the fixtures directory

    :param path_components:
        One or more path segments, joined onto fixtures_dir to locate the
        certificate file

    :return:
        The result of x509.Certificate.load() on the file contents, with
        PEM armor removed first when pem.detect() reports it
    """

    location = os.path.join(fixtures_dir, *path_components)
    with open(location, 'rb') as source:
        payload = source.read()

    if pem.detect(payload):
        _type, _headers, payload = pem.unarmor(payload)

    return x509.Certificate.load(payload)
