Replace PyOpenSSL by cryptography

Using The crypto module of PyOpenSSL is discouraged now in favor of
the cryptography library[1].

[1] https://www.pyopenssl.org/en/latest/api/crypto.html

pyca/cryptography is likely a better choice than using this module.
It contains a complete set of cryptographic primitives as well as
a significantly better and more powerful X509 API.

Change-Id: Ic9c2bf44b6996ccf9645e0f1e52c1bfcbae33b19
This commit is contained in:
Takashi Kajinami 2024-09-27 15:17:18 +09:00
parent 8ebec483df
commit 1158d5e1f7
4 changed files with 171 additions and 101 deletions

View File

@ -11,11 +11,16 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import datetime
import os import os
import yaml import yaml
from OpenSSL import crypto from cryptography.hazmat.primitives.asymmetric import rsa
from time import time from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography import x509
from cryptography.x509 import oid
from packstack.installer import basedefs from packstack.installer import basedefs
from packstack.installer import utils from packstack.installer import utils
from packstack.installer.setup_controller import Controller from packstack.installer.setup_controller import Controller
@ -93,64 +98,97 @@ def generateHieraDataFile():
def generate_ssl_cert(config, host, service, ssl_key_file, ssl_cert_file): def generate_ssl_cert(config, host, service, ssl_key_file, ssl_cert_file):
""" """
Wrapper on top of openssl Generate SSL certificate
""" """
# We have to check whether the certificate already exists # We have to check whether the certificate already exists
cert_dir = os.path.join(config['CONFIG_SSL_CERT_DIR'], 'certs') cert_dir = os.path.join(config['CONFIG_SSL_CERT_DIR'], 'certs')
local_cert_name = host + os.path.basename(ssl_cert_file) local_cert_name = host + os.path.basename(ssl_cert_file)
local_cert_path = os.path.join(cert_dir, local_cert_name) local_cert_path = os.path.join(cert_dir, local_cert_name)
if not os.path.exists(local_cert_path): if os.path.exists(local_cert_path):
ca_file = open(config['CONFIG_SSL_CACERT_FILE'], 'rt').read() return
ca_key_file = open(config['CONFIG_SSL_CACERT_KEY_FILE'], 'rt').read()
ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, ca_key_file)
ca = crypto.load_certificate(crypto.FILETYPE_PEM, ca_file)
k = crypto.PKey() with open(config['CONFIG_SSL_CACERT_FILE'], 'rb') as cacert_file:
k.generate_key(crypto.TYPE_RSA, 4096) ca_cert = x509.load_pem_x509_certificate(cacert_file.read())
mail = config['CONFIG_SSL_CERT_SUBJECT_MAIL']
hostinfo = config['HOST_DETAILS'][host]
fqdn = hostinfo['fqdn']
cert = crypto.X509()
subject = cert.get_subject()
subject.C = config['CONFIG_SSL_CERT_SUBJECT_C']
subject.ST = config['CONFIG_SSL_CERT_SUBJECT_ST']
subject.L = config['CONFIG_SSL_CERT_SUBJECT_L']
subject.O = config['CONFIG_SSL_CERT_SUBJECT_O'] # noqa: E741
subject.OU = config['CONFIG_SSL_CERT_SUBJECT_OU']
cn = "%s/%s" % (service, fqdn)
# if subject.CN is more than 64 chars long, cert creation will fail
if len(cn) > 64:
cn = cn[0:63]
subject.CN = cn
subject.emailAddress = mail
cert.add_extensions([ with open(config['CONFIG_SSL_CACERT_KEY_FILE'], 'rb') as ca_key_file:
crypto.X509Extension( ca_key = serialization.load_pem_private_key(ca_key_file.read(),
"keyUsage".encode('ascii'), password=None)
False,
"nonRepudiation,digitalSignature,keyEncipherment".encode('ascii')),
crypto.X509Extension(
"extendedKeyUsage".encode('ascii'),
False,
"clientAuth,serverAuth".encode('ascii')),
])
cert.gmtime_adj_notBefore(0) # create a key pair
cert.gmtime_adj_notAfter(315360000) key = rsa.generate_private_key(
cert.set_issuer(ca.get_subject()) public_exponent=65537,
cert.set_pubkey(k) key_size=4096,
serial = int(time()) )
cert.set_serial_number(serial)
cert.sign(ca_key, 'sha256')
final_cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert) hostinfo = config['HOST_DETAILS'][host]
final_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k) fqdn = hostinfo['fqdn']
deliver_ssl_file(ca_file, config['CONFIG_SSL_CACERT'], host) cn = "%s/%s" % (service, fqdn)
deliver_ssl_file(final_cert.decode(), ssl_cert_file, host) # if subject.CN is more than 64 chars long, cert creation will fail
deliver_ssl_file(final_key.decode(), ssl_key_file, host) if len(cn) > 64:
cn = cn[0:63]
with open(local_cert_path, 'w') as f: subject = x509.Name([
f.write(final_cert.decode()) x509.NameAttribute(oid.NameOID.COUNTRY_NAME,
config['CONFIG_SSL_CERT_SUBJECT_C']),
x509.NameAttribute(oid.NameOID.STATE_OR_PROVINCE_NAME,
config['CONFIG_SSL_CERT_SUBJECT_ST']),
x509.NameAttribute(oid.NameOID.LOCALITY_NAME,
config['CONFIG_SSL_CERT_SUBJECT_L']),
x509.NameAttribute(oid.NameOID. ORGANIZATION_NAME,
config['CONFIG_SSL_CERT_SUBJECT_O']),
x509.NameAttribute(oid.NameOID. ORGANIZATIONAL_UNIT_NAME,
config['CONFIG_SSL_CERT_SUBJECT_OU']),
x509.NameAttribute(oid.NameOID.COMMON_NAME, cn),
x509.NameAttribute(oid.NameOID.EMAIL_ADDRESS,
config['CONFIG_SSL_CERT_SUBJECT_MAIL']),
])
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
ca_cert.subject
).public_key(
key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.datetime.now(datetime.timezone.utc)
).not_valid_after(
datetime.datetime.now(datetime.timezone.utc) +
datetime.timedelta(days=3650)
).add_extension(
x509.KeyUsage(
digital_signature=True,
content_commitment=True,
key_encipherment=True,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False,
),
critical=False,
).add_extension(
x509.ExtendedKeyUsage([
x509.ExtendedKeyUsageOID.CLIENT_AUTH,
x509.ExtendedKeyUsageOID.SERVER_AUTH,
]),
critical=False,
).sign(ca_key, hashes.SHA256())
final_cacert = ca_cert.public_bytes(serialization.Encoding.PEM)
final_cert = cert.public_bytes(serialization.Encoding.PEM)
final_key = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
)
deliver_ssl_file(final_cacert.decode(), config['CONFIG_SSL_CACERT'], host)
deliver_ssl_file(final_cert.decode(), ssl_cert_file, host)
deliver_ssl_file(final_key.decode(), ssl_key_file, host)
with open(local_cert_path, 'wb') as f:
f.write(final_cert)
def deliver_ssl_file(content, path, host): def deliver_ssl_file(content, path, host):

View File

@ -164,9 +164,9 @@ def create_manifest(config, messages):
raise exceptions.ParamValidationError( raise exceptions.ParamValidationError(
"The file %s doesn't exist" % ssl_chain_file) "The file %s doesn't exist" % ssl_chain_file)
final_cacert = open(ssl_chain_file, 'rt').read()
final_cert = open(ssl_cert_file, 'rt').read() final_cert = open(ssl_cert_file, 'rt').read()
final_key = open(ssl_key_file, 'rt').read() final_key = open(ssl_key_file, 'rt').read()
final_cacert = open(ssl_chain_file, 'rt').read()
host = config['CONFIG_CONTROLLER_HOST'] host = config['CONFIG_CONTROLLER_HOST']
deliver_ssl_file(final_cacert, ssl_chain_file, host) deliver_ssl_file(final_cacert, ssl_chain_file, host)
deliver_ssl_file(final_cert, ssl_cert_file, host) deliver_ssl_file(final_cert, ssl_cert_file, host)

View File

@ -15,11 +15,16 @@
""" """
Plugin responsible for managing SSL options Plugin responsible for managing SSL options
""" """
import datetime
import os import os
from OpenSSL import crypto
from socket import gethostname from socket import gethostname
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography import x509
from cryptography.x509 import oid
from packstack.installer import basedefs from packstack.installer import basedefs
from packstack.installer import utils from packstack.installer import utils
from packstack.installer import validators from packstack.installer import validators
@ -207,7 +212,7 @@ def initSequences(controller):
def create_self_signed_cert(config, messages): def create_self_signed_cert(config, messages):
""" """
OpenSSL wrapper to create selfsigned CA. Generate selfsigned CA.
""" """
# for now hardcoded place for landing CACert file on servers # for now hardcoded place for landing CACert file on servers
@ -239,55 +244,82 @@ def create_self_signed_cert(config, messages):
KEY_FILE = config['CONFIG_SSL_CACERT_KEY_FILE'] = ( KEY_FILE = config['CONFIG_SSL_CACERT_KEY_FILE'] = (
'%s/keys/selfcert.crt' % config['CONFIG_SSL_CERT_DIR'] '%s/keys/selfcert.crt' % config['CONFIG_SSL_CERT_DIR']
) )
if not os.path.exists(CERT_FILE) or not os.path.exists(KEY_FILE):
# create a key pair
k = crypto.PKey()
k.generate_key(crypto.TYPE_RSA, 4096)
# create a self-signed cert if os.path.exists(CERT_FILE) and os.path.exists(KEY_FILE):
mail = config['CONFIG_SSL_CERT_SUBJECT_MAIL'] return
cert = crypto.X509()
subject = cert.get_subject()
subject.C = config['CONFIG_SSL_CERT_SUBJECT_C']
subject.ST = config['CONFIG_SSL_CERT_SUBJECT_ST']
subject.L = config['CONFIG_SSL_CERT_SUBJECT_L']
subject.O = config['CONFIG_SSL_CERT_SUBJECT_O'] # noqa: E741
subject.OU = config['CONFIG_SSL_CERT_SUBJECT_OU']
subject.CN = config['CONFIG_SSL_CERT_SUBJECT_CN']
subject.emailAddress = mail
cert.set_serial_number(1000)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(k)
# CA extensions # create a key pair
cert.add_extensions([ key = rsa.generate_private_key(
crypto.X509Extension("basicConstraints".encode('ascii'), False, public_exponent=65537,
"CA:TRUE".encode('ascii')), key_size=4096,
crypto.X509Extension("keyUsage".encode('ascii'), False, )
"keyCertSign, cRLSign".encode('ascii')),
crypto.X509Extension("subjectKeyIdentifier".encode('ascii'), False,
"hash".encode('ascii'),
subject=cert),
])
cert.add_extensions([ subject = issuer = x509.Name([
crypto.X509Extension( x509.NameAttribute(oid.NameOID.COUNTRY_NAME,
"authorityKeyIdentifier".encode('ascii'), False, config['CONFIG_SSL_CERT_SUBJECT_C']),
"keyid:always".encode('ascii'), issuer=cert) x509.NameAttribute(oid.NameOID.STATE_OR_PROVINCE_NAME,
]) config['CONFIG_SSL_CERT_SUBJECT_ST']),
x509.NameAttribute(oid.NameOID.LOCALITY_NAME,
config['CONFIG_SSL_CERT_SUBJECT_L']),
x509.NameAttribute(oid.NameOID. ORGANIZATION_NAME,
config['CONFIG_SSL_CERT_SUBJECT_O']),
x509.NameAttribute(oid.NameOID. ORGANIZATIONAL_UNIT_NAME,
config['CONFIG_SSL_CERT_SUBJECT_OU']),
x509.NameAttribute(oid.NameOID.COMMON_NAME,
config['CONFIG_SSL_CERT_SUBJECT_CN']),
x509.NameAttribute(oid.NameOID.EMAIL_ADDRESS,
config['CONFIG_SSL_CERT_SUBJECT_MAIL']),
])
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.datetime.now(datetime.timezone.utc)
).not_valid_after(
datetime.datetime.now(datetime.timezone.utc) +
datetime.timedelta(days=3650)
).add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=False,
).add_extension(
x509.KeyUsage(
digital_signature=False,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=True,
crl_sign=True,
encipher_only=False,
decipher_only=False,
),
critical=False,
).add_extension(
x509.SubjectKeyIdentifier.from_public_key(key.public_key()),
critical=False,
).add_extension(
x509.AuthorityKeyIdentifier.from_issuer_public_key(key.public_key()),
critical=False,
).sign(key, hashes.SHA256())
cert.sign(k, 'sha256') with open(CERT_FILE, 'wb') as certfile:
certfile.write(cert.public_bytes(serialization.Encoding.PEM))
open((CERT_FILE), "w").write( with open(KEY_FILE, 'wb') as keyfile:
crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode()) keyfile.write(key.private_bytes(
open((KEY_FILE), "w").write( encoding=serialization.Encoding.PEM,
crypto.dump_privatekey(crypto.FILETYPE_PEM, k).decode()) format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
))
messages.append( messages.append(
"%sNOTE%s : A selfsigned CA certificate was generated to be used " "%sNOTE%s : A selfsigned CA certificate was generated to be used "
"for ssl, you should still change it do subordinate CA cert. In " "for ssl, you should still change it do subordinate CA cert. In "
"any case please save the contents of %s." "any case please save the contents of %s."
% (utils.COLORS['red'], utils.COLORS['nocolor'], % (utils.COLORS['red'], utils.COLORS['nocolor'],
config['CONFIG_SSL_CERT_DIR'])) config['CONFIG_SSL_CERT_DIR']))

View File

@ -2,6 +2,6 @@ pbr>=1.6 # Apache-2.0
netaddr>=0.7.6 netaddr>=0.7.6
PyYAML>=3.10 PyYAML>=3.10
docutils>=0.11 docutils>=0.11
pyOpenSSL>=16.2.0
netifaces netifaces
distro distro
cryptography>=2.1 # BSD/Apache-2.0