first commit

This commit is contained in:
unknown
2025-12-08 21:35:55 +09:00
commit f343f405f7
5357 changed files with 923703 additions and 0 deletions

View File

@@ -0,0 +1,61 @@
# Copyright (c) 2014-2021 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""A shim for accessing cryptographic primitives needed by asyncssh"""
from .cipher import BasicCipher, GCMCipher, register_cipher, get_cipher_params
from .dsa import DSAPrivateKey, DSAPublicKey
from .dh import DH
from .ec import ECDSAPrivateKey, ECDSAPublicKey, ECDH
from .ed import ed25519_available, ed448_available
from .ed import curve25519_available, curve448_available
from .ed import EdDSAPrivateKey, EdDSAPublicKey, Curve25519DH, Curve448DH
from .ec_params import lookup_ec_curve_by_params
from .kdf import pbkdf2_hmac
from .misc import CryptoKey, PyCAKey
from .rsa import RSAPrivateKey, RSAPublicKey
from .sntrup import sntrup761_available
from .sntrup import sntrup761_pubkey_bytes, sntrup761_ciphertext_bytes
from .sntrup import sntrup761_keypair, sntrup761_encaps, sntrup761_decaps
# Import chacha20-poly1305 cipher if available
from .chacha import ChachaCipher, chacha_available
# Import umac cryptographic hash if available
try:
from .umac import umac32, umac64, umac96, umac128
except (ImportError, AttributeError, OSError): # pragma: no cover
pass
# Import X.509 certificate support if available
try:
from .x509 import X509Certificate, X509Name, X509NamePattern
from .x509 import generate_x509_certificate, import_x509_certificate
except ImportError: # pragma: no cover
pass

View File

@@ -0,0 +1,162 @@
# Copyright (c) 2015-2021 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""Chacha20-Poly1305 symmetric encryption handler"""
from ctypes import c_ulonglong, create_string_buffer
from typing import Optional, Tuple
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends.openssl import backend
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers.algorithms import ChaCha20
from cryptography.hazmat.primitives.poly1305 import Poly1305
from .cipher import register_cipher
if backend.poly1305_supported():
_CTR_0 = (0).to_bytes(8, 'little')
_CTR_1 = (1).to_bytes(8, 'little')
_POLY1305_KEYBYTES = 32
def chacha20(key: bytes, data: bytes, nonce: bytes, ctr: int) -> bytes:
"""Encrypt/decrypt a block of data with the ChaCha20 cipher"""
return Cipher(ChaCha20(key, (_CTR_1 if ctr else _CTR_0) + nonce),
mode=None).encryptor().update(data)
def poly1305_key(key: bytes, nonce: bytes) -> bytes:
"""Derive a Poly1305 key"""
return chacha20(key, _POLY1305_KEYBYTES * b'\0', nonce, 0)
def poly1305(key: bytes, data: bytes, nonce: bytes) -> bytes:
"""Compute a Poly1305 tag for a block of data"""
return Poly1305.generate_tag(poly1305_key(key, nonce), data)
def poly1305_verify(key: bytes, data: bytes,
nonce: bytes, tag: bytes) -> bool:
"""Verify a Poly1305 tag for a block of data"""
try:
Poly1305.verify_tag(poly1305_key(key, nonce), data, tag)
return True
except InvalidSignature:
return False
chacha_available = True
else: # pragma: no cover
try:
from libnacl import nacl
_chacha20 = nacl.crypto_stream_chacha20
_chacha20_xor_ic = nacl.crypto_stream_chacha20_xor_ic
_POLY1305_BYTES = nacl.crypto_onetimeauth_poly1305_bytes()
_POLY1305_KEYBYTES = nacl.crypto_onetimeauth_poly1305_keybytes()
_poly1305 = nacl.crypto_onetimeauth_poly1305
_poly1305_verify = nacl.crypto_onetimeauth_poly1305_verify
def chacha20(key: bytes, data: bytes, nonce: bytes, ctr: int) -> bytes:
"""Encrypt/decrypt a block of data with the ChaCha20 cipher"""
datalen = len(data)
result = create_string_buffer(datalen)
ull_datalen = c_ulonglong(datalen)
ull_ctr = c_ulonglong(ctr)
_chacha20_xor_ic(result, data, ull_datalen, nonce, ull_ctr, key)
return result.raw
def poly1305_key(key: bytes, nonce: bytes) -> bytes:
"""Derive a Poly1305 key"""
polykey = create_string_buffer(_POLY1305_KEYBYTES)
ull_polykeylen = c_ulonglong(_POLY1305_KEYBYTES)
_chacha20(polykey, ull_polykeylen, nonce, key)
return polykey.raw
def poly1305(key: bytes, data: bytes, nonce: bytes) -> bytes:
"""Compute a Poly1305 tag for a block of data"""
tag = create_string_buffer(_POLY1305_BYTES)
ull_datalen = c_ulonglong(len(data))
polykey = poly1305_key(key, nonce)
_poly1305(tag, data, ull_datalen, polykey)
return tag.raw
def poly1305_verify(key: bytes, data: bytes,
nonce: bytes, tag: bytes) -> bool:
"""Verify a Poly1305 tag for a block of data"""
ull_datalen = c_ulonglong(len(data))
polykey = poly1305_key(key, nonce)
return _poly1305_verify(tag, data, ull_datalen, polykey) == 0
chacha_available = True
except (ImportError, OSError, AttributeError):
chacha_available = False
class ChachaCipher:
"""Shim for Chacha20-Poly1305 symmetric encryption"""
def __init__(self, key: bytes):
keylen = len(key) // 2
self._key = key[:keylen]
self._adkey = key[keylen:]
def encrypt_and_sign(self, header: bytes, data: bytes,
nonce: bytes) -> Tuple[bytes, bytes]:
"""Encrypt and sign a block of data"""
header = chacha20(self._adkey, header, nonce, 0)
data = chacha20(self._key, data, nonce, 1)
tag = poly1305(self._key, header + data, nonce)
return header + data, tag
def decrypt_header(self, header: bytes, nonce: bytes) -> bytes:
"""Decrypt header data"""
return chacha20(self._adkey, header, nonce, 0)
def verify_and_decrypt(self, header: bytes, data: bytes,
nonce: bytes, tag: bytes) -> Optional[bytes]:
"""Verify the signature of and decrypt a block of data"""
if poly1305_verify(self._key, header + data, nonce, tag):
return chacha20(self._key, data, nonce, 1)
else:
return None
if chacha_available: # pragma: no branch
register_cipher('chacha20-poly1305', 64, 0, 1)

View File

@@ -0,0 +1,166 @@
# Copyright (c) 2014-2021 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""A shim around PyCA for accessing symmetric ciphers needed by AsyncSSH"""
from typing import Any, MutableMapping, Optional, Tuple
import warnings
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers import Cipher, CipherContext
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.ciphers.algorithms import AES, ARC4
from cryptography.hazmat.primitives.ciphers.algorithms import TripleDES
from cryptography.hazmat.primitives.ciphers.modes import CBC, CTR
with warnings.catch_warnings():
warnings.simplefilter('ignore')
from cryptography.hazmat.primitives.ciphers.algorithms import Blowfish
from cryptography.hazmat.primitives.ciphers.algorithms import CAST5
from cryptography.hazmat.primitives.ciphers.algorithms import SEED
_CipherAlgs = Tuple[Any, Any, int]
_CipherParams = Tuple[int, int, int]
_GCM_MAC_SIZE = 16
_cipher_algs: MutableMapping[str, _CipherAlgs] = {}
_cipher_params: MutableMapping[str, _CipherParams] = {}
class BasicCipher:
"""Shim for basic ciphers"""
def __init__(self, cipher_name: str, key: bytes, iv: bytes):
cipher, mode, initial_bytes = _cipher_algs[cipher_name]
self._cipher = Cipher(cipher(key), mode(iv) if mode else None)
self._initial_bytes = initial_bytes
self._encryptor: Optional[CipherContext] = None
self._decryptor: Optional[CipherContext] = None
def encrypt(self, data: bytes) -> bytes:
"""Encrypt a block of data"""
if not self._encryptor:
self._encryptor = self._cipher.encryptor()
if self._initial_bytes:
assert self._encryptor is not None
self._encryptor.update(self._initial_bytes * b'\0')
assert self._encryptor is not None
return self._encryptor.update(data)
def decrypt(self, data: bytes) -> bytes:
"""Decrypt a block of data"""
if not self._decryptor:
self._decryptor = self._cipher.decryptor()
if self._initial_bytes:
assert self._decryptor is not None
self._decryptor.update(self._initial_bytes * b'\0')
assert self._decryptor is not None
return self._decryptor.update(data)
class GCMCipher:
"""Shim for GCM ciphers"""
def __init__(self, cipher_name: str, key: bytes, iv: bytes):
self._cipher = _cipher_algs[cipher_name][0]
self._key = key
self._iv = iv
def _update_iv(self) -> None:
"""Update the IV after each encrypt/decrypt operation"""
invocation = int.from_bytes(self._iv[4:], 'big')
invocation = (invocation + 1) & 0xffffffffffffffff
self._iv = self._iv[:4] + invocation.to_bytes(8, 'big')
def encrypt_and_sign(self, header: bytes,
data: bytes) -> Tuple[bytes, bytes]:
"""Encrypt and sign a block of data"""
data = AESGCM(self._key).encrypt(self._iv, data, header)
self._update_iv()
return header + data[:-_GCM_MAC_SIZE], data[-_GCM_MAC_SIZE:]
def verify_and_decrypt(self, header: bytes, data: bytes,
mac: bytes) -> Optional[bytes]:
"""Verify the signature of and decrypt a block of data"""
try:
decrypted_data: Optional[bytes] = \
AESGCM(self._key).decrypt(self._iv, data + mac, header)
except InvalidTag:
decrypted_data = None
self._update_iv()
return decrypted_data
def register_cipher(cipher_name: str, key_size: int,
iv_size: int, block_size: int) -> None:
"""Register a symmetric cipher"""
_cipher_params[cipher_name] = (key_size, iv_size, block_size)
def get_cipher_params(cipher_name: str) -> _CipherParams:
"""Get parameters of a symmetric cipher"""
return _cipher_params[cipher_name]
_cipher_alg_list = (
('aes128-cbc', AES, CBC, 0, 16, 16, 16),
('aes192-cbc', AES, CBC, 0, 24, 16, 16),
('aes256-cbc', AES, CBC, 0, 32, 16, 16),
('aes128-ctr', AES, CTR, 0, 16, 16, 16),
('aes192-ctr', AES, CTR, 0, 24, 16, 16),
('aes256-ctr', AES, CTR, 0, 32, 16, 16),
('aes128-gcm', None, None, 0, 16, 12, 16),
('aes256-gcm', None, None, 0, 32, 12, 16),
('arcfour', ARC4, None, 0, 16, 1, 1),
('arcfour40', ARC4, None, 0, 5, 1, 1),
('arcfour128', ARC4, None, 1536, 16, 1, 1),
('arcfour256', ARC4, None, 1536, 32, 1, 1),
('blowfish-cbc', Blowfish, CBC, 0, 16, 8, 8),
('cast128-cbc', CAST5, CBC, 0, 16, 8, 8),
('des-cbc', TripleDES, CBC, 0, 8, 8, 8),
('des2-cbc', TripleDES, CBC, 0, 16, 8, 8),
('des3-cbc', TripleDES, CBC, 0, 24, 8, 8),
('seed-cbc', SEED, CBC, 0, 16, 16, 16)
)
for _cipher_name, _cipher, _mode, _initial_bytes, \
_key_size, _iv_size, _block_size in _cipher_alg_list:
_cipher_algs[_cipher_name] = (_cipher, _mode, _initial_bytes)
register_cipher(_cipher_name, _key_size, _iv_size, _block_size)

View File

@@ -0,0 +1,46 @@
# Copyright (c) 2022 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""A shim around PyCA for Diffie Hellman key exchange"""
from cryptography.hazmat.primitives.asymmetric import dh
class DH:
"""A shim around PyCA for Diffie Hellman key exchange"""
def __init__(self, g: int, p: int):
self._pn = dh.DHParameterNumbers(p, g)
self._priv_key = self._pn.parameters().generate_private_key()
def get_public(self) -> int:
"""Return the public key to send in the handshake"""
pub_key = self._priv_key.public_key()
return pub_key.public_numbers().y
def get_shared(self, peer_public: int) -> int:
"""Return the shared key from the peer's public key"""
peer_key = dh.DHPublicNumbers(peer_public, self._pn).public_key()
shared_key = self._priv_key.exchange(peer_key)
return int.from_bytes(shared_key, 'big')

View File

@@ -0,0 +1,132 @@
# Copyright (c) 2014-2023 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""A shim around PyCA for DSA public and private keys"""
from typing import Optional, cast
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import dsa
from .misc import CryptoKey, PyCAKey, hashes
# Short variable names are used here, matching names in the spec
# pylint: disable=invalid-name
class _DSAKey(CryptoKey):
"""Base class for shim around PyCA for DSA keys"""
def __init__(self, pyca_key: PyCAKey, params: dsa.DSAParameterNumbers,
pub: dsa.DSAPublicNumbers,
priv: Optional[dsa.DSAPrivateNumbers] = None):
super().__init__(pyca_key)
self._params = params
self._pub = pub
self._priv = priv
@property
def p(self) -> int:
"""Return the DSA public modulus"""
return self._params.p
@property
def q(self) -> int:
"""Return the DSA sub-group order"""
return self._params.q
@property
def g(self) -> int:
"""Return the DSA generator"""
return self._params.g
@property
def y(self) -> int:
"""Return the DSA public value"""
return self._pub.y
@property
def x(self) -> Optional[int]:
"""Return the DSA private value"""
return self._priv.x if self._priv else None
class DSAPrivateKey(_DSAKey):
"""A shim around PyCA for DSA private keys"""
@classmethod
def construct(cls, p: int, q: int, g: int,
y: int, x: int) -> 'DSAPrivateKey':
"""Construct a DSA private key"""
params = dsa.DSAParameterNumbers(p, q, g)
pub = dsa.DSAPublicNumbers(y, params)
priv = dsa.DSAPrivateNumbers(x, pub)
priv_key = priv.private_key()
return cls(priv_key, params, pub, priv)
@classmethod
def generate(cls, key_size: int) -> 'DSAPrivateKey':
"""Generate a new DSA private key"""
priv_key = dsa.generate_private_key(key_size)
priv = priv_key.private_numbers()
pub = priv.public_numbers
params = pub.parameter_numbers
return cls(priv_key, params, pub, priv)
def sign(self, data: bytes, hash_name: str = '') -> bytes:
"""Sign a block of data"""
priv_key = cast('dsa.DSAPrivateKey', self.pyca_key)
return priv_key.sign(data, hashes[hash_name]())
class DSAPublicKey(_DSAKey):
"""A shim around PyCA for DSA public keys"""
@classmethod
def construct(cls, p: int, q: int, g: int, y: int) -> 'DSAPublicKey':
"""Construct a DSA public key"""
params = dsa.DSAParameterNumbers(p, q, g)
pub = dsa.DSAPublicNumbers(y, params)
pub_key = pub.public_key()
return cls(pub_key, params, pub)
def verify(self, data: bytes, sig: bytes, hash_name: str = '') -> bool:
"""Verify the signature on a block of data"""
try:
pub_key = cast('dsa.DSAPublicKey', self.pyca_key)
pub_key.verify(sig, data, hashes[hash_name]())
return True
except InvalidSignature:
return False

View File

@@ -0,0 +1,205 @@
# Copyright (c) 2015-2021 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""A shim around PyCA for elliptic curve keys and key exchange"""
from typing import Mapping, Optional, Type, cast
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PublicFormat
from .misc import CryptoKey, PyCAKey, hashes
# Short variable names are used here, matching names in the spec
# pylint: disable=invalid-name
_curves: Mapping[bytes, Type[ec.EllipticCurve]] = {
b'1.3.132.0.10': ec.SECP256K1,
b'nistp256': ec.SECP256R1,
b'nistp384': ec.SECP384R1,
b'nistp521': ec.SECP521R1
}
class _ECKey(CryptoKey):
"""Base class for shim around PyCA for EC keys"""
def __init__(self, pyca_key: PyCAKey, curve_id: bytes,
pub: ec.EllipticCurvePublicNumbers, point: bytes,
priv: Optional[ec.EllipticCurvePrivateNumbers] = None):
super().__init__(pyca_key)
self._curve_id = curve_id
self._pub = pub
self._point = point
self._priv = priv
@classmethod
def lookup_curve(cls, curve_id: bytes) -> Type[ec.EllipticCurve]:
"""Look up curve and hash algorithm"""
try:
return _curves[curve_id]
except KeyError: # pragma: no cover, other curves not registered
raise ValueError('Unknown EC curve %s' %
curve_id.decode()) from None
@property
def curve_id(self) -> bytes:
"""Return the EC curve name"""
return self._curve_id
@property
def x(self) -> int:
"""Return the EC public x coordinate"""
return self._pub.x
@property
def y(self) -> int:
"""Return the EC public y coordinate"""
return self._pub.y
@property
def d(self) -> Optional[int]:
"""Return the EC private value as an integer"""
return self._priv.private_value if self._priv else None
@property
def public_value(self) -> bytes:
"""Return the EC public point value encoded as a byte string"""
return self._point
@property
def private_value(self) -> Optional[bytes]:
"""Return the EC private value encoded as a byte string"""
if self._priv:
keylen = (self._pub.curve.key_size + 7) // 8
return self._priv.private_value.to_bytes(keylen, 'big')
else:
return None
class ECDSAPrivateKey(_ECKey):
"""A shim around PyCA for ECDSA private keys"""
@classmethod
def construct(cls, curve_id: bytes, public_value: bytes,
private_value: int) -> 'ECDSAPrivateKey':
"""Construct an ECDSA private key"""
curve = cls.lookup_curve(curve_id)
priv_key = ec.derive_private_key(private_value, curve())
priv = priv_key.private_numbers()
pub = priv.public_numbers
return cls(priv_key, curve_id, pub, public_value, priv)
@classmethod
def generate(cls, curve_id: bytes) -> 'ECDSAPrivateKey':
"""Generate a new ECDSA private key"""
curve = cls.lookup_curve(curve_id)
priv_key = ec.generate_private_key(curve())
priv = priv_key.private_numbers()
pub_key = priv_key.public_key()
pub = pub_key.public_numbers()
public_value = pub_key.public_bytes(Encoding.X962,
PublicFormat.UncompressedPoint)
return cls(priv_key, curve_id, pub, public_value, priv)
def sign(self, data: bytes, hash_name: str = '') -> bytes:
"""Sign a block of data"""
# pylint: disable=unused-argument
priv_key = cast('ec.EllipticCurvePrivateKey', self.pyca_key)
return priv_key.sign(data, ec.ECDSA(hashes[hash_name]()))
class ECDSAPublicKey(_ECKey):
"""A shim around PyCA for ECDSA public keys"""
@classmethod
def construct(cls, curve_id: bytes,
public_value: bytes) -> 'ECDSAPublicKey':
"""Construct an ECDSA public key"""
curve = cls.lookup_curve(curve_id)
pub_key = ec.EllipticCurvePublicKey.from_encoded_point(curve(),
public_value)
pub = pub_key.public_numbers()
return cls(pub_key, curve_id, pub, public_value)
def verify(self, data: bytes, sig: bytes, hash_name: str = '') -> bool:
"""Verify the signature on a block of data"""
try:
pub_key = cast('ec.EllipticCurvePublicKey', self.pyca_key)
pub_key.verify(sig, data, ec.ECDSA(hashes[hash_name]()))
return True
except InvalidSignature:
return False
class ECDH:
"""A shim around PyCA for ECDH key exchange"""
def __init__(self, curve_id: bytes):
try:
curve = _curves[curve_id]
except KeyError: # pragma: no cover, other curves not registered
raise ValueError('Unknown EC curve %s' %
curve_id.decode()) from None
self._priv_key = ec.generate_private_key(curve())
def get_public(self) -> bytes:
"""Return the public key to send in the handshake"""
pub_key = self._priv_key.public_key()
return pub_key.public_bytes(Encoding.X962,
PublicFormat.UncompressedPoint)
def get_shared(self, peer_public: bytes) -> int:
"""Return the shared key from the peer's public key"""
peer_key = ec.EllipticCurvePublicKey.from_encoded_point(
self._priv_key.curve, peer_public)
shared_key = self._priv_key.exchange(ec.ECDH(), peer_key)
return int.from_bytes(shared_key, 'big')

View File

@@ -0,0 +1,87 @@
# Copyright (c) 2013-2021 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""Functions for looking up named elliptic curves by their parameters"""
_curve_param_map = {}
# Short variable names are used here, matching names in the spec
# pylint: disable=invalid-name
def register_prime_curve(curve_id: bytes, p: int, a: int, b: int,
point: bytes, n: int) -> None:
"""Register an elliptic curve prime domain
This function registers an elliptic curve prime domain by
specifying the SSH identifier for the curve and the set of
parameters describing the curve, generator point, and order.
This allows EC keys encoded with explicit parameters to be
mapped back into their SSH curve IDs.
"""
_curve_param_map[p, a % p, b % p, point, n] = curve_id
def lookup_ec_curve_by_params(p: int, a: int, b: int,
point: bytes, n: int) -> bytes:
"""Look up an elliptic curve by its parameters
This function looks up an elliptic curve by its parameters
and returns the curve's name.
"""
try:
return _curve_param_map[p, a % p, b % p, point, n]
except (KeyError, ValueError):
raise ValueError('Unknown elliptic curve parameters') from None
# pylint: disable=line-too-long
register_prime_curve(b'nistp521',
6864797660130609714981900799081393217269435300143305409394463459185543183397656052122559640661454554977296311391480858037121987999716643812574028291115057151,
-3,
1093849038073734274511112390766805569936207598951683748994586394495953116150735016013708737573759623248592132296706313309438452531591012912142327488478985984,
b'\x04\x00\xc6\x85\x8e\x06\xb7\x04\x04\xe9\xcd\x9e>\xcbf#\x95\xb4B\x9cd\x819\x05?\xb5!\xf8(\xaf`kM=\xba\xa1K^w\xef\xe7Y(\xfe\x1d\xc1\'\xa2\xff\xa8\xde3H\xb3\xc1\x85jB\x9b\xf9~~1\xc2\xe5\xbdf\x01\x189)jx\x9a;\xc0\x04\\\x8a_\xb4,}\x1b\xd9\x98\xf5DIW\x9bDh\x17\xaf\xbd\x17\'>f,\x97\xeer\x99^\xf4&@\xc5P\xb9\x01?\xad\x07a5<p\x86\xa2r\xc2@\x88\xbe\x94v\x9f\xd1fP',
6864797660130609714981900799081393217269435300143305409394463459185543183397655394245057746333217197532963996371363321113864768612440380340372808892707005449)
register_prime_curve(b'nistp384',
39402006196394479212279040100143613805079739270465446667948293404245721771496870329047266088258938001861606973112319,
-3,
27580193559959705877849011840389048093056905856361568521428707301988689241309860865136260764883745107765439761230575,
b'\x04\xaa\x87\xca"\xbe\x8b\x057\x8e\xb1\xc7\x1e\xf3 \xadtn\x1d;b\x8b\xa7\x9b\x98Y\xf7A\xe0\x82T*8U\x02\xf2]\xbfU)l:T^8rv\n\xb76\x17\xdeJ\x96&,o]\x9e\x98\xbf\x92\x92\xdc)\xf8\xf4\x1d\xbd(\x9a\x14|\xe9\xda1\x13\xb5\xf0\xb8\xc0\n`\xb1\xce\x1d~\x81\x9dzC\x1d|\x90\xea\x0e_',
39402006196394479212279040100143613805079739270465446667946905279627659399113263569398956308152294913554433653942643)
register_prime_curve(b'nistp256',
115792089210356248762697446949407573530086143415290314195533631308867097853951,
-3,
41058363725152142129326129780047268409114441015993725554835256314039467401291,
b'\x04k\x17\xd1\xf2\xe1,BG\xf8\xbc\xe6\xe5c\xa4@\xf2w\x03}\x81-\xeb3\xa0\xf4\xa19E\xd8\x98\xc2\x96O\xe3B\xe2\xfe\x1a\x7f\x9b\x8e\xe7\xebJ|\x0f\x9e\x16+\xce3Wk1^\xce\xcb\xb6@h7\xbfQ\xf5',
115792089210356248762697446949407573529996955224135760342422259061068512044369)
register_prime_curve(b'1.3.132.0.10',
115792089237316195423570985008687907853269984665640564039457584007908834671663,
0,
7,
b'\x04y\xbef~\xf9\xdc\xbb\xacU\xa0b\x95\xce\x87\x0b\x07\x02\x9b\xfc\xdb-\xce(\xd9Y\xf2\x81[\x16\xf8\x17\x98H:\xdaw&\xa3\xc4e]\xa4\xfb\xfc\x0e\x11\x08\xa8\xfd\x17\xb4H\xa6\x85T\x19\x9cG\xd0\x8f\xfb\x10\xd4\xb8',
115792089237316195423570985008687907852837564279074904382605163141518161494337)

View File

@@ -0,0 +1,325 @@
# Copyright (c) 2019-2023 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""A shim around PyCA and libnacl for Edwards-curve keys and key exchange"""
import ctypes
import os
from typing import Dict, Optional, Union, cast
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends.openssl import backend
from cryptography.hazmat.primitives.asymmetric import ed25519, ed448
from cryptography.hazmat.primitives.asymmetric import x25519, x448
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import PublicFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from .misc import CryptoKey, PyCAKey
_EdPrivateKey = Union[ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey]
_EdPublicKey = Union[ed25519.Ed25519PublicKey, ed448.Ed448PublicKey]
ed25519_available = backend.ed25519_supported()
ed448_available = backend.ed448_supported()
curve25519_available = backend.x25519_supported()
curve448_available = backend.x448_supported()
if ed25519_available or ed448_available: # pragma: no branch
class _EdDSAKey(CryptoKey):
"""Base class for shim around PyCA for EdDSA keys"""
def __init__(self, pyca_key: PyCAKey, pub: bytes,
priv: Optional[bytes] = None):
super().__init__(pyca_key)
self._pub = pub
self._priv = priv
@property
def public_value(self) -> bytes:
"""Return the public value encoded as a byte string"""
return self._pub
@property
def private_value(self) -> Optional[bytes]:
"""Return the private value encoded as a byte string"""
return self._priv
class EdDSAPrivateKey(_EdDSAKey):
"""A shim around PyCA for EdDSA private keys"""
_priv_classes: Dict[bytes, object] = {}
if ed25519_available: # pragma: no branch
_priv_classes[b'ed25519'] = ed25519.Ed25519PrivateKey
if ed448_available: # pragma: no branch
_priv_classes[b'ed448'] = ed448.Ed448PrivateKey
@classmethod
def construct(cls, curve_id: bytes, priv: bytes) -> 'EdDSAPrivateKey':
"""Construct an EdDSA private key"""
priv_cls = cast('_EdPrivateKey', cls._priv_classes[curve_id])
priv_key = priv_cls.from_private_bytes(priv)
pub_key = priv_key.public_key()
pub = pub_key.public_bytes(Encoding.Raw, PublicFormat.Raw)
return cls(priv_key, pub, priv)
@classmethod
def generate(cls, curve_id: bytes) -> 'EdDSAPrivateKey':
"""Generate a new EdDSA private key"""
priv_cls = cast('_EdPrivateKey', cls._priv_classes[curve_id])
priv_key = priv_cls.generate()
priv = priv_key.private_bytes(Encoding.Raw, PrivateFormat.Raw,
NoEncryption())
pub_key = priv_key.public_key()
pub = pub_key.public_bytes(Encoding.Raw, PublicFormat.Raw)
return cls(priv_key, pub, priv)
def sign(self, data: bytes, hash_name: str = '') -> bytes:
"""Sign a block of data"""
# pylint: disable=unused-argument
priv_key = cast('_EdPrivateKey', self.pyca_key)
return priv_key.sign(data)
class EdDSAPublicKey(_EdDSAKey):
"""A shim around PyCA for EdDSA public keys"""
_pub_classes: Dict[bytes, object] = {
b'ed25519': ed25519.Ed25519PublicKey,
b'ed448': ed448.Ed448PublicKey
}
@classmethod
def construct(cls, curve_id: bytes, pub: bytes) -> 'EdDSAPublicKey':
"""Construct an EdDSA public key"""
pub_cls = cast('_EdPublicKey', cls._pub_classes[curve_id])
pub_key = pub_cls.from_public_bytes(pub)
return cls(pub_key, pub)
def verify(self, data: bytes, sig: bytes, hash_name: str = '') -> bool:
"""Verify the signature on a block of data"""
# pylint: disable=unused-argument
try:
pub_key = cast('_EdPublicKey', self.pyca_key)
pub_key.verify(sig, data)
return True
except InvalidSignature:
return False
else: # pragma: no cover
class _EdDSANaclKey:
"""Base class for shim around libnacl for EdDSA keys"""
def __init__(self, pub: bytes, priv: Optional[bytes] = None):
self._pub = pub
self._priv = priv
@property
def public_value(self) -> bytes:
"""Return the public value encoded as a byte string"""
return self._pub
@property
def private_value(self) -> Optional[bytes]:
"""Return the private value encoded as a byte string"""
return self._priv[:-len(self._pub)] if self._priv else None
class EdDSAPrivateKey(_EdDSANaclKey): # type: ignore
"""A shim around libnacl for EdDSA private keys"""
@classmethod
def construct(cls, curve_id: bytes, priv: bytes) -> 'EdDSAPrivateKey':
"""Construct an EdDSA private key"""
# pylint: disable=unused-argument
return cls(*_ed25519_construct_keypair(priv))
@classmethod
def generate(cls, curve_id: str) -> 'EdDSAPrivateKey':
"""Generate a new EdDSA private key"""
# pylint: disable=unused-argument
return cls(*_ed25519_generate_keypair())
def sign(self, data: bytes, hash_name: str = '') -> bytes:
"""Sign a block of data"""
# pylint: disable=unused-argument
assert self._priv is not None
return _ed25519_sign(data, self._priv)[:-len(data)]
class EdDSAPublicKey(_EdDSANaclKey): # type: ignore
"""A shim around libnacl for EdDSA public keys"""
@classmethod
def construct(cls, curve_id: bytes, pub: bytes) -> 'EdDSAPublicKey':
"""Construct an EdDSA public key"""
# pylint: disable=unused-argument
if len(pub) != _ED25519_PUBLIC_BYTES:
raise ValueError('Invalid EdDSA public key')
return cls(pub)
def verify(self, data: bytes, sig: bytes, hash_name: str = '') -> bool:
"""Verify the signature on a block of data"""
# pylint: disable=unused-argument
try:
return _ed25519_verify(sig + data, self._pub) == data
except ValueError:
return False
try:
import libnacl
_ED25519_PUBLIC_BYTES = libnacl.crypto_sign_ed25519_PUBLICKEYBYTES
_ed25519_construct_keypair = libnacl.crypto_sign_seed_keypair
_ed25519_generate_keypair = libnacl.crypto_sign_keypair
_ed25519_sign = libnacl.crypto_sign
_ed25519_verify = libnacl.crypto_sign_open
ed25519_available = True
except (ImportError, OSError, AttributeError):
pass
if curve25519_available: # pragma: no branch
class Curve25519DH:
"""Curve25519 Diffie Hellman implementation based on PyCA"""
def __init__(self) -> None:
self._priv_key = x25519.X25519PrivateKey.generate()
def get_public(self) -> bytes:
"""Return the public key to send in the handshake"""
return self._priv_key.public_key().public_bytes(Encoding.Raw,
PublicFormat.Raw)
def get_shared_bytes(self, peer_public: bytes) -> bytes:
"""Return the shared key from the peer's public key"""
peer_key = x25519.X25519PublicKey.from_public_bytes(peer_public)
return self._priv_key.exchange(peer_key)
def get_shared(self, peer_public: bytes) -> int:
"""Return the shared key from the peer's public key as bytes"""
return int.from_bytes(self.get_shared_bytes(peer_public), 'big')
else: # pragma: no cover
class Curve25519DH: # type: ignore
"""Curve25519 Diffie Hellman implementation based on libnacl"""
def __init__(self) -> None:
self._private = os.urandom(_CURVE25519_SCALARBYTES)
def get_public(self) -> bytes:
"""Return the public key to send in the handshake"""
public = ctypes.create_string_buffer(_CURVE25519_BYTES)
if _curve25519_base(public, self._private) != 0:
# This error is never returned by libsodium
raise ValueError('Curve25519 failed') # pragma: no cover
return public.raw
def get_shared_bytes(self, peer_public: bytes) -> bytes:
"""Return the shared key from the peer's public key as bytes"""
if len(peer_public) != _CURVE25519_BYTES:
raise ValueError('Invalid curve25519 public key size')
shared = ctypes.create_string_buffer(_CURVE25519_BYTES)
if _curve25519(shared, self._private, peer_public) != 0:
raise ValueError('Curve25519 failed')
return shared.raw
def get_shared(self, peer_public: bytes) -> int:
"""Return the shared key from the peer's public key"""
return int.from_bytes(self.get_shared_bytes(peer_public), 'big')
try:
from libnacl import nacl
_CURVE25519_BYTES = nacl.crypto_scalarmult_curve25519_bytes()
_CURVE25519_SCALARBYTES = \
nacl.crypto_scalarmult_curve25519_scalarbytes()
_curve25519 = nacl.crypto_scalarmult_curve25519
_curve25519_base = nacl.crypto_scalarmult_curve25519_base
curve25519_available = True
except (ImportError, OSError, AttributeError):
pass
class Curve448DH:
"""Curve448 Diffie Hellman implementation based on PyCA"""
def __init__(self) -> None:
self._priv_key = x448.X448PrivateKey.generate()
def get_public(self) -> bytes:
"""Return the public key to send in the handshake"""
return self._priv_key.public_key().public_bytes(Encoding.Raw,
PublicFormat.Raw)
def get_shared(self, peer_public: bytes) -> int:
"""Return the shared key from the peer's public key"""
peer_key = x448.X448PublicKey.from_public_bytes(peer_public)
shared = self._priv_key.exchange(peer_key)
return int.from_bytes(shared, 'big')

View File

@@ -0,0 +1,33 @@
# Copyright (c) 2017-2021 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""A shim around PyCA for key derivation functions"""
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from .misc import hashes
def pbkdf2_hmac(hash_name: str, passphrase: bytes, salt: bytes,
count: int, key_size: int) -> bytes:
"""A shim around PyCA for PBKDF2 HMAC key derivation"""
return PBKDF2HMAC(hashes[hash_name](), key_size, salt,
count).derive(passphrase)

View File

@@ -0,0 +1,70 @@
# Copyright (c) 2017-2023 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""Miscellaneous PyCA utility classes and functions"""
from typing import Callable, Mapping, Union
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
from cryptography.hazmat.primitives.asymmetric import ed25519, ed448
from cryptography.hazmat.primitives.hashes import HashAlgorithm
from cryptography.hazmat.primitives.hashes import MD5, SHA1, SHA224
from cryptography.hazmat.primitives.hashes import SHA256, SHA384, SHA512
PyCAPrivateKey = Union[dsa.DSAPrivateKey, rsa.RSAPrivateKey,
ec.EllipticCurvePrivateKey,
ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey]
PyCAPublicKey = Union[dsa.DSAPublicKey, rsa.RSAPublicKey,
ec.EllipticCurvePublicKey,
ed25519.Ed25519PublicKey, ed448.Ed448PublicKey]
PyCAKey = Union[PyCAPrivateKey, PyCAPublicKey]
hashes: Mapping[str, Callable[[], HashAlgorithm]] = {
str(h.name): h for h in (MD5, SHA1, SHA224, SHA256, SHA384, SHA512)
}
class CryptoKey:
"""Base class for PyCA private/public keys"""
def __init__(self, pyca_key: PyCAKey):
self._pyca_key = pyca_key
@property
def pyca_key(self) -> PyCAKey:
"""Return the PyCA object associated with this key"""
return self._pyca_key
def sign(self, data: bytes, hash_name: str = '') -> bytes:
"""Sign a block of data"""
# pylint: disable=no-self-use
raise RuntimeError # pragma: no cover
def verify(self, data: bytes, sig: bytes, hash_name: str = '') -> bool:
"""Verify the signature on a block of data"""
# pylint: disable=no-self-use
raise RuntimeError # pragma: no cover

View File

@@ -0,0 +1,169 @@
# Copyright (c) 2014-2023 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""A shim around PyCA for RSA public and private keys"""
from typing import Optional, cast
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.padding import MGF1, OAEP
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.asymmetric import rsa
from .misc import CryptoKey, PyCAKey, hashes
# Short variable names are used here, matching names in the spec
# pylint: disable=invalid-name
class _RSAKey(CryptoKey):
"""Base class for shim around PyCA for RSA keys"""
def __init__(self, pyca_key: PyCAKey, pub: rsa.RSAPublicNumbers,
priv: Optional[rsa.RSAPrivateNumbers] = None):
super().__init__(pyca_key)
self._pub = pub
self._priv = priv
@property
def n(self) -> int:
"""Return the RSA public modulus"""
return self._pub.n
@property
def e(self) -> int:
"""Return the RSA public exponent"""
return self._pub.e
@property
def d(self) -> Optional[int]:
"""Return the RSA private exponent"""
return self._priv.d if self._priv else None
@property
def p(self) -> Optional[int]:
"""Return the RSA first private prime"""
return self._priv.p if self._priv else None
@property
def q(self) -> Optional[int]:
"""Return the RSA second private prime"""
return self._priv.q if self._priv else None
@property
def dmp1(self) -> Optional[int]:
"""Return d modulo p-1"""
return self._priv.dmp1 if self._priv else None
@property
def dmq1(self) -> Optional[int]:
"""Return q modulo p-1"""
return self._priv.dmq1 if self._priv else None
@property
def iqmp(self) -> Optional[int]:
"""Return the inverse of q modulo p"""
return self._priv.iqmp if self._priv else None
class RSAPrivateKey(_RSAKey):
"""A shim around PyCA for RSA private keys"""
@classmethod
def construct(cls, n: int, e: int, d: int, p: int, q: int,
dmp1: int, dmq1: int, iqmp: int,
skip_validation: bool) -> 'RSAPrivateKey':
"""Construct an RSA private key"""
pub = rsa.RSAPublicNumbers(e, n)
priv = rsa.RSAPrivateNumbers(p, q, d, dmp1, dmq1, iqmp, pub)
priv_key = priv.private_key(
unsafe_skip_rsa_key_validation=skip_validation)
return cls(priv_key, pub, priv)
@classmethod
def generate(cls, key_size: int, exponent: int) -> 'RSAPrivateKey':
"""Generate a new RSA private key"""
priv_key = rsa.generate_private_key(exponent, key_size)
priv = priv_key.private_numbers()
pub = priv.public_numbers
return cls(priv_key, pub, priv)
def decrypt(self, data: bytes, hash_name: str) -> Optional[bytes]:
"""Decrypt a block of data"""
try:
hash_alg = hashes[hash_name]()
priv_key = cast('rsa.RSAPrivateKey', self.pyca_key)
return priv_key.decrypt(data, OAEP(MGF1(hash_alg), hash_alg, None))
except ValueError:
return None
def sign(self, data: bytes, hash_name: str = '') -> bytes:
"""Sign a block of data"""
priv_key = cast('rsa.RSAPrivateKey', self.pyca_key)
return priv_key.sign(data, PKCS1v15(), hashes[hash_name]())
class RSAPublicKey(_RSAKey):
"""A shim around PyCA for RSA public keys"""
@classmethod
def construct(cls, n: int, e: int) -> 'RSAPublicKey':
"""Construct an RSA public key"""
pub = rsa.RSAPublicNumbers(e, n)
pub_key = pub.public_key()
return cls(pub_key, pub)
def encrypt(self, data: bytes, hash_name: str) -> Optional[bytes]:
"""Encrypt a block of data"""
try:
hash_alg = hashes[hash_name]()
pub_key = cast('rsa.RSAPublicKey', self.pyca_key)
return pub_key.encrypt(data, OAEP(MGF1(hash_alg), hash_alg, None))
except ValueError:
return None
def verify(self, data: bytes, sig: bytes, hash_name: str = '') -> bool:
"""Verify the signature on a block of data"""
try:
pub_key = cast('rsa.RSAPublicKey', self.pyca_key)
pub_key.verify(sig, data, PKCS1v15(), hashes[hash_name]())
return True
except InvalidSignature:
return False

View File

@@ -0,0 +1,88 @@
# Copyright (c) 2022 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""A shim around liboqs for Streamlined NTRU Prime post-quantum encryption"""
import ctypes
import ctypes.util
from typing import Tuple
sntrup761_available = False
sntrup761_pubkey_bytes = 1158
sntrup761_privkey_bytes = 1763
sntrup761_ciphertext_bytes = 1039
sntrup761_secret_bytes = 32
for lib in ('oqs', 'liboqs'):
_oqs_lib = ctypes.util.find_library(lib)
if _oqs_lib: # pragma: no branch
break
else: # pragma: no cover
_oqs_lib = None
if _oqs_lib: # pragma: no branch
_oqs = ctypes.cdll.LoadLibrary(_oqs_lib)
_sntrup761_keypair = _oqs.OQS_KEM_ntruprime_sntrup761_keypair
_sntrup761_encaps = _oqs.OQS_KEM_ntruprime_sntrup761_encaps
_sntrup761_decaps = _oqs.OQS_KEM_ntruprime_sntrup761_decaps
sntrup761_available = True
def sntrup761_keypair() -> Tuple[bytes, bytes]:
"""Make a SNTRUP761 key pair"""
pubkey = ctypes.create_string_buffer(sntrup761_pubkey_bytes)
privkey = ctypes.create_string_buffer(sntrup761_privkey_bytes)
_sntrup761_keypair(pubkey, privkey)
return pubkey.raw, privkey.raw
def sntrup761_encaps(pubkey: bytes) -> Tuple[bytes, bytes]:
"""Generate a random secret and encrypt it with a public key"""
if len(pubkey) != sntrup761_pubkey_bytes:
raise ValueError('Invalid SNTRUP761 public key')
ciphertext = ctypes.create_string_buffer(sntrup761_ciphertext_bytes)
secret = ctypes.create_string_buffer(sntrup761_secret_bytes)
_sntrup761_encaps(ciphertext, secret, pubkey)
return secret.raw, ciphertext.raw
def sntrup761_decaps(ciphertext: bytes, privkey: bytes) -> bytes:
"""Decrypt an encrypted secret using a private key"""
if len(ciphertext) != sntrup761_ciphertext_bytes:
raise ValueError('Invalid SNTRUP761 ciphertext')
secret = ctypes.create_string_buffer(sntrup761_secret_bytes)
_sntrup761_decaps(secret, ciphertext, privkey)
return secret.raw

View File

@@ -0,0 +1,139 @@
# Copyright (c) 2016-2021 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""UMAC cryptographic hash (RFC 4418) wrapper for Nettle library"""
import binascii
import ctypes
import ctypes.util
from typing import TYPE_CHECKING, Callable, Optional
if TYPE_CHECKING:
_ByteArray = ctypes.Array[ctypes.c_char]
_SetKey = Callable[[_ByteArray, bytes], None]
_SetNonce = Callable[[_ByteArray, ctypes.c_size_t, bytes], None]
_Update = Callable[[_ByteArray, ctypes.c_size_t, bytes], None]
_Digest = Callable[[_ByteArray, ctypes.c_size_t, _ByteArray], None]
_New = Callable[[bytes, Optional[bytes], Optional[bytes]], object]
_UMAC_BLOCK_SIZE = 1024
_UMAC_DEFAULT_CTX_SIZE = 4096
def _build_umac(size: int) -> '_New':
"""Function to build UMAC wrapper for a specific digest size"""
_name = 'umac%d' % size
_prefix = 'nettle_%s_' % _name
try:
_context_size: int = getattr(_nettle, _prefix + '_ctx_size')()
except AttributeError:
_context_size = _UMAC_DEFAULT_CTX_SIZE
_set_key: _SetKey = getattr(_nettle, _prefix + 'set_key')
_set_nonce: _SetNonce = getattr(_nettle, _prefix + 'set_nonce')
_update: _Update = getattr(_nettle, _prefix + 'update')
_digest: _Digest = getattr(_nettle, _prefix + 'digest')
class _UMAC:
"""Wrapper for UMAC cryptographic hash
This class supports the cryptographic hash API defined in PEP 452.
"""
name = _name
block_size = _UMAC_BLOCK_SIZE
digest_size = size // 8
def __init__(self, ctx: '_ByteArray', nonce: Optional[bytes] = None,
msg: Optional[bytes] = None):
self._ctx = ctx
if nonce:
self.set_nonce(nonce)
if msg:
self.update(msg)
@classmethod
def new(cls, key: bytes, msg: Optional[bytes] = None,
nonce: Optional[bytes] = None) -> '_UMAC':
"""Construct a new UMAC hash object"""
ctx = ctypes.create_string_buffer(_context_size)
_set_key(ctx, key)
return cls(ctx, nonce, msg)
def copy(self) -> '_UMAC':
"""Return a new hash object with this object's state"""
ctx = ctypes.create_string_buffer(self._ctx.raw)
return self.__class__(ctx)
def set_nonce(self, nonce: bytes) -> None:
"""Reset the nonce associated with this object"""
_set_nonce(self._ctx, ctypes.c_size_t(len(nonce)), nonce)
def update(self, msg: bytes) -> None:
"""Add the data in msg to the hash"""
_update(self._ctx, ctypes.c_size_t(len(msg)), msg)
def digest(self) -> bytes:
"""Return the hash and increment nonce to begin a new message
.. note:: The hash is reset and the nonce is incremented
when this function is called. This doesn't match
the behavior defined in PEP 452.
"""
result = ctypes.create_string_buffer(self.digest_size)
_digest(self._ctx, ctypes.c_size_t(self.digest_size), result)
return result.raw
def hexdigest(self) -> str:
"""Return the digest as a string of hexadecimal digits"""
return binascii.b2a_hex(self.digest()).decode('ascii')
return _UMAC.new
for lib in ('nettle', 'libnettle', 'libnettle-6'):
_nettle_lib = ctypes.util.find_library(lib)
if _nettle_lib: # pragma: no branch
break
else: # pragma: no cover
_nettle_lib = None
if _nettle_lib: # pragma: no branch
_nettle = ctypes.cdll.LoadLibrary(_nettle_lib)
umac32, umac64, umac96, umac128 = map(_build_umac, (32, 64, 96, 128))

View File

@@ -0,0 +1,417 @@
# Copyright (c) 2017-2021 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""A shim around PyCA and PyOpenSSL for X.509 certificates"""
from datetime import datetime, timezone
import re
import sys
from typing import Iterable, List, Optional, Sequence, Set, Union, cast
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PublicFormat
from cryptography import x509
from OpenSSL import crypto
from ..asn1 import IA5String, der_decode, der_encode
from ..misc import ip_address
from .misc import PyCAKey, PyCAPrivateKey, PyCAPublicKey, hashes
_Comment = Union[None, bytes, str]
_Principals = Union[str, Sequence[str]]
_Purposes = Union[None, str, Sequence[str]]
_PurposeOIDs = Union[None, Set[x509.ObjectIdentifier]]
_GeneralNameList = List[x509.GeneralName]
_NameInit = Union[str, x509.Name, Iterable[x509.RelativeDistinguishedName]]
_purpose_to_oid = {
'serverAuth': x509.ExtendedKeyUsageOID.SERVER_AUTH,
'clientAuth': x509.ExtendedKeyUsageOID.CLIENT_AUTH,
'secureShellClient': x509.ObjectIdentifier('1.3.6.1.5.5.7.3.21'),
'secureShellServer': x509.ObjectIdentifier('1.3.6.1.5.5.7.3.22')}
_purpose_any = '2.5.29.37.0'
_nscomment_oid = x509.ObjectIdentifier('2.16.840.1.113730.1.13')
_datetime_min = datetime.fromtimestamp(0, timezone.utc).replace(microsecond=1)
_datetime_32bit_max = datetime.fromtimestamp(2**31 - 1, timezone.utc)
if sys.platform == 'win32': # pragma: no cover
# Windows' datetime.max is year 9999, but timestamps that large don't work
_datetime_max = datetime.max.replace(year=2999, tzinfo=timezone.utc)
else:
_datetime_max = datetime.max.replace(tzinfo=timezone.utc)
def _to_generalized_time(t: int) -> datetime:
"""Convert a timestamp value to a datetime"""
if t <= 0:
return _datetime_min
else:
try:
return datetime.fromtimestamp(t, timezone.utc)
except (OSError, OverflowError):
try:
# Work around a bug in cryptography which shows up on
# systems with a small time_t.
datetime.fromtimestamp(_datetime_max.timestamp() - 1,
timezone.utc)
return _datetime_max
except (OSError, OverflowError): # pragma: no cover
return _datetime_32bit_max
def _to_purpose_oids(purposes: _Purposes) -> _PurposeOIDs:
"""Convert a list of purposes to purpose OIDs"""
if isinstance(purposes, str):
purposes = [p.strip() for p in purposes.split(',')]
if not purposes or 'any' in purposes or _purpose_any in purposes:
purpose_oids = None
else:
purpose_oids = set(_purpose_to_oid.get(p) or x509.ObjectIdentifier(p)
for p in purposes)
return purpose_oids
def _encode_user_principals(principals: _Principals) -> _GeneralNameList:
"""Encode user principals as e-mail addresses"""
if isinstance(principals, str):
principals = [p.strip() for p in principals.split(',')]
return [x509.RFC822Name(name) for name in principals]
def _encode_host_principals(principals: _Principals) -> _GeneralNameList:
"""Encode host principals as DNS names or IP addresses"""
def _encode_host(name: str) -> x509.GeneralName:
"""Encode a host principal as a DNS name or IP address"""
try:
return x509.IPAddress(ip_address(name))
except ValueError:
return x509.DNSName(name)
if isinstance(principals, str):
principals = [p.strip() for p in principals.split(',')]
return [_encode_host(name) for name in principals]
class X509Name(x509.Name):
"""A shim around PyCA for X.509 distinguished names"""
_escape = re.compile(r'([,+\\])')
_unescape = re.compile(r'\\([,+\\])')
_split_rdn = re.compile(r'(?:[^+\\]+|\\.)+')
_split_name = re.compile(r'(?:[^,\\]+|\\.)+')
_attrs = (
('C', x509.NameOID.COUNTRY_NAME),
('ST', x509.NameOID.STATE_OR_PROVINCE_NAME),
('L', x509.NameOID.LOCALITY_NAME),
('O', x509.NameOID.ORGANIZATION_NAME),
('OU', x509.NameOID.ORGANIZATIONAL_UNIT_NAME),
('CN', x509.NameOID.COMMON_NAME),
('DC', x509.NameOID.DOMAIN_COMPONENT))
_to_oid = dict((k, v) for k, v in _attrs)
_from_oid = dict((v, k) for k, v in _attrs)
def __init__(self, name: _NameInit):
if isinstance(name, str):
rdns = self._parse_name(name)
elif isinstance(name, x509.Name):
rdns = name.rdns
else:
rdns = name
super().__init__(rdns)
def __str__(self) -> str:
return ','.join(self._format_rdn(rdn) for rdn in self.rdns)
def _format_rdn(self, rdn: x509.RelativeDistinguishedName) -> str:
"""Format an X.509 RelativeDistinguishedName as a string"""
return '+'.join(sorted(self._format_attr(nameattr) for nameattr in rdn))
def _format_attr(self, nameattr: x509.NameAttribute) -> str:
"""Format an X.509 NameAttribute as a string"""
attr = self._from_oid.get(nameattr.oid) or nameattr.oid.dotted_string
return attr + '=' + self._escape.sub(r'\\\1', cast(str, nameattr.value))
def _parse_name(self, name: str) -> \
Iterable[x509.RelativeDistinguishedName]:
"""Parse an X.509 distinguished name"""
return [self._parse_rdn(rdn) for rdn in self._split_name.findall(name)]
def _parse_rdn(self, rdn: str) -> x509.RelativeDistinguishedName:
"""Parse an X.509 relative distinguished name"""
return x509.RelativeDistinguishedName(
self._parse_nameattr(av) for av in self._split_rdn.findall(rdn))
def _parse_nameattr(self, av: str) -> x509.NameAttribute:
"""Parse an X.509 name attribute/value pair"""
try:
attr, value = av.split('=', 1)
except ValueError:
raise ValueError('Invalid X.509 name attribute: ' + av) from None
try:
attr = attr.strip()
oid = self._to_oid.get(attr) or x509.ObjectIdentifier(attr)
except ValueError:
raise ValueError('Unknown X.509 attribute: ' + attr) from None
return x509.NameAttribute(oid, self._unescape.sub(r'\1', value))
class X509NamePattern:
"""Match X.509 distinguished names"""
def __init__(self, pattern: str):
if pattern.endswith(',*'):
self._pattern = X509Name(pattern[:-2])
self._prefix_len: Optional[int] = len(self._pattern.rdns)
else:
self._pattern = X509Name(pattern)
self._prefix_len = None
def __eq__(self, other: object) -> bool:
# This isn't protected access - both objects are _RSAKey instances
# pylint: disable=protected-access
if not isinstance(other, X509NamePattern): # pragma: no cover
return NotImplemented
return (self._pattern == other._pattern and
self._prefix_len == other._prefix_len)
def __hash__(self) -> int:
return hash((self._pattern, self._prefix_len))
def matches(self, name: X509Name) -> bool:
"""Return whether an X.509 name matches this pattern"""
return self._pattern.rdns == name.rdns[:self._prefix_len]
class X509Certificate:
"""A shim around PyCA and PyOpenSSL for X.509 certificates"""
def __init__(self, cert: x509.Certificate, data: bytes):
self.data = data
self.subject = X509Name(cert.subject)
self.issuer = X509Name(cert.issuer)
self.key_data = cert.public_key().public_bytes(
Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
self.openssl_cert = crypto.X509.from_cryptography(cert)
self.subject_hash = hex(self.openssl_cert.get_subject().hash())[2:]
self.issuer_hash = hex(self.openssl_cert.get_issuer().hash())[2:]
try:
self.purposes: Optional[Set[bytes]] = \
set(cert.extensions.get_extension_for_class(
x509.ExtendedKeyUsage).value)
except x509.ExtensionNotFound:
self.purposes = None
try:
sans = cert.extensions.get_extension_for_class(
x509.SubjectAlternativeName).value
self.user_principals = sans.get_values_for_type(x509.RFC822Name)
self.host_principals = sans.get_values_for_type(x509.DNSName) + \
[str(ip) for ip in sans.get_values_for_type(x509.IPAddress)]
except x509.ExtensionNotFound:
cn = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
principals = [cast(str, attr.value) for attr in cn]
self.user_principals = principals
self.host_principals = principals
try:
comment = cert.extensions.get_extension_for_oid(_nscomment_oid)
comment_der = cast(x509.UnrecognizedExtension, comment.value).value
self.comment: Optional[bytes] = \
cast(IA5String, der_decode(comment_der)).value
except x509.ExtensionNotFound:
self.comment = None
def __eq__(self, other: object) -> bool:
if not isinstance(other, X509Certificate): # pragma: no cover
return NotImplemented
return self.data == other.data
def __hash__(self) -> int:
return hash(self.data)
def validate(self, trust_store: Sequence['X509Certificate'],
purposes: _Purposes, user_principal: str,
host_principal: str) -> None:
"""Validate an X.509 certificate"""
purpose_oids = _to_purpose_oids(purposes)
if purpose_oids and self.purposes and not purpose_oids & self.purposes:
raise ValueError('Certificate purpose mismatch')
if user_principal and user_principal not in self.user_principals:
raise ValueError('Certificate user principal mismatch')
if host_principal and host_principal not in self.host_principals:
raise ValueError('Certificate host principal mismatch')
x509_store = crypto.X509Store()
for c in trust_store:
x509_store.add_cert(c.openssl_cert)
try:
x509_ctx = crypto.X509StoreContext(x509_store, self.openssl_cert,
None)
x509_ctx.verify_certificate()
except crypto.X509StoreContextError as exc:
raise ValueError(f'X.509 chain validation error: {exc}') from None
def generate_x509_certificate(signing_key: PyCAKey, key: PyCAKey,
subject: _NameInit, issuer: Optional[_NameInit],
serial: Optional[int], valid_after: int,
valid_before: int, ca: bool,
ca_path_len: Optional[int], purposes: _Purposes,
user_principals: _Principals,
host_principals: _Principals,
hash_name: str,
comment: _Comment) -> X509Certificate:
"""Generate a new X.509 certificate"""
builder = x509.CertificateBuilder()
subject = X509Name(subject)
issuer = X509Name(issuer) if issuer else subject
self_signed = subject == issuer
builder = builder.subject_name(subject)
builder = builder.issuer_name(issuer)
if serial is None:
serial = x509.random_serial_number()
builder = builder.serial_number(serial)
builder = builder.not_valid_before(_to_generalized_time(valid_after))
builder = builder.not_valid_after(_to_generalized_time(valid_before))
builder = builder.public_key(cast(PyCAPublicKey, key))
if ca:
basic_constraints = x509.BasicConstraints(ca=True,
path_length=ca_path_len)
key_usage = x509.KeyUsage(digital_signature=False,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False, key_cert_sign=True,
crl_sign=True, encipher_only=False,
decipher_only=False)
else:
basic_constraints = x509.BasicConstraints(ca=False, path_length=None)
key_usage = x509.KeyUsage(digital_signature=True,
content_commitment=False,
key_encipherment=True,
data_encipherment=False,
key_agreement=True, key_cert_sign=False,
crl_sign=False, encipher_only=False,
decipher_only=False)
builder = builder.add_extension(basic_constraints, critical=True)
if ca or not self_signed:
builder = builder.add_extension(key_usage, critical=True)
purpose_oids = _to_purpose_oids(purposes)
if purpose_oids:
builder = builder.add_extension(x509.ExtendedKeyUsage(purpose_oids),
critical=False)
skid = x509.SubjectKeyIdentifier.from_public_key(cast(PyCAPublicKey, key))
builder = builder.add_extension(skid, critical=False)
if not self_signed:
issuer_pk = cast(PyCAPrivateKey, signing_key).public_key()
akid = x509.AuthorityKeyIdentifier.from_issuer_public_key(issuer_pk)
builder = builder.add_extension(akid, critical=False)
sans = _encode_user_principals(user_principals) + \
_encode_host_principals(host_principals)
if sans:
builder = builder.add_extension(x509.SubjectAlternativeName(sans),
critical=False)
if comment:
if isinstance(comment, str):
comment_bytes = comment.encode('utf-8')
else:
comment_bytes = comment
comment_bytes = der_encode(IA5String(comment_bytes))
builder = builder.add_extension(
x509.UnrecognizedExtension(_nscomment_oid, comment_bytes),
critical=False)
try:
hash_alg = hashes[hash_name]() if hash_name else None
except KeyError:
raise ValueError('Unknown hash algorithm') from None
cert = builder.sign(cast(PyCAPrivateKey, signing_key), hash_alg)
data = cert.public_bytes(Encoding.DER)
return X509Certificate(cert, data)
def import_x509_certificate(data: bytes) -> X509Certificate:
"""Construct an X.509 certificate from DER data"""
cert = x509.load_der_x509_certificate(data)
return X509Certificate(cert, data)