Replaces s.unicode with six.text_type
six.text_type() provides already the feature provided by s.unicode().
This commit is contained in:
parent
2b3ee9302a
commit
ceb4ae92d3
@ -17,6 +17,7 @@
|
|||||||
import ctypes
|
import ctypes
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import six
|
||||||
import time
|
import time
|
||||||
import win32process
|
import win32process
|
||||||
import win32security
|
import win32security
|
||||||
@ -29,7 +30,6 @@ from win32com import client
|
|||||||
|
|
||||||
from cloudbaseinit.openstack.common import log as logging
|
from cloudbaseinit.openstack.common import log as logging
|
||||||
from cloudbaseinit.osutils import base
|
from cloudbaseinit.osutils import base
|
||||||
from cloudbaseinit.utils import s
|
|
||||||
from cloudbaseinit.utils.windows import network
|
from cloudbaseinit.utils.windows import network
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -372,7 +372,7 @@ class WindowsUtils(base.BaseOSUtils):
|
|||||||
sidNameUse = wintypes.DWORD()
|
sidNameUse = wintypes.DWORD()
|
||||||
|
|
||||||
ret_val = advapi32.LookupAccountNameW(
|
ret_val = advapi32.LookupAccountNameW(
|
||||||
0, s.unicode(username), sid, ctypes.byref(cbSid), domainName,
|
0, six.text_type(username), sid, ctypes.byref(cbSid), domainName,
|
||||||
ctypes.byref(cchReferencedDomainName), ctypes.byref(sidNameUse))
|
ctypes.byref(cchReferencedDomainName), ctypes.byref(sidNameUse))
|
||||||
if not ret_val:
|
if not ret_val:
|
||||||
raise Exception("Cannot get user SID")
|
raise Exception("Cannot get user SID")
|
||||||
@ -382,10 +382,10 @@ class WindowsUtils(base.BaseOSUtils):
|
|||||||
def add_user_to_local_group(self, username, groupname):
|
def add_user_to_local_group(self, username, groupname):
|
||||||
|
|
||||||
lmi = Win32_LOCALGROUP_MEMBERS_INFO_3()
|
lmi = Win32_LOCALGROUP_MEMBERS_INFO_3()
|
||||||
lmi.lgrmi3_domainandname = s.unicode(username)
|
lmi.lgrmi3_domainandname = six.text_type(username)
|
||||||
|
|
||||||
ret_val = netapi32.NetLocalGroupAddMembers(0, s.unicode(groupname), 3,
|
ret_val = netapi32.NetLocalGroupAddMembers(0, six.text_type(groupname),
|
||||||
ctypes.addressof(lmi), 1)
|
3, ctypes.addressof(lmi), 1)
|
||||||
|
|
||||||
if ret_val == self.NERR_GroupNotFound:
|
if ret_val == self.NERR_GroupNotFound:
|
||||||
raise Exception('Group not found')
|
raise Exception('Group not found')
|
||||||
@ -410,8 +410,9 @@ class WindowsUtils(base.BaseOSUtils):
|
|||||||
def create_user_logon_session(self, username, password, domain='.',
|
def create_user_logon_session(self, username, password, domain='.',
|
||||||
load_profile=True):
|
load_profile=True):
|
||||||
token = wintypes.HANDLE()
|
token = wintypes.HANDLE()
|
||||||
ret_val = advapi32.LogonUserW(s.unicode(username), s.unicode(domain),
|
ret_val = advapi32.LogonUserW(six.text_type(username),
|
||||||
s.unicode(password), 2, 0,
|
six.text_type(domain),
|
||||||
|
six.text_type(password), 2, 0,
|
||||||
ctypes.byref(token))
|
ctypes.byref(token))
|
||||||
if not ret_val:
|
if not ret_val:
|
||||||
raise Exception("User logon failed")
|
raise Exception("User logon failed")
|
||||||
@ -419,7 +420,7 @@ class WindowsUtils(base.BaseOSUtils):
|
|||||||
if load_profile:
|
if load_profile:
|
||||||
pi = Win32_PROFILEINFO()
|
pi = Win32_PROFILEINFO()
|
||||||
pi.dwSize = ctypes.sizeof(Win32_PROFILEINFO)
|
pi.dwSize = ctypes.sizeof(Win32_PROFILEINFO)
|
||||||
pi.lpUserName = s.unicode(username)
|
pi.lpUserName = six.text_type(username)
|
||||||
ret_val = userenv.LoadUserProfileW(token, ctypes.byref(pi))
|
ret_val = userenv.LoadUserProfileW(token, ctypes.byref(pi))
|
||||||
if not ret_val:
|
if not ret_val:
|
||||||
kernel32.CloseHandle(token)
|
kernel32.CloseHandle(token)
|
||||||
@ -446,7 +447,7 @@ class WindowsUtils(base.BaseOSUtils):
|
|||||||
def set_host_name(self, new_host_name):
|
def set_host_name(self, new_host_name):
|
||||||
ret_val = kernel32.SetComputerNameExW(
|
ret_val = kernel32.SetComputerNameExW(
|
||||||
self.ComputerNamePhysicalDnsHostname,
|
self.ComputerNamePhysicalDnsHostname,
|
||||||
s.unicode(new_host_name))
|
six.text_type(new_host_name))
|
||||||
if not ret_val:
|
if not ret_val:
|
||||||
raise Exception("Cannot set host name")
|
raise Exception("Cannot set host name")
|
||||||
|
|
||||||
@ -753,7 +754,7 @@ class WindowsUtils(base.BaseOSUtils):
|
|||||||
def get_volume_label(self, drive):
|
def get_volume_label(self, drive):
|
||||||
max_label_size = 261
|
max_label_size = 261
|
||||||
label = ctypes.create_unicode_buffer(max_label_size)
|
label = ctypes.create_unicode_buffer(max_label_size)
|
||||||
ret_val = kernel32.GetVolumeInformationW(s.unicode(drive), label,
|
ret_val = kernel32.GetVolumeInformationW(six.text_type(drive), label,
|
||||||
max_label_size, 0, 0, 0, 0, 0)
|
max_label_size, 0, 0, 0, 0, 0)
|
||||||
if ret_val:
|
if ret_val:
|
||||||
return label.value
|
return label.value
|
||||||
|
@ -18,13 +18,11 @@ import ctypes
|
|||||||
import importlib
|
import importlib
|
||||||
import mock
|
import mock
|
||||||
import os
|
import os
|
||||||
|
import six
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
from cloudbaseinit.utils import s
|
|
||||||
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
@ -273,7 +271,7 @@ class WindowsUtilsTest(unittest.TestCase):
|
|||||||
response = self._winutils._get_user_sid_and_domain(self._USERNAME)
|
response = self._winutils._get_user_sid_and_domain(self._USERNAME)
|
||||||
|
|
||||||
advapi32.LookupAccountNameW.assert_called_with(
|
advapi32.LookupAccountNameW.assert_called_with(
|
||||||
0, s.unicode(self._USERNAME), sid,
|
0, six.text_type(self._USERNAME), sid,
|
||||||
self._ctypes_mock.byref(cbSid), domainName,
|
self._ctypes_mock.byref(cbSid), domainName,
|
||||||
self._ctypes_mock.byref(cchReferencedDomainName),
|
self._ctypes_mock.byref(cchReferencedDomainName),
|
||||||
self._ctypes_mock.byref(sidNameUse))
|
self._ctypes_mock.byref(sidNameUse))
|
||||||
@ -308,12 +306,12 @@ class WindowsUtilsTest(unittest.TestCase):
|
|||||||
group_name)
|
group_name)
|
||||||
|
|
||||||
netapi32.NetLocalGroupAddMembers.assert_called_with(
|
netapi32.NetLocalGroupAddMembers.assert_called_with(
|
||||||
0, s.unicode(group_name), 3,
|
0, six.text_type(group_name), 3,
|
||||||
self._ctypes_mock.addressof.return_value, 1)
|
self._ctypes_mock.addressof.return_value, 1)
|
||||||
|
|
||||||
self._ctypes_mock.addressof.assert_called_once_with(lmi)
|
self._ctypes_mock.addressof.assert_called_once_with(lmi)
|
||||||
self.assertEqual(lmi.lgrmi3_domainandname,
|
self.assertEqual(lmi.lgrmi3_domainandname,
|
||||||
s.unicode(self._USERNAME))
|
six.text_type(self._USERNAME))
|
||||||
|
|
||||||
def test_add_user_to_local_group_no_error(self):
|
def test_add_user_to_local_group_no_error(self):
|
||||||
self._test_add_user_to_local_group(ret_value=0)
|
self._test_add_user_to_local_group(ret_value=0)
|
||||||
@ -461,7 +459,7 @@ class WindowsUtilsTest(unittest.TestCase):
|
|||||||
|
|
||||||
mock_SetComputerNameExW.assert_called_with(
|
mock_SetComputerNameExW.assert_called_with(
|
||||||
self._winutils.ComputerNamePhysicalDnsHostname,
|
self._winutils.ComputerNamePhysicalDnsHostname,
|
||||||
s.unicode('fake name'))
|
six.text_type('fake name'))
|
||||||
|
|
||||||
def test_set_host_name(self):
|
def test_set_host_name(self):
|
||||||
self._test_set_host_name(ret_value='fake response')
|
self._test_set_host_name(ret_value='fake response')
|
||||||
|
@ -16,9 +16,9 @@
|
|||||||
|
|
||||||
import importlib
|
import importlib
|
||||||
import mock
|
import mock
|
||||||
|
import six
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from cloudbaseinit.utils import s
|
|
||||||
from cloudbaseinit.utils import x509constants
|
from cloudbaseinit.utils import x509constants
|
||||||
|
|
||||||
|
|
||||||
@ -220,7 +220,7 @@ class CryptoAPICertManagerTests(unittest.TestCase):
|
|||||||
mock_CertOpenStore.assert_called_with(
|
mock_CertOpenStore.assert_called_with(
|
||||||
self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0,
|
self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0,
|
||||||
self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE,
|
self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE,
|
||||||
s.unicode(self.x509.STORE_NAME_MY))
|
six.text_type(self.x509.STORE_NAME_MY))
|
||||||
mock_get_cert_thumprint.assert_called_once_with(
|
mock_get_cert_thumprint.assert_called_once_with(
|
||||||
mock_CertCreateSelfSignCertificate())
|
mock_CertCreateSelfSignCertificate())
|
||||||
|
|
||||||
@ -353,7 +353,7 @@ class CryptoAPICertManagerTests(unittest.TestCase):
|
|||||||
mock_CertOpenStore.assert_called_with(
|
mock_CertOpenStore.assert_called_with(
|
||||||
self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0,
|
self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0,
|
||||||
self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE,
|
self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE,
|
||||||
s.unicode(self.x509.STORE_NAME_MY))
|
six.text_type(self.x509.STORE_NAME_MY))
|
||||||
|
|
||||||
mock_CertAddEncodedCertificateToStore.assert_called_with(
|
mock_CertAddEncodedCertificateToStore.assert_called_with(
|
||||||
mock_CertOpenStore(),
|
mock_CertOpenStore(),
|
||||||
|
@ -1,35 +0,0 @@
|
|||||||
# Copyright 2014 Cloudbase Solutions Srl
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import sys
|
|
||||||
|
|
||||||
_unicode = None
|
|
||||||
|
|
||||||
|
|
||||||
def unicode(obj):
|
|
||||||
def unicode_2(obj):
|
|
||||||
import __builtin__
|
|
||||||
return __builtin__.unicode(obj)
|
|
||||||
|
|
||||||
def unicode_3(obj):
|
|
||||||
return str(obj)
|
|
||||||
|
|
||||||
global _unicode
|
|
||||||
if not _unicode:
|
|
||||||
if sys.version_info.major >= 3:
|
|
||||||
_unicode = unicode_3
|
|
||||||
else:
|
|
||||||
_unicode = unicode_2
|
|
||||||
|
|
||||||
return _unicode(obj)
|
|
@ -16,11 +16,11 @@
|
|||||||
|
|
||||||
import copy
|
import copy
|
||||||
import ctypes
|
import ctypes
|
||||||
|
import six
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from ctypes import wintypes
|
from ctypes import wintypes
|
||||||
|
|
||||||
from cloudbaseinit.utils import s
|
|
||||||
from cloudbaseinit.utils.windows import cryptoapi
|
from cloudbaseinit.utils.windows import cryptoapi
|
||||||
from cloudbaseinit.utils import x509constants
|
from cloudbaseinit.utils import x509constants
|
||||||
|
|
||||||
@ -182,7 +182,7 @@ class CryptoAPICertManager(object):
|
|||||||
|
|
||||||
store_handle = cryptoapi.CertOpenStore(
|
store_handle = cryptoapi.CertOpenStore(
|
||||||
cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags,
|
cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags,
|
||||||
s.unicode(store_name))
|
six.text_type(store_name))
|
||||||
if not store_handle:
|
if not store_handle:
|
||||||
raise cryptoapi.CryptoAPIException()
|
raise cryptoapi.CryptoAPIException()
|
||||||
|
|
||||||
@ -246,7 +246,7 @@ class CryptoAPICertManager(object):
|
|||||||
|
|
||||||
store_handle = cryptoapi.CertOpenStore(
|
store_handle = cryptoapi.CertOpenStore(
|
||||||
cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags,
|
cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags,
|
||||||
s.unicode(store_name))
|
six.text_type(store_name))
|
||||||
if not store_handle:
|
if not store_handle:
|
||||||
raise cryptoapi.CryptoAPIException()
|
raise cryptoapi.CryptoAPIException()
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user