From 341930eb918bdfa3e984a07bcbd9dde002ce8a9c Mon Sep 17 00:00:00 2001 From: Alessandro Pilotti Date: Fri, 24 Feb 2017 13:55:20 +0200 Subject: [PATCH] Add certificates plugin for Windows Install all the certificates provided by the metadata. Change-Id: Ida2550a10fa043e40b194db5d0db10692e716edf Implements: add-certificates-plugin Co-Authored-By: Paula Madalina Crismaru --- cloudbaseinit/constant.py | 3 + cloudbaseinit/metadata/services/base.py | 3 + cloudbaseinit/plugins/windows/certificates.py | 66 ++++ .../plugins/windows/test_certificates.py | 132 ++++++++ cloudbaseinit/tests/utils/test_encoding.py | 4 + .../tests/utils/windows/test_x509.py | 283 +++++++++++++++++- cloudbaseinit/utils/encoding.py | 7 + cloudbaseinit/utils/windows/cryptoapi.py | 93 +++++- cloudbaseinit/utils/windows/x509.py | 224 +++++++++++++- 9 files changed, 791 insertions(+), 24 deletions(-) create mode 100644 cloudbaseinit/plugins/windows/certificates.py create mode 100644 cloudbaseinit/tests/plugins/windows/test_certificates.py diff --git a/cloudbaseinit/constant.py b/cloudbaseinit/constant.py index 30bbbfb0..887c80d3 100644 --- a/cloudbaseinit/constant.py +++ b/cloudbaseinit/constant.py @@ -43,3 +43,6 @@ LOGON_PASSWORD_CHANGE_OPTIONS = [CLEAR_TEXT_INJECTED_ONLY, NEVER_CHANGE, VOL_ACT_KMS = "KMS" VOL_ACT_AVMA = "AVMA" + +CERT_LOCATION_LOCAL_MACHINE = "LocalMachine" +CERT_LOCATION_CURRENT_USER = "CurrentUser" diff --git a/cloudbaseinit/metadata/services/base.py b/cloudbaseinit/metadata/services/base.py index 04f6f5eb..c5ca9b74 100644 --- a/cloudbaseinit/metadata/services/base.py +++ b/cloudbaseinit/metadata/services/base.py @@ -157,6 +157,9 @@ class BaseMetadataService(object): def get_winrm_listeners_configuration(self): pass + def get_server_certs(self): + pass + def get_client_auth_certs(self): pass diff --git a/cloudbaseinit/plugins/windows/certificates.py b/cloudbaseinit/plugins/windows/certificates.py new file mode 100644 index 00000000..b9119754 --- /dev/null +++ b/cloudbaseinit/plugins/windows/certificates.py @@ -0,0 +1,66 @@ +# Copyright (c) 2017 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as oslo_logging + +from cloudbaseinit import conf as cloudbaseinit_conf +from cloudbaseinit import constant +from cloudbaseinit import exception +from cloudbaseinit.plugins.common import base +from cloudbaseinit.utils.windows import x509 + +CONF = cloudbaseinit_conf.CONF +LOG = oslo_logging.getLogger(__name__) + + +class ServerCertificatesPlugin(base.BasePlugin): + @staticmethod + def _use_machine_keyset(store_location): + if store_location == constant.CERT_LOCATION_LOCAL_MACHINE: + return True + elif store_location == constant.CERT_LOCATION_CURRENT_USER: + return False + else: + raise exception.ItemNotFoundException( + "Unsupported certificate store location: %s" % + store_location) + + def execute(self, service, shared_data): + certs_info = service.get_server_certs() + + if certs_info is None: + LOG.info("The metadata service does not provide server " + "certificates") + else: + cert_mgr = x509.CryptoAPICertManager() + for cert_info in service.get_server_certs(): + cert_name = cert_info.get("name") + store_location = cert_info.get("store_location") + store_name = cert_info.get("store_name") + pfx_data = cert_info.get("pfx_data") + machine_keyset = self._use_machine_keyset(store_location) + pfx_password = None + + LOG.info("Importing PFX certificate %(cert_name)s in store " + "%(store_location)s, %(store_name)s", + {"cert_name": cert_name, + "store_location": store_location, + "store_name": store_name}) + cert_mgr.import_pfx_certificate( + pfx_data, pfx_password, machine_keyset, store_name) + + return base.PLUGIN_EXECUTION_DONE, False + + def get_os_requirements(self): + return 'win32', (5, 2) diff --git a/cloudbaseinit/tests/plugins/windows/test_certificates.py b/cloudbaseinit/tests/plugins/windows/test_certificates.py new file mode 100644 index 00000000..19f43277 --- /dev/null +++ b/cloudbaseinit/tests/plugins/windows/test_certificates.py @@ -0,0 +1,132 @@ +# Copyright 2017 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import importlib +import unittest + +try: + import unittest.mock as mock +except ImportError: + import mock + +from cloudbaseinit import constant +from cloudbaseinit import exception +from cloudbaseinit.tests import testutils + + +class ServerCertificatesPluginTests(unittest.TestCase): + def setUp(self): + module_path = 'cloudbaseinit.plugins.windows.certificates' + self.snatcher = testutils.LogSnatcher(module_path) + + self._ctypes_mock = mock.MagicMock() + self._comtypes_mock = mock.MagicMock() + self._ctypes_mock_ = mock.MagicMock() + + self._module_patcher = mock.patch.dict( + 'sys.modules', + {'comtypes': self._comtypes_mock, + 'ctypes': self._ctypes_mock, + 'ctypes.windll': self._ctypes_mock_}) + + self._module_patcher.start() + self.addCleanup(self._module_patcher.stop) + + self.cert_module = importlib.import_module(module_path) + self._cert = self.cert_module.ServerCertificatesPlugin() + + @mock.patch.object(constant, 'CERT_LOCATION_LOCAL_MACHINE', + mock.sentinel.CERT_LOCATION_LOCAL_MACHINE, + create=True) + @mock.patch.object(constant, 'CERT_LOCATION_CURRENT_USER', + mock.sentinel.CERT_LOCATION_CURRENT_USER, + create=True) + def _test_use_machine_keyset(self, store_location): + if store_location == constant.CERT_LOCATION_LOCAL_MACHINE: + expected_result = True + elif store_location == constant.CERT_LOCATION_CURRENT_USER: + expected_result = False + else: + ex = exception.ItemNotFoundException( + "Unsupported certificate store location: %s" % + store_location) + with self.assertRaises(exception.ItemNotFoundException) as exc: + (self._cert._use_machine_keyset(store_location)) + self.assertEqual(str(ex), str(exc.exception)) + return + + result = (self._cert._use_machine_keyset(store_location)) + self.assertEqual(result, expected_result) + + def test_use_keyset_current_user(self): + store_location = mock.sentinel.CERT_LOCATION_CURRENT_USER + self._test_use_machine_keyset(store_location) + + def test_use_keyset_local_machine(self): + store_location = mock.sentinel.CERT_LOCATION_LOCAL_MACHINE + self._test_use_machine_keyset(store_location) + + def test_use_keyset_except(self): + self._test_use_machine_keyset(None) + + def test_get_os_requirements(self): + result = self._cert.get_os_requirements() + self.assertEqual(result, ('win32', (5, 2))) + + @mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager') + def _test_execute(self, mock_crypto_manager, certs_info=None): + mock_service = mock.Mock() + mock_service.get_server_certs.return_value = certs_info + self._cert._use_machine_keyset = mock.Mock() + if certs_info is None: + expected_logging = [ + "The metadata service does not provide server " + "certificates" + ] + call_count = 0 + else: + call_count = len(certs_info) + cert_info = certs_info[0] + cert_name = cert_info.get("name") + store_location = cert_info.get("store_location") + store_name = cert_info.get("store_name") + expected_logging = [ + "Importing PFX certificate {cert_name} in store " + "{store_location}, {store_name}".format( + cert_name=cert_name, + store_location=store_location, + store_name=store_name) + ] * call_count + with self.snatcher: + result = self._cert.execute( + mock_service, mock.sentinel.shared_data) + self.assertEqual(expected_logging, self.snatcher.output) + self.assertEqual(result, + (self.cert_module.base.PLUGIN_EXECUTION_DONE, False)) + self.assertEqual(mock_crypto_manager.return_value. + import_pfx_certificate.call_count, call_count) + self.assertEqual(self._cert._use_machine_keyset.call_count, call_count) + + def test_execute_no_certs(self): + self._test_execute() + + def test_execute(self): + cert_info = { + "name": "fake_name", + "store_location": "fake store_location", + "store_name": "fake store_name", + "pfx_data": "fake pfx_data" + } + certs_info = [cert_info] * 5 + self._test_execute(certs_info=certs_info) diff --git a/cloudbaseinit/tests/utils/test_encoding.py b/cloudbaseinit/tests/utils/test_encoding.py index dac234fa..6971819d 100644 --- a/cloudbaseinit/tests/utils/test_encoding.py +++ b/cloudbaseinit/tests/utils/test_encoding.py @@ -54,3 +54,7 @@ class TestEncoding(unittest.TestCase): if encode: data = data.encode() self.assertEqual(data, content) + + def test_hex_to_bytes(self): + result = encoding.hex_to_bytes("66616b652064617461") + self.assertEqual(result, b"fake data") diff --git a/cloudbaseinit/tests/utils/windows/test_x509.py b/cloudbaseinit/tests/utils/windows/test_x509.py index 32830eb7..b30f1c27 100644 --- a/cloudbaseinit/tests/utils/windows/test_x509.py +++ b/cloudbaseinit/tests/utils/windows/test_x509.py @@ -30,7 +30,9 @@ class CryptoAPICertManagerTests(unittest.TestCase): self._ctypes = mock.MagicMock() self._module_patcher = mock.patch.dict( - 'sys.modules', {'ctypes': self._ctypes}) + 'sys.modules', + {'ctypes': self._ctypes, + 'ctypes.windll': mock.MagicMock()}) self._module_patcher.start() @@ -122,8 +124,8 @@ class CryptoAPICertManagerTests(unittest.TestCase): self.x509.cryptoapi.PROV_RSA_FULL, self.x509.cryptoapi.CRYPT_MACHINE_KEYSET) mock_CryptGenKey.assert_called_with( - mock_HANDLE(), self.x509.cryptoapi.AT_SIGNATURE, - 0x08000000, mock_HANDLE()) + mock_HANDLE(), self.x509.cryptoapi.AT_KEYEXCHANGE, + 0x08000000, mock_byref(mock_HANDLE())) mock_CryptDestroyKey.assert_called_once_with( mock_HANDLE()) mock_CryptReleaseContext.assert_called_once_with( @@ -188,9 +190,9 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_uuid4, mock_get_cert_thumprint, mock_generate_key, mock_add_system_time_interval, - mock_malloc, mock_free, certstr, - certificate, enhanced_key, store_handle, - context_to_store): + mock_malloc, mock_free, + certstr, certificate, enhanced_key, + store_handle, context_to_store): mock_POINTER = self._ctypes.POINTER mock_byref = self._ctypes.byref @@ -202,8 +204,8 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_CertStrToName.return_value = certstr mock_CertOpenStore.return_value = store_handle mock_CertAddCertificateContextToStore.return_value = context_to_store - if (certstr is None or certificate is None or enhanced_key is None - or store_handle is None or context_to_store is None): + if (certstr is None or certificate is None or enhanced_key is None or + store_handle is None or context_to_store is None): self.assertRaises(self.x509.cryptoapi.CryptoAPIException, self._x509_manager.create_self_signed_cert, 'fake subject', 10, True, @@ -239,7 +241,8 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_CertCreateSelfSignCertificate()) mock_free.assert_called_once_with(mock_cast()) - self.assertEqual(mock_get_cert_thumprint.return_value, response) + self.assertEqual(response, + mock_get_cert_thumprint.return_value) mock_generate_key.assert_called_once_with('fake_name', True) @@ -435,3 +438,265 @@ class CryptoAPICertManagerTests(unittest.TestCase): def test_import_cert_CertGetNameString_fail(self): self._test_import_cert(crypttstr=True, store_handle='fake handle', add_enc_cert='fake encoded cert', upn_len=3) + + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CertAddCertificateContextToStore') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.CertOpenStore') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CertFindCertificateInStore') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CertFreeCertificateContext') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.CertCloseStore') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.PFXImportCertStore') + @mock.patch('ctypes.pointer') + @mock.patch('ctypes.POINTER') + @mock.patch('ctypes.cast') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.CRYPTOAPI_BLOB') + def _test_import_pfx_certificate(self, mock_blob, mock_cast, mock_POINTER, + mock_pointer, mock_import_cert_store, + mock_cert_close_store, + mock_cert_free_context, + mock_find_cert_in_store, + mock_cert_open_store, mock_add_cert_store, + import_store_handle, cert_context_p, + store_handle, machine_keyset=True, + add_cert_to_store=True): + + self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE = \ + mock.sentinel.local_machine + self.x509.cryptoapi.CERT_SYSTEM_STORE_CURRENT_USER = \ + mock.sentinel.current_user + self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM = \ + mock.sentinel.store_prov_system + self.x509.cryptoapi.CERT_STORE_ADD_REPLACE_EXISTING = \ + mock.sentinel.cert_add_replace_existing + + if import_store_handle: + import_store_handle = mock.sentinel.import_store_handle + if cert_context_p: + cert_context_p = mock.sentinel.cert_context_p + if store_handle: + store_handle = mock.sentinel.store_handle + + mock_blob.return_value = mock.sentinel.pfx_blob + mock_import_cert_store.return_value = import_store_handle + mock_find_cert_in_store.return_value = cert_context_p + mock_cert_open_store.return_value = store_handle + mock_add_cert_store.return_value = add_cert_to_store + + if (not import_store_handle or not cert_context_p or + not store_handle or not add_cert_to_store): + with self.assertRaises(self.x509.cryptoapi.CryptoAPIException): + self._x509_manager.import_pfx_certificate( + str(mock.sentinel.pfx_data), machine_keyset=machine_keyset) + else: + self._x509_manager.import_pfx_certificate( + str(mock.sentinel.pfx_data), machine_keyset=machine_keyset) + + mock_blob.assert_called_once_with() + mock_cast.assert_called_with( + str(mock.sentinel.pfx_data), mock_POINTER.return_value) + mock_import_cert_store.assert_called_with( + mock_pointer.return_value, None, 0) + mock_pointer.assert_called_once_with(mock_blob.return_value) + if import_store_handle: + if cert_context_p: + if machine_keyset: + flags = mock.sentinel.local_machine + else: + flags = mock.sentinel.current_user + mock_cert_open_store.assert_called_once_with( + mock.sentinel.store_prov_system, 0, 0, flags, + six.text_type(self.x509.STORE_NAME_MY)) + if store_handle: + mock_add_cert_store.assert_called_once_with( + mock_cert_open_store.return_value, cert_context_p, + mock.sentinel.cert_add_replace_existing, None) + call_args = [] + if import_store_handle: + call_args += [mock.call(import_store_handle, 0)] + elif store_handle: + call_args += [mock.call(store_handle, 0)] + mock_cert_close_store.assert_has_calls(call_args) + if cert_context_p: + mock_cert_free_context.assert_called_once_with(cert_context_p) + + def test_import_pfx_certificate_no_import_store_handle(self): + self._test_import_pfx_certificate( + import_store_handle=None, cert_context_p=None, store_handle=None) + + def test_import_pfx_certificate_no_cert_context_p(self): + self._test_import_pfx_certificate( + import_store_handle=True, cert_context_p=None, store_handle=None) + + def test_import_pfx_certificate_no_store_handle(self): + self._test_import_pfx_certificate( + import_store_handle=True, cert_context_p=True, store_handle=None) + + def test_import_pfx_certificate_not_added(self): + self._test_import_pfx_certificate( + import_store_handle=True, cert_context_p=True, store_handle=True, + add_cert_to_store=False) + + def test_import_pfx_certificate(self): + self._test_import_pfx_certificate( + import_store_handle=True, cert_context_p=True, store_handle=True, + machine_keyset=False) + + def test_get_thumbprint_buffer(self): + mock_result = mock.Mock() + mock_result.contents = mock.sentinel.contents + self._ctypes.cast = mock.Mock(return_value=mock_result) + thumbprint_str = '5c5350ff' + result = self._x509_manager._get_thumbprint_buffer( + thumbprint_str) + self.assertEqual(result, mock_result.contents) + + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.CertCloseStore') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CertFindCertificateInStore') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.CertOpenStore') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.CRYPTOAPI_BLOB') + def _test_find_certificate_in_store(self, mock_blob, mock_OpenStore, + mock_FindCertificateInStore, + mock_CloseStore, machine_keyset=True, + store_handle=True, + cert_context_p=True): + self._x509_manager._get_thumbprint_buffer = mock.Mock() + (self._x509_manager._get_thumbprint_buffer. + return_value) = str(mock.sentinel.thumbprint) + mock_blob.return_value = mock.Mock() + mock_OpenStore.return_value = store_handle + mock_FindCertificateInStore.return_value = cert_context_p + if not store_handle or not cert_context_p: + with self.assertRaises(self.x509.cryptoapi.CryptoAPIException): + self._x509_manager._find_certificate_in_store( + mock.sentinel.thumbprint_str, machine_keyset) + else: + result = self._x509_manager._find_certificate_in_store( + mock.sentinel.thumbprint_str, machine_keyset) + self.assertEqual(result, cert_context_p) + + self._x509_manager._get_thumbprint_buffer.assert_called_once_with( + mock.sentinel.thumbprint_str) + mock_blob.assert_called_once_with() + if store_handle: + mock_CloseStore.assert_called_once_with(store_handle, 0) + + def test_find_certificate_in_store(self): + self._test_find_certificate_in_store(machine_keyset=None) + + def test_find_certificate_in_store_no_store_handle(self): + self._test_find_certificate_in_store(store_handle=False) + + def test_find_certificate_in_store_no_cert_context_p(self): + self._test_find_certificate_in_store(cert_context_p=False) + + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CertFreeCertificateContext') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CertDeleteCertificateFromStore') + def _test_delete_certificate_from_store(self, mock_delete_cert, + mock_free_cert, + cert_context_p=True, + delete_cert=True): + self._x509_manager._find_certificate_in_store = mock.Mock() + (self._x509_manager._find_certificate_in_store. + return_value) = cert_context_p + mock_delete_cert.return_value = delete_cert + + if not cert_context_p or not delete_cert: + with self.assertRaises(self.x509.cryptoapi.CryptoAPIException): + self._x509_manager.delete_certificate_from_store( + mock.sentinel.thumbprint_str, mock.sentinel.machine_keyset, + mock.sentinel.store_name) + else: + self._x509_manager.delete_certificate_from_store( + mock.sentinel.thumbprint_str, mock.sentinel.machine_keyset, + mock.sentinel.store_name) + + self._x509_manager._find_certificate_in_store.assert_called_once_with( + mock.sentinel.thumbprint_str, mock.sentinel.machine_keyset, + mock.sentinel.store_name) + if not cert_context_p: + self.assertEqual(mock_delete_cert.call_count, 0) + else: + mock_free_cert.assert_called_once_with(cert_context_p) + + def test_delete_certificate_from_store(self): + self._test_delete_certificate_from_store() + + def test_delete_certificate_from_store_no_cert_context_p(self): + self._test_delete_certificate_from_store(cert_context_p=False) + + def test_delete_certificate_from_store_delete_cert_failed(self): + self._test_delete_certificate_from_store(delete_cert=False) + + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CertFreeCertificateContext') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CRYPT_DECRYPT_MESSAGE_PARA') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CertCloseStore') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CertDeleteCertificateFromStore') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CryptDecryptMessage') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CertAddCertificateLinkToStore') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CertOpenStore') + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CryptStringToBinaryW') + def _test_decode_pkcs7_base64_blob(self, mock_StringToBinary, + mock_OpenStore, mock_AddCert, + mock_Decrypt, mock_DeleteCert, + mock_CloseStore, mock_decrypt_para, + mock_FreeCert, + string_to_binary_data=True, + store_handle=True, + string_to_binary_data_value=True, + add_cert=True, decrypt_by_ref=True, + decrypt_by_pointer=True): + data = str(mock.sentinel.data) + self._x509_manager._find_certificate_in_store = mock.Mock() + mock_StringToBinary.side_effect = [ + string_to_binary_data, string_to_binary_data_value] + mock_OpenStore.return_value = store_handle + mock_AddCert.return_value = add_cert + mock_Decrypt.side_effect = [decrypt_by_ref, decrypt_by_pointer] + + if (string_to_binary_data and store_handle and add_cert and + string_to_binary_data_value and decrypt_by_ref and + decrypt_by_pointer): + result = self._x509_manager.decode_pkcs7_base64_blob( + data, mock.sentinel.thumbprint_str, + mock.sentinel.machine_keyset, mock.sentinel.store_name) + self.assertEqual( + result, bytes(self._ctypes.create_string_buffer.return_value)) + else: + with self.assertRaises(self.x509.cryptoapi.CryptoAPIException): + self._x509_manager.decode_pkcs7_base64_blob( + data, mock.sentinel.thumbprint_str, + mock.sentinel.machine_keyset, mock.sentinel.store_name) + + def test_decode_pkcs7_base64_blob(self): + self._test_decode_pkcs7_base64_blob() + + def test_decode_pkcs7_base64_blob_encrypt_data_fails(self): + self._test_decode_pkcs7_base64_blob(string_to_binary_data=False) + + def test_decode_pkcs7_base64_blob_no_store_handle(self): + self._test_decode_pkcs7_base64_blob(store_handle=False) + + def test_decode_pkcs7_base64_blob_encrypt_data_value_fails(self): + self._test_decode_pkcs7_base64_blob(string_to_binary_data_value=False) + + def test_decode_pkcs7_base64_blob_add_certificate_fails(self): + self._test_decode_pkcs7_base64_blob(add_cert=False) + + def test_decode_pkcs7_base64_blob_decrypt_by_ref_fails(self): + self._test_decode_pkcs7_base64_blob(decrypt_by_ref=False) + + def test_decode_pkcs7_base64_blob_decrypt_by_pointer_fails(self): + self._test_decode_pkcs7_base64_blob(decrypt_by_pointer=False) diff --git a/cloudbaseinit/utils/encoding.py b/cloudbaseinit/utils/encoding.py index 9ac2f0b7..17435f42 100644 --- a/cloudbaseinit/utils/encoding.py +++ b/cloudbaseinit/utils/encoding.py @@ -38,3 +38,10 @@ def write_file(target_path, data, mode='wb'): with open(target_path, mode) as f: f.write(data) + + +def hex_to_bytes(value): + if six.PY2: + return value.decode("hex") + else: + return bytes.fromhex(value) diff --git a/cloudbaseinit/utils/windows/cryptoapi.py b/cloudbaseinit/utils/windows/cryptoapi.py index 7a24b5bf..7a4af24c 100644 --- a/cloudbaseinit/utils/windows/cryptoapi.py +++ b/cloudbaseinit/utils/windows/cryptoapi.py @@ -94,20 +94,49 @@ class CRYPT_KEY_PROV_INFO(ctypes.Structure): ] +class CRYPT_DECRYPT_MESSAGE_PARA(ctypes.Structure): + _fields_ = [ + ('cbSize', wintypes.DWORD), + ('dwMsgAndCertEncodingType', wintypes.DWORD), + ('cCertStore', wintypes.DWORD), + ('rghCertStore', ctypes.POINTER(wintypes.HANDLE)), + ('dwFlags', wintypes.DWORD), + ] + + +class CERT_KEY_CONTEXT(ctypes.Structure): + _fields_ = [ + ('cbSize', wintypes.DWORD), + ('hNCryptKey', wintypes.HANDLE), + ('dwKeySpec', wintypes.DWORD), + ] + + +AT_KEYEXCHANGE = 1 AT_SIGNATURE = 2 CERT_NAME_UPN_TYPE = 8 CERT_SHA1_HASH_PROP_ID = 3 CERT_STORE_ADD_REPLACE_EXISTING = 3 +CERT_STORE_PROV_MEMORY = wintypes.LPSTR(2) CERT_STORE_PROV_SYSTEM = wintypes.LPSTR(10) -CERT_SYSTEM_STORE_CURRENT_USER = 65536 -CERT_SYSTEM_STORE_LOCAL_MACHINE = 131072 +CERT_SYSTEM_STORE_CURRENT_USER = 0x10000 +CERT_SYSTEM_STORE_LOCAL_MACHINE = 0x20000 CERT_X500_NAME_STR = 3 +CERT_STORE_ADD_NEW = 1 +CERT_STORE_OPEN_EXISTING_FLAG = 0x4000 +CERT_STORE_CREATE_NEW_FLAG = 0x2000 +CRYPT_SILENT = 64 CRYPT_MACHINE_KEYSET = 32 CRYPT_NEWKEYSET = 8 CRYPT_STRING_BASE64 = 1 -PKCS_7_ASN_ENCODING = 65536 +PKCS_7_ASN_ENCODING = 0x10000 PROV_RSA_FULL = 1 X509_ASN_ENCODING = 1 +CERT_FIND_ANY = 0 +CERT_FIND_SHA1_HASH = 0x10000 +CERT_KEY_PROV_INFO_PROP_ID = 2 +CERT_KEY_CONTEXT_PROP_ID = 5 + szOID_PKIX_KP_SERVER_AUTH = b"1.3.6.1.5.5.7.3.1" szOID_RSA_SHA1RSA = b"1.2.840.113549.1.1.5" @@ -243,3 +272,61 @@ crypt32.CertGetCertificateContextProperty.argtypes = [ ctypes.c_void_p, ctypes.POINTER(wintypes.DWORD)] CertGetCertificateContextProperty = crypt32.CertGetCertificateContextProperty + +crypt32.CryptBinaryToStringW.restype = wintypes.BOOL +crypt32.CryptBinaryToStringW.argtypes = [ + ctypes.c_void_p, + wintypes.DWORD, + wintypes.DWORD, + wintypes.LPWSTR, + ctypes.POINTER(wintypes.DWORD)] +CryptBinaryToString = crypt32.CryptBinaryToStringW + +crypt32.CryptDecryptMessage.restype = wintypes.BOOL +crypt32.CryptDecryptMessage.argtypes = [ + ctypes.POINTER(CRYPT_DECRYPT_MESSAGE_PARA), + ctypes.c_void_p, + wintypes.DWORD, + ctypes.c_void_p, + ctypes.POINTER(wintypes.DWORD), + ctypes.c_void_p] +CryptDecryptMessage = crypt32.CryptDecryptMessage + +crypt32.CertAddCertificateLinkToStore.restype = wintypes.BOOL +crypt32.CertAddCertificateLinkToStore.argtypes = [ + wintypes.HANDLE, + ctypes.POINTER(CERT_CONTEXT), + wintypes.DWORD, + ctypes.c_void_p +] +CertAddCertificateLinkToStore = crypt32.CertAddCertificateLinkToStore + +crypt32.CertFindCertificateInStore.restype = ctypes.POINTER(CERT_CONTEXT) +crypt32.CertFindCertificateInStore.argtypes = [ + wintypes.HANDLE, + wintypes.DWORD, + wintypes.DWORD, + wintypes.DWORD, + ctypes.c_void_p, + ctypes.c_void_p] +CertFindCertificateInStore = crypt32.CertFindCertificateInStore + +crypt32.CertSetCertificateContextProperty.restype = wintypes.BOOL +crypt32.CertSetCertificateContextProperty.argtypes = [ + ctypes.POINTER(CERT_CONTEXT), + wintypes.DWORD, + wintypes.DWORD, + ctypes.c_void_p] +CertSetCertificateContextProperty = crypt32.CertSetCertificateContextProperty + +crypt32.PFXImportCertStore.restype = wintypes.HANDLE +crypt32.PFXImportCertStore.argtypes = [ + ctypes.POINTER(CRYPTOAPI_BLOB), + wintypes.LPCWSTR, + wintypes.DWORD] +PFXImportCertStore = crypt32.PFXImportCertStore + +crypt32.CertDeleteCertificateFromStore.restype = wintypes.BOOL +crypt32.CertDeleteCertificateFromStore.argtypes = [ + ctypes.POINTER(CERT_CONTEXT)] +CertDeleteCertificateFromStore = crypt32.CertDeleteCertificateFromStore diff --git a/cloudbaseinit/utils/windows/x509.py b/cloudbaseinit/utils/windows/x509.py index 202614d0..4556087d 100644 --- a/cloudbaseinit/utils/windows/x509.py +++ b/cloudbaseinit/utils/windows/x509.py @@ -19,6 +19,7 @@ import uuid import six +from cloudbaseinit.utils import encoding from cloudbaseinit.utils.windows import cryptoapi from cloudbaseinit.utils import x509constants @@ -40,6 +41,26 @@ X509_END_DATE_INTERVAL = 10 * 365 * 24 * 60 * 60 * 10000000 class CryptoAPICertManager(object): + @staticmethod + def _get_thumprint_str(thumbprint, size): + thumbprint_ar = ctypes.cast( + thumbprint, + ctypes.POINTER(ctypes.c_ubyte * + size)).contents + + thumbprint_str = "" + for b in thumbprint_ar: + thumbprint_str += "%02x" % b + return thumbprint_str + + @staticmethod + def _get_thumbprint_buffer(thumbprint_str): + thumbprint_bytes = encoding.hex_to_bytes(thumbprint_str) + return ctypes.cast( + ctypes.create_string_buffer(thumbprint_bytes), + ctypes.POINTER(wintypes.BYTE * + len(thumbprint_bytes))).contents + def _get_cert_thumprint(self, cert_context_p): thumbprint = None @@ -61,15 +82,7 @@ class CryptoAPICertManager(object): thumbprint, ctypes.byref(thumprint_len)): raise cryptoapi.CryptoAPIException() - thumbprint_ar = ctypes.cast( - thumbprint, - ctypes.POINTER(ctypes.c_ubyte * - thumprint_len.value)).contents - - thumbprint_str = "" - for b in thumbprint_ar: - thumbprint_str += "%02x" % b - return thumbprint_str + return self._get_thumprint_str(thumbprint, thumprint_len.value) finally: if thumbprint: free(thumbprint) @@ -100,9 +113,12 @@ class CryptoAPICertManager(object): # RSA 2048 bits if not cryptoapi.CryptGenKey(crypt_prov_handle, - cryptoapi.AT_SIGNATURE, - 0x08000000, key_handle): + cryptoapi.AT_KEYEXCHANGE, + 0x08000000, + ctypes.byref(key_handle)): raise cryptoapi.CryptoAPIException() + + return key_handle finally: if key_handle: cryptoapi.CryptDestroyKey(key_handle) @@ -171,7 +187,7 @@ class CryptoAPICertManager(object): key_prov_info.dwProvType = cryptoapi.PROV_RSA_FULL key_prov_info.cProvParam = None key_prov_info.rgProvParam = None - key_prov_info.dwKeySpec = cryptoapi.AT_SIGNATURE + key_prov_info.dwKeySpec = cryptoapi.AT_KEYEXCHANGE if machine_keyset: key_prov_info.dwFlags = cryptoapi.CRYPT_MACHINE_KEYSET @@ -243,6 +259,190 @@ class CryptoAPICertManager(object): cert_data = cert_data.replace(remove, "") return cert_data + def _find_certificate_in_store(self, thumbprint_str, machine_keyset=True, + store_name=STORE_NAME_MY): + store_handle = None + + thumbprint = self._get_thumbprint_buffer(thumbprint_str) + hash_blob = cryptoapi.CRYPTOAPI_BLOB() + hash_blob.cbData = len(thumbprint) + hash_blob.pbData = thumbprint + + try: + flags = cryptoapi.CERT_STORE_OPEN_EXISTING_FLAG + if machine_keyset: + flags |= cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE + else: + flags |= cryptoapi.CERT_SYSTEM_STORE_CURRENT_USER + + store_handle = cryptoapi.CertOpenStore( + cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags, + six.text_type(store_name)) + if not store_handle: + raise cryptoapi.CryptoAPIException() + + cert_context_p = cryptoapi.CertFindCertificateInStore( + store_handle, + cryptoapi.X509_ASN_ENCODING | cryptoapi.PKCS_7_ASN_ENCODING, + 0, + cryptoapi.CERT_FIND_SHA1_HASH, + ctypes.pointer(hash_blob), + None) + if not cert_context_p: + raise cryptoapi.CryptoAPIException() + + return cert_context_p + finally: + if store_handle: + cryptoapi.CertCloseStore(store_handle, 0) + + def delete_certificate_from_store(self, thumbprint_str, + machine_keyset=True, + store_name=STORE_NAME_MY): + cert_context_p = None + + try: + cert_context_p = self._find_certificate_in_store( + thumbprint_str, machine_keyset, store_name) + if not cert_context_p: + raise cryptoapi.CryptoAPIException() + + if not cryptoapi.CertDeleteCertificateFromStore(cert_context_p): + raise cryptoapi.CryptoAPIException() + finally: + if cert_context_p: + cryptoapi.CertFreeCertificateContext(cert_context_p) + + def import_pfx_certificate(self, pfx_data, pfx_password=None, + machine_keyset=True, store_name=STORE_NAME_MY): + cert_context_p = None + import_store_handle = None + store_handle = None + + try: + pfx_blob = cryptoapi.CRYPTOAPI_BLOB() + pfx_blob.cbData = len(pfx_data) + pfx_blob.pbData = ctypes.cast( + pfx_data, ctypes.POINTER(wintypes.BYTE)) + + import_store_handle = cryptoapi.PFXImportCertStore( + ctypes.pointer(pfx_blob), pfx_password, 0) + if not import_store_handle: + raise cryptoapi.CryptoAPIException() + + cert_context_p = cryptoapi.CertFindCertificateInStore( + import_store_handle, + cryptoapi.X509_ASN_ENCODING | cryptoapi.PKCS_7_ASN_ENCODING, + 0, cryptoapi.CERT_FIND_ANY, None, None) + if not cert_context_p: + raise cryptoapi.CryptoAPIException() + + if machine_keyset: + flags = cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE + else: + flags = cryptoapi.CERT_SYSTEM_STORE_CURRENT_USER + + store_handle = cryptoapi.CertOpenStore( + cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags, + six.text_type(store_name)) + if not store_handle: + raise cryptoapi.CryptoAPIException() + + if not cryptoapi.CertAddCertificateContextToStore( + store_handle, cert_context_p, + cryptoapi.CERT_STORE_ADD_REPLACE_EXISTING, None): + raise cryptoapi.CryptoAPIException() + + finally: + if import_store_handle: + cryptoapi.CertCloseStore(import_store_handle, 0) + if cert_context_p: + cryptoapi.CertFreeCertificateContext(cert_context_p) + if store_handle: + cryptoapi.CertCloseStore(store_handle, 0) + + def decode_pkcs7_base64_blob(self, data, thumbprint_str, + machine_keyset=True, + store_name=STORE_NAME_MY): + base64_data = data.replace('\r', '').replace('\n', '') + store_handle = None + cert_context_p = None + + try: + data_encoded_len = wintypes.DWORD() + + if not cryptoapi.CryptStringToBinaryW( + base64_data, len(base64_data), + cryptoapi.CRYPT_STRING_BASE64, + None, ctypes.byref(data_encoded_len), + None, None): + raise cryptoapi.CryptoAPIException() + + data_encoded = ctypes.cast( + ctypes.create_string_buffer(data_encoded_len.value), + ctypes.POINTER(wintypes.BYTE)) + + if not cryptoapi.CryptStringToBinaryW( + base64_data, len(base64_data), + cryptoapi.CRYPT_STRING_BASE64, + data_encoded, ctypes.byref(data_encoded_len), + None, None): + raise cryptoapi.CryptoAPIException() + + store_handle = cryptoapi.CertOpenStore( + cryptoapi.CERT_STORE_PROV_MEMORY, + cryptoapi.X509_ASN_ENCODING | cryptoapi.PKCS_7_ASN_ENCODING, + None, cryptoapi.CERT_STORE_CREATE_NEW_FLAG, None) + if not store_handle: + raise cryptoapi.CryptoAPIException() + + cert_context_p = self._find_certificate_in_store( + thumbprint_str, machine_keyset, store_name) + + if not cryptoapi.CertAddCertificateLinkToStore( + store_handle, cert_context_p, + cryptoapi.CERT_STORE_ADD_NEW, None): + raise cryptoapi.CryptoAPIException() + + para = cryptoapi.CRYPT_DECRYPT_MESSAGE_PARA() + para.cbSize = ctypes.sizeof(cryptoapi.CRYPT_DECRYPT_MESSAGE_PARA) + para.dwMsgAndCertEncodingType = (cryptoapi.X509_ASN_ENCODING | + cryptoapi.PKCS_7_ASN_ENCODING) + para.cCertStore = 1 + para.rghCertStore = ctypes.pointer(wintypes.HANDLE(store_handle)) + para.dwFlags = cryptoapi.CRYPT_SILENT + + data_decoded_len = wintypes.DWORD() + if not cryptoapi.CryptDecryptMessage( + ctypes.byref(para), + data_encoded, + data_encoded_len, + None, + ctypes.byref(data_decoded_len), + None): + raise cryptoapi.CryptoAPIException() + + data_decoded_buf = ctypes.create_string_buffer( + data_decoded_len.value) + data_decoded = ctypes.cast( + data_decoded_buf, ctypes.POINTER(wintypes.BYTE)) + + if not cryptoapi.CryptDecryptMessage( + ctypes.pointer(para), + data_encoded, + data_encoded_len, + data_decoded, + ctypes.byref(data_decoded_len), + None): + raise cryptoapi.CryptoAPIException() + + return bytes(data_decoded_buf) + finally: + if cert_context_p: + cryptoapi.CertFreeCertificateContext(cert_context_p) + if store_handle: + cryptoapi.CertCloseStore(store_handle, 0) + def import_cert(self, cert_data, machine_keyset=True, store_name=STORE_NAME_MY):