diff --git a/cloudbaseinit/tests/utils/windows/test_x509.py b/cloudbaseinit/tests/utils/windows/test_x509.py index 3075ce2d..0b442368 100644 --- a/cloudbaseinit/tests/utils/windows/test_x509.py +++ b/cloudbaseinit/tests/utils/windows/test_x509.py @@ -14,64 +14,68 @@ # License for the specific language governing permissions and limitations # under the License. +import importlib import mock -import sys import unittest -from oslo.config import cfg - from cloudbaseinit.utils import s from cloudbaseinit.utils import x509constants -if sys.platform == 'win32': - from cloudbaseinit.utils.windows import cryptoapi - from cloudbaseinit.utils.windows import x509 -CONF = cfg.CONF - - -@unittest.skipUnless(sys.platform == "win32", "requires Windows") class CryptoAPICertManagerTests(unittest.TestCase): def setUp(self): - self._x509 = x509.CryptoAPICertManager() + self._ctypes = mock.MagicMock() + + self._module_patcher = mock.patch.dict( + 'sys.modules', {'ctypes': self._ctypes}) + + self._module_patcher.start() + + self.x509 = importlib.import_module("cloudbaseinit.utils.windows.x509") + self._x509_manager = self.x509.CryptoAPICertManager() + + def tearDown(self): + self._module_patcher.stop() @mock.patch('cloudbaseinit.utils.windows.x509.free') - @mock.patch('ctypes.c_ubyte') - @mock.patch('ctypes.POINTER') - @mock.patch('ctypes.cast') @mock.patch('cloudbaseinit.utils.windows.x509.malloc') - @mock.patch('ctypes.byref') - @mock.patch('ctypes.wintypes.DWORD') @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' 'CertGetCertificateContextProperty') def _test_get_cert_thumprint(self, mock_CertGetCertificateContextProperty, - mock_DWORD, mock_byref, mock_malloc, - mock_cast, mock_POINTER, mock_c_ubyte, - mock_free, ret_val): + mock_malloc, mock_free, ret_val): + mock_DWORD = self._ctypes.wintypes.DWORD + mock_cast = self._ctypes.cast + mock_POINTER = self._ctypes.POINTER + mock_byref = self._ctypes.byref mock_pointer = mock.MagicMock() fake_cert_context_p = 'fake context' - mock_DWORD().value = 10 + mock_DWORD.return_value.value = 10 mock_CertGetCertificateContextProperty.return_value = ret_val mock_POINTER.return_value = mock_pointer - mock_cast().contents = [16] + mock_cast.return_value.contents = [16] + if not ret_val: - self.assertRaises(cryptoapi.CryptoAPIException, - self._x509._get_cert_thumprint, + self.assertRaises(self.x509.cryptoapi.CryptoAPIException, + self._x509_manager._get_cert_thumprint, fake_cert_context_p) else: expected = [mock.call(fake_cert_context_p, - cryptoapi.CERT_SHA1_HASH_PROP_ID, - None, mock_byref()), + self.x509.cryptoapi.CERT_SHA1_HASH_PROP_ID, + None, mock_byref.return_value), mock.call(fake_cert_context_p, - cryptoapi.CERT_SHA1_HASH_PROP_ID, - mock_malloc(), mock_byref())] - response = self._x509._get_cert_thumprint(fake_cert_context_p) + self.x509.cryptoapi.CERT_SHA1_HASH_PROP_ID, + mock_malloc.return_value, + mock_byref.return_value)] + + response = self._x509_manager._get_cert_thumprint( + fake_cert_context_p) + self.assertEqual( mock_CertGetCertificateContextProperty.call_args_list, expected) - mock_malloc.assert_called_with(mock_DWORD()) + mock_malloc.assert_called_with(mock_DWORD.return_value) mock_cast.assert_called_with(mock_malloc(), mock_pointer) mock_free.assert_called_with(mock_malloc()) self.assertEqual(response, '10') @@ -86,31 +90,35 @@ class CryptoAPICertManagerTests(unittest.TestCase): @mock.patch('cloudbaseinit.utils.windows.cryptoapi.CryptReleaseContext') @mock.patch('cloudbaseinit.utils.windows.cryptoapi.CryptGenKey') @mock.patch('cloudbaseinit.utils.windows.cryptoapi.CryptAcquireContext') - @mock.patch('ctypes.byref') - @mock.patch('ctypes.wintypes.HANDLE') - def _test_generate_key(self, mock_HANDLE, mock_byref, - mock_CryptAcquireContext, mock_CryptGenKey, + def _test_generate_key(self, mock_CryptAcquireContext, mock_CryptGenKey, mock_CryptReleaseContext, mock_CryptDestroyKey, acquired_context, generate_key_ret_val): + + mock_HANDLE = self._ctypes.wintypes.HANDLE + mock_byref = self._ctypes.byref + mock_CryptAcquireContext.return_value = acquired_context mock_CryptGenKey.return_value = generate_key_ret_val + if not acquired_context: - self.assertRaises(cryptoapi.CryptoAPIException, - self._x509._generate_key, + self.assertRaises(self.x509.cryptoapi.CryptoAPIException, + self._x509_manager._generate_key, 'fake container', True) else: - if generate_key_ret_val is None: - self.assertRaises(cryptoapi.CryptoAPIException, - self._x509._generate_key, 'fake container', - True) + if not generate_key_ret_val: + self.assertRaises(self.x509.cryptoapi.CryptoAPIException, + self._x509_manager._generate_key, + 'fake container', True) else: - self._x509._generate_key('fake container', True) + self._x509_manager._generate_key('fake container', True) + mock_CryptAcquireContext.assert_called_with( mock_byref(), 'fake container', None, - cryptoapi.PROV_RSA_FULL, cryptoapi.CRYPT_MACHINE_KEYSET) - mock_CryptGenKey.assert_called_with(mock_HANDLE(), - cryptoapi.AT_SIGNATURE, - 0x08000000, mock_HANDLE()) + self.x509.cryptoapi.PROV_RSA_FULL, + self.x509.cryptoapi.CRYPT_MACHINE_KEYSET) + mock_CryptGenKey.assert_called_with( + mock_HANDLE(), self.x509.cryptoapi.AT_SIGNATURE, + 0x08000000, mock_HANDLE()) mock_CryptDestroyKey.assert_called_once_with( mock_HANDLE()) mock_CryptReleaseContext.assert_called_once_with( @@ -130,16 +138,12 @@ class CryptoAPICertManagerTests(unittest.TestCase): @mock.patch('cloudbaseinit.utils.windows.x509.free') @mock.patch('copy.copy') - @mock.patch('ctypes.byref') @mock.patch('cloudbaseinit.utils.windows.x509.malloc') - @mock.patch('ctypes.POINTER') - @mock.patch('ctypes.cast') @mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager' '._generate_key') @mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager' '._get_cert_thumprint') @mock.patch('uuid.uuid4') - @mock.patch('ctypes.wintypes.DWORD') @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' 'CertStrToName') @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' @@ -174,13 +178,16 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_CRYPT_ALGORITHM_IDENTIFIER, mock_CRYPT_KEY_PROV_INFO, mock_CRYPTOAPI_BLOB, - mock_CertStrToName, mock_DWORD, + mock_CertStrToName, mock_uuid4, mock_get_cert_thumprint, - mock_generate_key, mock_cast, - mock_POINTER, mock_malloc, mock_byref, + mock_generate_key, mock_malloc, mock_copy, mock_free, certstr, - certificate, enhanced_key, - store_handle, context_to_store): + certificate, enhanced_key, store_handle, + context_to_store): + + mock_POINTER = self._ctypes.POINTER + mock_byref = self._ctypes.byref + mock_cast = self._ctypes.cast mock_uuid4.return_value = 'fake_name' mock_CertCreateSelfSignCertificate.return_value = certificate @@ -190,11 +197,12 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_CertAddCertificateContextToStore.return_value = context_to_store if (certstr is None or certificate is None or enhanced_key is None or store_handle is None or context_to_store is None): - self.assertRaises(cryptoapi.CryptoAPIException, - self._x509.create_self_signed_cert, - 'fake subject', 10, True, x509.STORE_NAME_MY) + self.assertRaises(self.x509.cryptoapi.CryptoAPIException, + self._x509_manager.create_self_signed_cert, + 'fake subject', 10, True, + self.x509.STORE_NAME_MY) else: - response = self._x509.create_self_signed_cert( + response = self._x509_manager.create_self_signed_cert( subject='fake subject') mock_cast.assert_called_with(mock_malloc(), mock_POINTER()) mock_CRYPTOAPI_BLOB.assert_called_once_with() @@ -208,11 +216,11 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_byref(), mock_byref(), mock_byref(), None) mock_CertAddEnhancedKeyUsageIdentifier.assert_called_with( mock_CertCreateSelfSignCertificate(), - cryptoapi.szOID_PKIX_KP_SERVER_AUTH) + self.x509.cryptoapi.szOID_PKIX_KP_SERVER_AUTH) mock_CertOpenStore.assert_called_with( - cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, - cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE, - s.unicode(x509.STORE_NAME_MY)) + self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, + self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE, + s.unicode(self.x509.STORE_NAME_MY)) mock_get_cert_thumprint.assert_called_once_with( mock_CertCreateSelfSignCertificate()) @@ -272,7 +280,7 @@ class CryptoAPICertManagerTests(unittest.TestCase): fake_cert_data += x509constants.PEM_HEADER + '\n' fake_cert_data += 'fake cert' + '\n' fake_cert_data += x509constants.PEM_FOOTER - response = self._x509._get_cert_base64(fake_cert_data) + response = self._x509_manager._get_cert_base64(fake_cert_data) self.assertEqual(response, 'fake cert') @mock.patch('cloudbaseinit.utils.windows.x509.free') @@ -292,15 +300,8 @@ class CryptoAPICertManagerTests(unittest.TestCase): 'CryptStringToBinaryA') @mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager' '._get_cert_base64') - @mock.patch('ctypes.POINTER') @mock.patch('cloudbaseinit.utils.windows.x509.malloc') - @mock.patch('ctypes.cast') - @mock.patch('ctypes.byref') - @mock.patch('ctypes.wintypes.DWORD') - @mock.patch('ctypes.create_unicode_buffer') - def _test_import_cert(self, mock_create_unicode_buffer, mock_DWORD, - mock_byref, mock_cast, - mock_malloc, mock_POINTER, mock_get_cert_base64, + def _test_import_cert(self, mock_malloc, mock_get_cert_base64, mock_CryptStringToBinaryA, mock_CertOpenStore, mock_CertAddEncodedCertificateToStore, mock_CertGetNameString, @@ -308,6 +309,13 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_CertCloseStore, mock_get_cert_thumprint, mock_free, crypttstr, store_handle, add_enc_cert, upn_len): + mock_POINTER = self._ctypes.POINTER + mock_cast = self._ctypes.cast + mock_byref = self._ctypes.byref + mock_DWORD = self._ctypes.wintypes.DWORD + + mock_create_unicode_buffer = self._ctypes.create_unicode_buffer + fake_cert_data = '' fake_cert_data += x509constants.PEM_HEADER + '\n' fake_cert_data += 'fake cert' + '\n' @@ -319,45 +327,55 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_CertGetNameString.side_effect = [2, upn_len] expected = [mock.call('fake cert', len('fake cert'), - cryptoapi.CRYPT_STRING_BASE64, None, + self.x509.cryptoapi.CRYPT_STRING_BASE64, None, mock_byref(), None, None), mock.call('fake cert', len('fake cert'), - cryptoapi.CRYPT_STRING_BASE64, mock_cast(), - mock_byref(), None, None)] - expected2 = [mock.call(mock_POINTER()(), cryptoapi.CERT_NAME_UPN_TYPE, + self.x509.cryptoapi.CRYPT_STRING_BASE64, + mock_cast(), mock_byref(), None, None)] + expected2 = [mock.call(mock_POINTER()(), + self.x509.cryptoapi.CERT_NAME_UPN_TYPE, 0, None, None, 0), - mock.call(mock_POINTER()(), cryptoapi.CERT_NAME_UPN_TYPE, + mock.call(mock_POINTER()(), + self.x509.cryptoapi.CERT_NAME_UPN_TYPE, 0, None, mock_create_unicode_buffer(), 2)] if (not crypttstr or store_handle is None or add_enc_cert is None or upn_len != 2): - self.assertRaises(cryptoapi.CryptoAPIException, - self._x509.import_cert, fake_cert_data, True, - x509.STORE_NAME_MY) + self.assertRaises(self.x509.cryptoapi.CryptoAPIException, + self._x509_manager.import_cert, fake_cert_data, + True, self.x509.STORE_NAME_MY) else: - response = self._x509.import_cert(fake_cert_data) + response = self._x509_manager.import_cert(fake_cert_data) + mock_cast.assert_called_with(mock_malloc(), mock_POINTER()) self.assertEqual(mock_CryptStringToBinaryA.call_args_list, expected) mock_CertOpenStore.assert_called_with( - cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, - cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE, - s.unicode(x509.STORE_NAME_MY)) + self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, + self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE, + s.unicode(self.x509.STORE_NAME_MY)) + mock_CertAddEncodedCertificateToStore.assert_called_with( mock_CertOpenStore(), - cryptoapi.X509_ASN_ENCODING | cryptoapi.PKCS_7_ASN_ENCODING, + self.x509.cryptoapi.X509_ASN_ENCODING | + self.x509.cryptoapi.PKCS_7_ASN_ENCODING, mock_cast(), mock_DWORD(), - cryptoapi.CERT_STORE_ADD_REPLACE_EXISTING, mock_byref()) + self.x509.cryptoapi.CERT_STORE_ADD_REPLACE_EXISTING, + mock_byref()) + mock_create_unicode_buffer.assert_called_with(2) self.assertEqual(mock_CertGetNameString.call_args_list, expected2) mock_get_cert_thumprint.assert_called_once_with(mock_POINTER()()) + mock_CertFreeCertificateContext.assert_called_once_with( mock_POINTER()()) mock_CertCloseStore.assert_called_once_with( mock_CertOpenStore(), 0) + mock_free.assert_called_once_with(mock_cast()) self.assertEqual(response, (mock_get_cert_thumprint(), mock_create_unicode_buffer().value)) + mock_get_cert_base64.assert_called_with(fake_cert_data) def test_import_cert(self):