pykolab/conf/entitlement.py
Show First 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | def __init__(self, *args, **kw): | ||||
if os.access(ca_cert_file, os.R_OK): | if os.access(ca_cert_file, os.R_OK): | ||||
# Verify /etc/kolab/mirror_ca.crt | # Verify /etc/kolab/mirror_ca.crt | ||||
ca_cert = OpenSSL.crypto.load_certificate( | ca_cert = OpenSSL.crypto.load_certificate( | ||||
OpenSSL.SSL.FILETYPE_PEM, | OpenSSL.SSL.FILETYPE_PEM, | ||||
open(ca_cert_file).read() | open(ca_cert_file).read() | ||||
) | ) | ||||
if (bool)(ca_cert.has_expired()): | if (bool)(ca_cert.has_expired()): | ||||
raise Exception, _("Invalid entitlement verification " + \ | raise Exception(_("Invalid entitlement verification " + \ | ||||
"certificate at %s" % (ca_cert_file)) | "certificate at %s" % (ca_cert_file))) | ||||
# TODO: Check validity and warn ~1-2 months in advance. | # TODO: Check validity and warn ~1-2 months in advance. | ||||
ca_cert_issuer = ca_cert.get_issuer() | ca_cert_issuer = ca_cert.get_issuer() | ||||
ca_cert_subject = ca_cert.get_subject() | ca_cert_subject = ca_cert.get_subject() | ||||
ca_cert_issuer_hash = subprocess.Popen( | ca_cert_issuer_hash = subprocess.Popen( | ||||
[ | [ | ||||
'openssl', | 'openssl', | ||||
'x509', | 'x509', | ||||
'-in', | '-in', | ||||
ca_cert_file, | ca_cert_file, | ||||
'-noout', | '-noout', | ||||
'-issuer_hash' | '-issuer_hash' | ||||
], | ], | ||||
stdout=subprocess.PIPE | stdout=subprocess.PIPE | ||||
).communicate()[0].strip() | ).communicate()[0].strip() | ||||
ca_cert_issuer_hash_digest = hashlib.sha224(ca_cert_issuer_hash).hexdigest() | ca_cert_issuer_hash_digest = hashlib.sha224(ca_cert_issuer_hash).hexdigest() | ||||
if not ca_cert_issuer_hash_digest in self.entitlement_verification: | if not ca_cert_issuer_hash_digest in self.entitlement_verification: | ||||
raise Exception, _("Invalid entitlement verification " + \ | raise Exception(_("Invalid entitlement verification " + \ | ||||
"certificate at %s") % (ca_cert_file) | "certificate at %s") % (ca_cert_file)) | ||||
ca_cert_subject_hash = subprocess.Popen( | ca_cert_subject_hash = subprocess.Popen( | ||||
[ | [ | ||||
'openssl', | 'openssl', | ||||
'x509', | 'x509', | ||||
'-in', | '-in', | ||||
ca_cert_file, | ca_cert_file, | ||||
'-noout', | '-noout', | ||||
'-subject_hash' | '-subject_hash' | ||||
], | ], | ||||
stdout=subprocess.PIPE | stdout=subprocess.PIPE | ||||
).communicate()[0].strip() | ).communicate()[0].strip() | ||||
ca_cert_subject_hash_digest = hashlib.sha224(ca_cert_subject_hash).hexdigest() | ca_cert_subject_hash_digest = hashlib.sha224(ca_cert_subject_hash).hexdigest() | ||||
if not ca_cert_subject_hash_digest in self.entitlement_verification: | if not ca_cert_subject_hash_digest in self.entitlement_verification: | ||||
raise Exception, _("Invalid entitlement verification " + \ | raise Exception(_("Invalid entitlement verification " + \ | ||||
"certificate at %s") % (ca_cert_file) | "certificate at %s") % (ca_cert_file)) | ||||
customer_cert_issuer_hash = subprocess.Popen( | customer_cert_issuer_hash = subprocess.Popen( | ||||
[ | [ | ||||
'openssl', | 'openssl', | ||||
'x509', | 'x509', | ||||
'-in', | '-in', | ||||
customer_cert_file, | customer_cert_file, | ||||
'-noout', | '-noout', | ||||
'-issuer_hash' | '-issuer_hash' | ||||
], | ], | ||||
stdout=subprocess.PIPE | stdout=subprocess.PIPE | ||||
).communicate()[0].strip() | ).communicate()[0].strip() | ||||
customer_cert_issuer_hash_digest = hashlib.sha224(customer_cert_issuer_hash).hexdigest() | customer_cert_issuer_hash_digest = hashlib.sha224(customer_cert_issuer_hash).hexdigest() | ||||
if not customer_cert_issuer_hash_digest in self.entitlement_verification: | if not customer_cert_issuer_hash_digest in self.entitlement_verification: | ||||
raise Exception, _("Invalid entitlement verification " + \ | raise Exception(_("Invalid entitlement verification " + \ | ||||
"certificate at %s") % (customer_cert_file) | "certificate at %s") % (customer_cert_file)) | ||||
if not ca_cert_issuer.countryName == ca_cert_subject.countryName: | if not ca_cert_issuer.countryName == ca_cert_subject.countryName: | ||||
raise Exception, _("Invalid entitlement certificate") | raise Exception(_("Invalid entitlement certificate")) | ||||
if not ca_cert_issuer.organizationName == ca_cert_subject.organizationName: | if not ca_cert_issuer.organizationName == ca_cert_subject.organizationName: | ||||
raise Exception, _("Invalid entitlement certificate") | raise Exception(_("Invalid entitlement certificate")) | ||||
if os.path.isdir('/etc/kolab/entitlement.d/') and \ | if os.path.isdir('/etc/kolab/entitlement.d/') and \ | ||||
os.access('/etc/kolab/entitlement.d/', os.R_OK): | os.access('/etc/kolab/entitlement.d/', os.R_OK): | ||||
for root, dirs, files in os.walk('/etc/kolab/entitlement.d/'): | for root, dirs, files in os.walk('/etc/kolab/entitlement.d/'): | ||||
if not root == '/etc/kolab/entitlement.d/': | if not root == '/etc/kolab/entitlement.d/': | ||||
continue | continue | ||||
for entitlement_file in files: | for entitlement_file in files: | ||||
▲ Show 20 Lines • Show All 90 Lines • ▼ Show 20 Lines | def verify_certificate(self, customer_cert_file): | ||||
customer_cert_file, | customer_cert_file, | ||||
'-noout', | '-noout', | ||||
'-serial' | '-serial' | ||||
], | ], | ||||
stdout=subprocess.PIPE | stdout=subprocess.PIPE | ||||
).communicate()[0].strip().split('=')[1] | ).communicate()[0].strip().split('=')[1] | ||||
if not customer_cert_serial == cert_serial: | if not customer_cert_serial == cert_serial: | ||||
raise Exception, _("Invalid entitlement verification " + \ | raise Exception(_("Invalid entitlement verification " + \ | ||||
"certificate at %s") % (customer_cert_file) | "certificate at %s") % (customer_cert_file)) | ||||
customer_cert_issuer_hash = subprocess.Popen( | customer_cert_issuer_hash = subprocess.Popen( | ||||
[ | [ | ||||
'openssl', | 'openssl', | ||||
'x509', | 'x509', | ||||
'-in', | '-in', | ||||
customer_cert_file, | customer_cert_file, | ||||
'-noout', | '-noout', | ||||
'-issuer_hash' | '-issuer_hash' | ||||
], | ], | ||||
stdout=subprocess.PIPE | stdout=subprocess.PIPE | ||||
).communicate()[0].strip() | ).communicate()[0].strip() | ||||
if not customer_cert_issuer_hash == cert_issuer_hash: | if not customer_cert_issuer_hash == cert_issuer_hash: | ||||
raise Exception, _("Invalid entitlement verification " + \ | raise Exception(_("Invalid entitlement verification " + \ | ||||
"certificate at %s") % (customer_cert_file) | "certificate at %s") % (customer_cert_file)) | ||||
customer_cert_subject_hash = subprocess.Popen( | customer_cert_subject_hash = subprocess.Popen( | ||||
[ | [ | ||||
'openssl', | 'openssl', | ||||
'x509', | 'x509', | ||||
'-in', | '-in', | ||||
customer_cert_file, | customer_cert_file, | ||||
'-noout', | '-noout', | ||||
'-subject_hash' | '-subject_hash' | ||||
], | ], | ||||
stdout=subprocess.PIPE | stdout=subprocess.PIPE | ||||
).communicate()[0].strip() | ).communicate()[0].strip() | ||||
if not customer_cert_subject_hash == cert_subject_hash: | if not customer_cert_subject_hash == cert_subject_hash: | ||||
raise Exception, _("Invalid entitlement verification " + \ | raise Exception(_("Invalid entitlement verification " + \ | ||||
"certificate at %s") % (customer_cert_file) | "certificate at %s") % (customer_cert_file)) | ||||
def get(self): | def get(self): | ||||
return self.entitlement | return self.entitlement |
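Review bot: In the first visible hunk the new-side raise at the `has_expired()` check still applies `%` inside the translation call — `"certificate at %s" % (ca_cert_file)` is formatted first and then passed to `_()`, so the msgid contains the file path and never matches the catalog, while every sibling raise in this file formats outside `_()`; it should read `raise Exception(_("Invalid entitlement verification " + "certificate at %s") % (ca_cert_file))`. Since the commit adopts the Python 3 `raise` form, note that `subprocess.Popen(...).communicate()[0]` returns bytes on Python 3, so `.strip().split('=')` in verify_certificate raises TypeError and the `==` checks against `cert_serial`/`cert_issuer_hash`/`cert_subject_hash` fail closed unless those are bytes too — decoding the output once (e.g. `universal_newlines=True`, while keeping bytes for the `hashlib.sha224(...)` calls) would keep the checks meaningful; this is hedged because the target interpreter is not visible here. Separately, `open(ca_cert_file).read()` leaks the file handle and should be `with open(ca_cert_file, 'rb') as fh:` before `load_certificate`. Both `__init__` and `verify_certificate` begin and end outside the visible lines, so expiry and signature checks for the customer certificate may live there; nothing in the visible part performs them.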