From 0de31ed183c7e9b504824d61cde7d3268bfdf219 Mon Sep 17 00:00:00 2001 From: Kun Huang Date: Tue, 10 Nov 2015 17:53:31 +0800 Subject: [PATCH] move to RPC mode * support tracer list/start * support old style stop command Change-Id: Ia0437eb7bd8775d5c8f50fc3d60cf4ffdc82b494 --- scalpels/agents/server.py | 83 ++++++++++++++++++++ scalpels/cli/actions/start.py | 20 +---- scalpels/cli/actions/stop.py | 23 +----- scalpels/cli/actions/{agent.py => tracer.py} | 7 +- scalpels/cli/api.py | 21 +++++ scalpels/cli/manage.py | 12 +++ scalpels/cli/rpcapi.py | 20 +++++ scalpels/cli/shell.py | 4 +- scalpels/rpc/client.py | 13 --- scalpels/rpc/server.py | 31 -------- setup.cfg | 1 + 11 files changed, 147 insertions(+), 88 deletions(-) create mode 100644 scalpels/agents/server.py rename scalpels/cli/actions/{agent.py => tracer.py} (55%) create mode 100644 scalpels/cli/api.py create mode 100644 scalpels/cli/manage.py create mode 100644 scalpels/cli/rpcapi.py delete mode 100644 scalpels/rpc/client.py delete mode 100644 scalpels/rpc/server.py diff --git a/scalpels/agents/server.py b/scalpels/agents/server.py new file mode 100644 index 0000000..a3484c1 --- /dev/null +++ b/scalpels/agents/server.py @@ -0,0 +1,83 @@ +from oslo_config import cfg +import oslo_messaging +from scalpels.db import api as db_api +from scalpels.agents.base import run_agent +import psutil +import signal + +class ServerControlEndpoint(object): + + target = oslo_messaging.Target(topic="test", version='1.0') + + def __init__(self, server): + self.server = server + + def stop(self, ctx): + if server: + self.server.stop() + +class TraceEndpoint(object): + + target = oslo_messaging.Target(topic="test", version='1.0') + + def tracer_list(self, ctx): + # TODO db_api + # XXX ctx required? + from scalpels.cli.actions.start import agents_map + return agents_map + + def start_tracers(self, ctx, tracers): + print locals() + task = db_api.task_create(results=[], pids=[]) + + pids = [] + for tr in tracers: + pid = run_agent(task.uuid, tr) + pids.append(pid) + + task = db_api.task_update(task.uuid, pids=pids) + print "task <%s> runs successfully!" % task.uuid + +class TaskEndpoint(object): + + target = oslo_messaging.Target(topic="test", version='1.0') + + LOWEST = 8 + + def get_last_task(self): + # XXX put it tu utils? + last_task = db_api.task_get_last() + return last_task + + def stop_task(self, ctx): + uuid = ctx.get("uuid") + last = ctx.get("last") + + if last and uuid: + raise ValueError("can't assign last and uuid togther") + elif not last and not uuid: + task = self.get_last_task() + elif last: + task = self.get_last_task() + elif uuid and len(uuid) < self.LOWEST: + print "at least %d to find a task" % self.LOWEST + return + else: + # len(uuid) > LOWEST + task = db_api.task_get(uuid, fuzzy=True) + + print "command stop: %s" % ctx + print "task: <%s>" % task.uuid + for pid in task.pids: + p = psutil.Process(int(pid)) + p.send_signal(signal.SIGINT) + +transport = oslo_messaging.get_transport(cfg.CONF) +target = oslo_messaging.Target(topic='test', server='localhost') +endpoints = [ + ServerControlEndpoint(None), + TraceEndpoint(), + TaskEndpoint(), +] +server = oslo_messaging.get_rpc_server(transport, target, endpoints, + executor='blocking') diff --git a/scalpels/cli/actions/start.py b/scalpels/cli/actions/start.py index bd04fd8..e180da9 100644 --- a/scalpels/cli/actions/start.py +++ b/scalpels/cli/actions/start.py @@ -4,11 +4,7 @@ import os import json -from scalpels.db import api as db_api -import subprocess -import time -import signal -from scalpels.agents.base import run_agent +from scalpels.cli.api import api as agent_api def _parse_agents_from_args(config): parsed_agents = set() @@ -50,16 +46,4 @@ def run(config): print "command start: %s" % config agents = _parse_agents_from_args(config) agents |= _parse_agents_from_file(config) - - task = db_api.task_create(results=[], pids=[]) - - data_dir = db_api.setup_config_get()["data_dir"].rstrip("/") - pids = [] - for ag in agents: - ag_exec = agents_map.get(ag) % data_dir - if ag_exec: - pid = run_agent(task.uuid, ag) - pids.append(pid) - - task = db_api.task_update(task.uuid, pids=pids) - print "task <%s> runs successfully!" % task.uuid + agent_api.start_tracers(agents) diff --git a/scalpels/cli/actions/stop.py b/scalpels/cli/actions/stop.py index 0a6008b..b80ccbe 100644 --- a/scalpels/cli/actions/stop.py +++ b/scalpels/cli/actions/stop.py @@ -5,6 +5,7 @@ from scalpels.db import api as db_api import psutil import signal +from scalpels.cli.api import api as agent_api LOWEST=8 @@ -13,24 +14,4 @@ def get_last_task(): return last_task def run(config): - uuid = config.get("uuid") - last = config.get("last") - - if last and uuid: - raise ValueError("can't assign last and uuid togther") - elif not last and not uuid: - task = get_last_task() - elif last: - task = get_last_task() - elif uuid and len(uuid) < LOWEST: - print "at least %d to find a task" % LOWEST - return - else: - # len(uuid) > LOWEST - task = db_api.task_get(uuid, fuzzy=True) - - print "command stop: %s" % config - print "task: <%s>" % task.uuid - for pid in task.pids: - p = psutil.Process(int(pid)) - p.send_signal(signal.SIGINT) + agent_api.stop_task(config) diff --git a/scalpels/cli/actions/agent.py b/scalpels/cli/actions/tracer.py similarity index 55% rename from scalpels/cli/actions/agent.py rename to scalpels/cli/actions/tracer.py index 37ab4dd..b29205e 100644 --- a/scalpels/cli/actions/agent.py +++ b/scalpels/cli/actions/tracer.py @@ -2,12 +2,13 @@ #-*- coding:utf-8 -*- # Author: Kun Huang -from scalpels.cli.actions.start import agents_map +from scalpels.cli.api import api as agent_api from prettytable import PrettyTable def run(config): + tracers = agent_api.get_tracer_list() t = PrettyTable(["tracer", "tracer script"]) - for ag in agents_map: - t.add_row([ag, agents_map[ag]]) + for tracer, script in tracers.items(): + t.add_row([tracer, script]) print t diff --git a/scalpels/cli/api.py b/scalpels/cli/api.py new file mode 100644 index 0000000..c82d009 --- /dev/null +++ b/scalpels/cli/api.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- +# Author: Kun Huang + + +from scalpels.cli.rpcapi import rpcapi + +class API(object): + def __init__(self): + pass + + def get_tracer_list(self): + return rpcapi.tracer_list() + + def start_tracers(self, tracers): + rpcapi.start_tracers(tracers=tracers) + + def stop_task(self, config): + rpcapi.stop_task(config) + +api = API() diff --git a/scalpels/cli/manage.py b/scalpels/cli/manage.py new file mode 100644 index 0000000..a451457 --- /dev/null +++ b/scalpels/cli/manage.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- +# Author: Kun Huang + +from scalpels.agents.server import server + +def main(): + server.start() + server.wait() + +if __name__ == "__main__": + main() diff --git a/scalpels/cli/rpcapi.py b/scalpels/cli/rpcapi.py new file mode 100644 index 0000000..eb48aeb --- /dev/null +++ b/scalpels/cli/rpcapi.py @@ -0,0 +1,20 @@ +import oslo_messaging as messaging +from oslo_config import cfg + +class RPCAPI(object): + + def __init__(self, transport): + target = messaging.Target(topic='test', version='1.0') + self._client = messaging.RPCClient(transport, target) + + def tracer_list(self, ctxt={}): + return self._client.call(ctxt, "tracer_list") + + def start_tracers(self, ctxt={}, tracers=None): + self._client.cast(ctxt, "start_tracers", tracers=tracers) + + def stop_task(self, ctxt={}): + self._client.cast(ctxt, "stop_task") + +transport = messaging.get_transport(cfg.CONF) +rpcapi = RPCAPI(transport) diff --git a/scalpels/cli/shell.py b/scalpels/cli/shell.py index eefca25..641bdd8 100755 --- a/scalpels/cli/shell.py +++ b/scalpels/cli/shell.py @@ -50,8 +50,8 @@ def main(): result.add_argument("--short", action="store_true", dest="short", help="report uuid only") # agent command - agent = subparsers.add_parser("agent") - agent.add_argument("-l", "--list", action="store_true", dest="list", help="list all agents") + tracer = subparsers.add_parser("tracer") + tracer.add_argument("-l", "--list", action="store_true", dest="list", help="list all agents") parser = rootparser.parse_args() diff --git a/scalpels/rpc/client.py b/scalpels/rpc/client.py deleted file mode 100644 index e56f81f..0000000 --- a/scalpels/rpc/client.py +++ /dev/null @@ -1,13 +0,0 @@ -import oslo_messaging as messaging -from oslo_config import cfg -class TestClient(object): - - def __init__(self, transport): - target = messaging.Target(topic='test', version='2.0') - self._client = messaging.RPCClient(transport, target) - - def test(self, ctxt, arg): - return self._client.call(ctxt, 'test', arg=arg) - -transport = messaging.get_transport(cfg.CONF) -print TestClient(transport).test({"ctx":"this is context"}, 'ping') diff --git a/scalpels/rpc/server.py b/scalpels/rpc/server.py deleted file mode 100644 index f8cb557..0000000 --- a/scalpels/rpc/server.py +++ /dev/null @@ -1,31 +0,0 @@ -from oslo_config import cfg -import oslo_messaging - -class ServerControlEndpoint(object): - - #target = oslo_messaging.Target(namespace='control', version='2.0') - target = oslo_messaging.Target(topic="test", version='2.0') - - def __init__(self, server): - self.server = server - - def stop(self, ctx): - if server: - self.server.stop() - -class TestEndpoint(object): - - target = oslo_messaging.Target(topic="test", version='2.0') - def test(self, ctx, arg): - return arg - -transport = oslo_messaging.get_transport(cfg.CONF) -target = oslo_messaging.Target(topic='test', server='localhost') -endpoints = [ - ServerControlEndpoint(None), - TestEndpoint(), -] -server = oslo_messaging.get_rpc_server(transport, target, endpoints, - executor='blocking') -server.start() -server.wait() diff --git a/setup.cfg b/setup.cfg index ea2ac29..7f695cc 100644 --- a/setup.cfg +++ b/setup.cfg @@ -28,3 +28,4 @@ packages = scalpels [entry_points] console_scripts = sca = scalpels.cli.shell:main + sca-m = scalpels.cli.manage:main