scheduler_maanger.py: restored the signatures of Notificatio.info() Notificatio.warn() and Notificatio.error()

fairshare_manager.py: fixed calculateFairShare()
client: added new shell commands

Change-Id: I8b0232851b4741f1aee7b95e9181c8133ee8d220
This commit is contained in:
Lisa Zangrando 2016-06-03 14:44:35 +02:00
parent b11fb6e11c
commit a9a83e6abc
4 changed files with 377 additions and 44 deletions

View File

@ -30,6 +30,13 @@ synergy.managers =
FairShareManager = synergy_scheduler_manager.fairshare_manager:FairShareManager FairShareManager = synergy_scheduler_manager.fairshare_manager:FairShareManager
SchedulerManager = synergy_scheduler_manager.scheduler_manager:SchedulerManager SchedulerManager = synergy_scheduler_manager.scheduler_manager:SchedulerManager
synergy.commands =
get_queue = synergy_scheduler_manager.client.command:GetQueue
get_quota = synergy_scheduler_manager.client.command:GetQuota
get_priority = synergy_scheduler_manager.client.command:GetPriority
get_share = synergy_scheduler_manager.client.command:GetShare
get_usage = synergy_scheduler_manager.client.command:GetUsage
[build_sphinx] [build_sphinx]
source-dir = doc/source source-dir = doc/source
build-dir = doc/build build-dir = doc/build

View File

@ -0,0 +1,361 @@
from synergy.client.command import Execute
__author__ = "Lisa Zangrando"
__email__ = "lisa.zangrando[AT]pd.infn.it"
__copyright__ = """Copyright (c) 2015 INFN - INDIGO-DataCloud
All Rights Reserved
Licensed under the Apache License, Version 2.0;
you may not use this file except in compliance with the
License. You may obtain a copy of the License at:
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied.
See the License for the specific language governing
permissions and limitations under the License."""
class GetQuota(Execute):
def __init__(self):
super(GetQuota, self).__init__("GET_DYNAMIC_QUOTA")
def configureParser(self, subparser):
parser = subparser.add_parser("get_quota",
add_help=True,
help="shows the dynamic quota info")
parser.add_argument("--long",
action='store_true',
help="shows more details")
def sendRequest(self, synergy_url, args):
self.long = args.long
super(GetQuota, self).sendRequest(
synergy_url + "/synergy/execute", "QuotaManager", self.getName())
def log(self):
quota = self.getResults()
if not self.long:
cores_in_use = "{:d}".format(quota["cores"]["in_use"])
max_cores_in_use = max(len(cores_in_use), len("in use"))
cores_limit = "{:.2f}".format(quota["cores"]["limit"])
max_cores_limit = max(len(cores_limit), len("limit"))
ram_in_use = "{:d}".format(quota["ram"]["in_use"])
max_ram_in_use = max(len(ram_in_use), len("in use"))
ram_limit = "{:.2f}".format(quota["ram"]["limit"])
max_ram_limit = max(len(ram_limit), len("limit"))
separator = "-" * (max_cores_in_use + max_cores_limit +
max_ram_in_use + max_ram_limit + 7) + "\n"
raw = "| {0:%ss} | {1:%ss} | {2:%ss} |\n" % (
len("ram (MB)"),
max(max_cores_in_use, max_ram_in_use),
max(max_cores_limit, max_ram_limit))
msg = separator
msg += raw.format("type", "in use", "limit")
msg += separator
msg += raw.format("ram (MB)", ram_in_use, ram_limit)
msg += raw.format("cores", cores_in_use, cores_limit)
msg += separator
print(msg)
else:
max_ram = 0
max_ram_in_use = len("{:d}".format(quota["ram"]["in_use"]))
max_ram_limit = len("{:.2f}".format(quota["ram"]["limit"]))
max_cores = 0
max_cores_in_use = len("{:d}".format(quota["cores"]["in_use"]))
max_cores_limit = len("{:.2f}".format(quota["cores"]["limit"]))
max_prj_name = len("project")
for project in quota["projects"].values():
max_prj_name = max(len(project["name"]), max_prj_name)
max_ram = max(len("{:d}".format(project["ram"])), max_ram)
max_cores = max(len("{:d}".format(project["cores"])),
max_cores)
separator = "-" * (max_prj_name + max_cores + max_cores_in_use +
max_cores_limit + max_ram + max_ram_in_use +
max_ram_limit + 48)
title = "| {0:%ss} | {1:%ss} | {2:%ss} |\n" % (
max_prj_name,
max_cores + max_cores_in_use + max_cores_limit + 19,
max_ram + max_ram_in_use + max_ram_limit + 19)
raw = "| {0:%ss} | in use={1:%d} ({2:%d}) | limit={3:%ss} |" \
" in use={4:%d} ({5:%d}) | limit={6:%ss} |\n"
raw = raw % (max_prj_name, max_cores, max_cores_in_use,
max_cores_limit, max_ram, max_ram_in_use,
max_ram_limit)
msg = separator + "\n"
msg += title.format("project", "cores", "ram (MB)")
msg += separator + "\n"
for project in quota["projects"].values():
msg += raw.format(
project["name"], project["cores"],
quota["cores"]["in_use"],
"{:.2f}".format(quota["cores"]["limit"]),
project["ram"],
quota["ram"]["in_use"],
"{:.2f}".format(quota["ram"]["limit"]))
msg += separator + "\n"
print(msg)
class GetPriority(Execute):
def __init__(self):
super(GetPriority, self).__init__("GET_PRIORITY")
def configureParser(self, subparser):
subparser.add_parser("get_priority",
add_help=True,
help="shows the users priority")
def sendRequest(self, synergy_url, args):
super(GetPriority, self).sendRequest(
synergy_url + "/synergy/execute",
"FairShareManager",
self.getName())
def log(self):
projects = self.getResults()
max_prj = len("project")
max_user = len("user")
max_priority = len("priority")
for prj_name, users in projects.items():
max_prj = max(len(prj_name), max_prj)
for user_name, priority in users.items():
max_user = max(len(user_name), max_user)
max_priority = max(len("{:.2f}".format(priority)),
max_priority)
separator = "-" * (max_prj + max_user + max_priority + 10) + "\n"
raw = "| {0:%ss} | {1:%ss} | {2:%ss} |\n" % (
max_prj, max_user, max_priority)
msg = separator
msg += raw.format("project", "user", "priority")
msg += separator
for prj_name in sorted(projects.keys()):
for user_name in sorted(projects[prj_name].keys()):
msg += raw.format(
prj_name,
user_name,
"{:.2f}".format(projects[prj_name][user_name]))
msg += separator
print(msg)
class GetQueue(Execute):
def __init__(self):
super(GetQueue, self).__init__("GET_QUEUE")
def configureParser(self, subparser):
subparser.add_parser("get_queue",
add_help=True,
help="shows the queue info")
def sendRequest(self, synergy_url, args):
super(GetQueue, self).sendRequest(
synergy_url + "/synergy/execute",
"QueueManager",
self.getName(),
{"name": "DYNAMIC"})
def log(self):
queue = self.getResults()
max_status = len("status")
max_queue = max(len(queue["name"]), len("queue"))
max_size = max(len("{:d}".format(queue["size"])), len("size"))
separator = "-" * (max_queue + max_status + max_size + 10) + "\n"
raw = "| {0:%ss} | {1:%ss} | {2:%ss} |\n" % (
max_queue, max_status, max_size)
msg = separator
msg += raw.format("queue", "status", "size")
msg += separator
msg += raw.format(queue["name"],
queue["status"],
"{:d}".format(queue["size"]))
msg += separator
print(msg)
class GetShare(Execute):
def __init__(self):
super(GetShare, self).__init__("GET_SHARE")
def configureParser(self, subparser):
subparser.add_parser("get_share",
add_help=True,
help="shows the users share")
def sendRequest(self, synergy_url, args):
super(GetShare, self).sendRequest(
synergy_url + "/synergy/execute",
"FairShareManager",
"GET_PROJECTS")
def log(self):
projects = self.getResults()
max_prj = len("project")
max_usr = len("user")
max_prj_share = len("share")
max_usr_share = len("share (abs)")
for project in projects.values():
max_prj = max(len(project["name"]), max_prj)
max_prj_share = max(len("{:.2f}".format(project["share"])),
max_prj_share)
for user in project["users"].values():
max_usr = max(len(user["name"]), max_usr)
max_usr_share = max(
len("{:.2f} ({:.2f})".format(user["share"],
user["norm_share"])),
max_usr_share)
separator_str = "-" * (max_prj + max_usr + max_prj_share +
max_usr_share + 13) + "\n"
data_str = "| {0:%ss} | {1:%ss} | {2:%ss} | {3:%ss} |\n" % (
max_prj, max_prj_share, max_usr, max_usr_share)
msg = separator_str
msg += data_str.format("project", "share", "user", "share (abs)")
msg += separator_str
for project in projects.values():
for user in project["users"].values():
msg += data_str.format(
project["name"],
"{:.2f}".format(project["share"]),
user["name"],
"{:.2f} ({:.2f})".format(user["share"],
user["norm_share"]))
msg += separator_str
print(msg)
class GetUsage(Execute):
def __init__(self):
super(GetUsage, self).__init__("GET_USAGE")
def configureParser(self, subparser):
subparser.add_parser("get_usage",
add_help=True,
help="retrieve the resource usages")
def sendRequest(self, synergy_url, args):
super(GetUsage, self).sendRequest(
synergy_url + "/synergy/execute",
"FairShareManager",
"GET_PROJECTS")
def log(self):
projects = self.getResults()
max_prj = len("project")
max_usr = len("user")
max_prj_cores = len("cores")
max_usr_cores = len("cores")
max_prj_ram = len("ram")
max_usr_ram = len("ram (abs)")
for project in projects.values():
usage = project["usage"]
max_prj = max(len(project["name"]), max_prj)
max_prj_cores = max(len(
"{:.2f}%".format(usage["effective_cores"] * 100)),
max_prj_cores)
max_prj_ram = max(len(
"{:.2f}%".format(usage["effective_ram"] * 100)),
max_prj_ram)
for user in project["users"].values():
usage = user["usage"]
max_usr = max(len(user["name"]), max_usr)
max_usr_cores = max(len("{:.2f}% ({:.2f})%".format(
usage["effective_rel_cores"] * 100,
usage["norm_cores"] * 100)),
max_usr_cores)
max_usr_ram = max(len("{:.2f}% ({:.2f})%".format(
usage["effective_rel_ram"] * 100,
usage["norm_ram"] * 100)),
max_usr_ram)
separator = "-" * (max_prj + max_usr + max_prj_cores +
max_usr_cores + max_prj_ram +
max_usr_ram + 19) + "\n"
raw = "| {0:%ss} | {1:%ss} | {2:%ss} | {3:%ss} | {4:%ss} | " \
"{5:%ss} | \n" % (max_prj, max_prj_cores, max_prj_ram,
max_usr, max_usr_cores, max_usr_ram)
msg = separator
msg += raw.format("project", "cores", "ram",
"user", "cores (abs)", "ram (abs)")
msg += separator
for project in projects.values():
prj_usage = project["usage"]
for user in project["users"].values():
usr_usage = user["usage"]
prj_cores = "{:.2f}%".format(
prj_usage["effective_cores"] * 100)
prj_ram = "{:.2f}%".format(prj_usage["effective_ram"] * 100)
usr_cores = "{:.2f}% ({:.2f}%)".format(
usr_usage["effective_rel_cores"] * 100,
usr_usage["norm_cores"] * 100)
usr_ram = "{:.2f}% ({:.2f}%)".format(
usr_usage["effective_rel_ram"] * 100,
usr_usage["norm_ram"] * 100)
msg += raw.format(
project["name"], prj_cores, prj_ram,
user["name"], usr_cores, usr_ram)
msg += separator
print(msg)

View File

@ -249,7 +249,6 @@ class FairShareManager(Manager):
# check the share for each user and update the usage_record # check the share for each user and update the usage_record
users = project["users"] users = project["users"]
prj_id = project["id"] prj_id = project["id"]
# prj_name = project["name"]
prj_share = project["share"] prj_share = project["share"]
sibling_share = float(0) sibling_share = float(0)
@ -310,14 +309,6 @@ class FairShareManager(Manager):
sibling_share = project["sibling_share"] sibling_share = project["sibling_share"]
users = project["users"] users = project["users"]
# effect_prj_cores_usage = actual_usage_cores +
# ((total_actual_usage_cores - actual_usage_cores) *
# prj_share / total_prj_share)
# effect_prj_cores_usage = actual_usage_ram +
# ((total_actual_usage_ram - actual_usage_ram) *
# prj_share / total_prj_share)
effect_prj_ram_usage = actual_usage_ram effect_prj_ram_usage = actual_usage_ram
effect_prj_cores_usage = actual_usage_cores effect_prj_cores_usage = actual_usage_cores
@ -358,13 +349,8 @@ class FairShareManager(Manager):
user_usage["effective_rel_cores"] /= actual_usage_cores user_usage["effective_rel_cores"] /= actual_usage_cores
if actual_usage_ram > 0: if actual_usage_ram > 0:
user_usage["effect_rel_ram"] = norm_usage_ram user_usage["effective_rel_ram"] = norm_usage_ram
user_usage["effect_rel_ram"] /= actual_usage_ram user_usage["effective_rel_ram"] /= actual_usage_ram
# user["effect_usage_rel_cores"] = effect_usage_cores /
# effect_prj_cores_usage
# user["effect_usage_rel_ram"] = effect_usage_ram /
# effect_prj_cores_usage
if norm_share > 0: if norm_share > 0:
f_ram = 2 ** (-effect_usage_ram / norm_share) f_ram = 2 ** (-effect_usage_ram / norm_share)

View File

@ -40,7 +40,7 @@ class Notifications(object):
self.dynamic_quota = dynamic_quota self.dynamic_quota = dynamic_quota
def info(self, event_type, payload): def info(self, ctxt, publisher_id, event_type, payload, metadata):
LOG.debug("Notification INFO: event_type=%s payload=%s" LOG.debug("Notification INFO: event_type=%s payload=%s"
% (event_type, payload)) % (event_type, payload))
@ -69,8 +69,6 @@ class Notifications(object):
instance_id = instance_info["instance_id"] instance_id = instance_info["instance_id"]
ram = instance_info["memory_mb"] ram = instance_info["memory_mb"]
cores = instance_info["vcpus"] cores = instance_info["vcpus"]
# disk = instance_info["root_gb"]
# node = instance_info["node"]
LOG.debug("Notification INFO (type=%s state=%s): cores=%s ram=%s " LOG.debug("Notification INFO (type=%s state=%s): cores=%s ram=%s "
"prj_id=%s instance_id=%s" "prj_id=%s instance_id=%s"
@ -81,15 +79,15 @@ class Notifications(object):
except Exception as ex: except Exception as ex:
LOG.warn("Notification INFO: %s" % ex) LOG.warn("Notification INFO: %s" % ex)
def warn(self, event_type, payload): def warn(self, ctxt, publisher_id, event_type, payload, metadata):
state = payload["state"] state = payload["state"]
instance_id = payload["instance_id"] instance_id = payload["instance_id"]
LOG.info("Notification WARN: event_type=%s state=%s instance_id=%s" LOG.debug("Notification WARN: event_type=%s state=%s instance_id=%s"
% (event_type, state, instance_id)) % (event_type, state, instance_id))
def error(self, event_type, payload, metadata): def error(self, ctxt, publisher_id, event_type, payload, metadata):
LOG.info("Notification ERROR: event_type=%s payload=%s metadata=%s" LOG.debug("Notification ERROR: event_type=%s payload=%s metadata=%s"
% (event_type, payload, metadata)) % (event_type, payload, metadata))
class Worker(threading.Thread): class Worker(threading.Thread):
@ -110,7 +108,6 @@ class Worker(threading.Thread):
def destroy(self): def destroy(self):
try: try:
# if self.queue:
self.queue.close() self.queue.close()
self.exit = True self.exit = True
@ -126,8 +123,6 @@ class Worker(threading.Thread):
queue_item = self.queue.getItem() queue_item = self.queue.getItem()
except Exception as ex: except Exception as ex:
LOG.error("Worker %r: %s" % (self.name, ex)) LOG.error("Worker %r: %s" % (self.name, ex))
# self.exit = True
# break
continue continue
if queue_item is None: if queue_item is None:
@ -137,7 +132,6 @@ class Worker(threading.Thread):
request = queue_item.getData() request = queue_item.getData()
instance = request["instance"] instance = request["instance"]
# user_id = instance["nova_object.data"]["user_id"]
prj_id = instance["nova_object.data"]["project_id"] prj_id = instance["nova_object.data"]["project_id"]
uuid = instance["nova_object.data"]["uuid"] uuid = instance["nova_object.data"]["uuid"]
vcpus = instance["nova_object.data"]["vcpus"] vcpus = instance["nova_object.data"]["vcpus"]
@ -153,8 +147,6 @@ class Worker(threading.Thread):
image = request["image"] image = request["image"]
try: try:
# vm_instance = self.novaConductorAPI.instance_get_by_uuid
# (context, instance_uuid=instance_uuid)
server = self.nova_manager.execute("GET_SERVER", server = self.nova_manager.execute("GET_SERVER",
id=uuid) id=uuid)
except Exception as ex: except Exception as ex:
@ -178,11 +170,6 @@ class Worker(threading.Thread):
ram=memory_mb) ram=memory_mb)
continue continue
# LOG.info(request_spec)
# if (self.quota.reserve(instance_uuid, vcpus, memory_mb)):
# done = False
if self.quota.allocate(instance_id=uuid, if self.quota.allocate(instance_id=uuid,
prj_id=prj_id, prj_id=prj_id,
cores=vcpus, cores=vcpus,
@ -217,14 +204,8 @@ class Worker(threading.Thread):
self.queue.deleteItem(queue_item) self.queue.deleteItem(queue_item)
except Exception as ex: except Exception as ex:
LOG.error("Worker '%s': %s" % (self.name, ex)) LOG.error("Worker '%s': %s" % (self.name, ex))
# self.queue.reinsertItem(queue_item)
continue continue
# LOG.info("Worker done is %s" % done)
# LOG.info(">>>> Worker '%s' queue.isClosed %s exit=%s"
# % (self.name, self.queue.isClosed(), self.exit))
LOG.info("Worker '%s' destroyed!" % self.name) LOG.info("Worker '%s' destroyed!" % self.name)
@ -423,8 +404,6 @@ class SchedulerManager(Manager):
memory_mb = instance["nova_object.data"]["memory_mb"] memory_mb = instance["nova_object.data"]["memory_mb"]
if prj_id in self.projects: if prj_id in self.projects:
# prj_name = self.projects[prj_id]["name"]
# metadata = instance["nova_object.data"]["metadata"]
timestamp = instance["nova_object.data"]["created_at"] timestamp = instance["nova_object.data"]["created_at"]
timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ") timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
priority = 0 priority = 0