Merge "Change init fork order"
This commit is contained in:
commit
6777012bc3
@ -11,9 +11,10 @@
|
|||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import networkx as nx
|
import networkx as nx
|
||||||
from networkx.readwrite import json_graph
|
from networkx.readwrite import json_graph
|
||||||
|
import oslo_messaging
|
||||||
|
import pecan
|
||||||
from pecan import rest
|
from pecan import rest
|
||||||
|
|
||||||
from vitrage.datasources import OPENSTACK_CLUSTER
|
from vitrage.datasources import OPENSTACK_CLUSTER
|
||||||
@ -21,6 +22,22 @@ from vitrage.datasources import OPENSTACK_CLUSTER
|
|||||||
|
|
||||||
class RootRestController(rest.RestController):
|
class RootRestController(rest.RestController):
|
||||||
|
|
||||||
|
@pecan.expose()
|
||||||
|
def _route(self, args, request=None):
|
||||||
|
"""All requests go through here
|
||||||
|
|
||||||
|
We can check the backend status
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
client = pecan.request.client.prepare(timeout=5)
|
||||||
|
backend_is_alive = client.call(pecan.request.context, 'is_alive')
|
||||||
|
if backend_is_alive:
|
||||||
|
return super(RootRestController, self)._route(args, request)
|
||||||
|
else:
|
||||||
|
pecan.abort(503, detail='vitrage-graph is not ready')
|
||||||
|
except oslo_messaging.MessagingTimeout:
|
||||||
|
pecan.abort(503, detail='vitrage-graph not available')
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def as_tree(graph, root=OPENSTACK_CLUSTER, reverse=False):
|
def as_tree(graph, root=OPENSTACK_CLUSTER, reverse=False):
|
||||||
if nx.__version__ >= '2.0':
|
if nx.__version__ >= '2.0':
|
||||||
|
@ -46,7 +46,7 @@ class RPCHook(hooks.PecanHook):
|
|||||||
target = oslo_messaging.Target(topic=conf.rpc_topic)
|
target = oslo_messaging.Target(topic=conf.rpc_topic)
|
||||||
self.client = vitrage_rpc.get_client(transport, target)
|
self.client = vitrage_rpc.get_client(transport, target)
|
||||||
|
|
||||||
def before(self, state):
|
def on_route(self, state):
|
||||||
state.request.client = self.client
|
state.request.client = self.client
|
||||||
|
|
||||||
|
|
||||||
|
34
vitrage/api_handler/apis/operational.py
Normal file
34
vitrage/api_handler/apis/operational.py
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
# Copyright 2018 - Nokia Corporation
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from oslo_log import log
|
||||||
|
from vitrage.api_handler.apis.base import EntityGraphApisBase
|
||||||
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class OperationalApis(EntityGraphApisBase):
|
||||||
|
|
||||||
|
def __init__(self, conf, graph):
|
||||||
|
self.conf = conf
|
||||||
|
self.graph = graph
|
||||||
|
|
||||||
|
def is_alive(self, ctx):
|
||||||
|
try:
|
||||||
|
if self.graph and self.graph.ready:
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
LOG.exception("is_alive check failed.")
|
||||||
|
LOG.warning("Api during initialization - graph not ready")
|
||||||
|
return False
|
@ -15,17 +15,12 @@
|
|||||||
from vitrage import __version__
|
from vitrage import __version__
|
||||||
|
|
||||||
VITRAGE_TITLE = r"""
|
VITRAGE_TITLE = r"""
|
||||||
__ __ __ __
|
_ __ _ __
|
||||||
/ | / |/ | / |
|
| | / /(_)/ /_ _____ ____ _ ____ _ ___
|
||||||
$$ | $$ |$$/ _$$ |_ ______ ______ ______ ______
|
| | / // // __// ___// __ `// __ `// _ \
|
||||||
$$ | $$ |/ |/ $$ | / \ / \ / \ / \
|
| |/ // // /_ / / / /_/ // /_/ // __/
|
||||||
$$ \ /$$/ $$ |$$$$$$/ /$$$$$$ |$$$$$$ |/$$$$$$ |/$$$$$$ |
|
|___//_/ \__//_/ \__,_/ \__, / \___/
|
||||||
$$ /$$/ $$ | $$ | __ $$ | $$/ / $$ |$$ | $$ |$$ $$ |
|
/____/
|
||||||
$$ $$/ $$ | $$ |/ |$$ | /$$$$$$$ |$$ \__$$ |$$$$$$$$/
|
|
||||||
$$$/ $$ | $$ $$/ $$ | $$ $$ |$$ $$ |$$ |
|
|
||||||
$/ $$/ $$$$/ $$/ $$$$$$$/ $$$$$$$ | $$$$$$$/
|
|
||||||
/ \__$$ |
|
|
||||||
$$ $$/
|
|
||||||
$$$$$$/
|
|
||||||
Vitrage RCA Service, version %s
|
Vitrage RCA Service, version %s
|
||||||
""" % __version__
|
""" % __version__
|
||||||
|
@ -12,25 +12,34 @@
|
|||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
from oslo_log import log
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
from vitrage.cli import VITRAGE_TITLE
|
from vitrage.cli import VITRAGE_TITLE
|
||||||
from vitrage.entity_graph import get_graph_driver
|
from vitrage.common.utils import spawn
|
||||||
from vitrage.entity_graph.graph_init import VitrageGraphInit
|
from vitrage.entity_graph.graph_init import VitrageGraphInit
|
||||||
|
from vitrage.entity_graph.workers import GraphWorkersManager
|
||||||
from vitrage import service
|
from vitrage import service
|
||||||
from vitrage import storage
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
"""Main method of vitrage-graph"""
|
"""Main method of vitrage-graph"""
|
||||||
|
|
||||||
print(VITRAGE_TITLE)
|
|
||||||
conf = service.prepare_service()
|
conf = service.prepare_service()
|
||||||
e_graph = get_graph_driver(conf)('Entity Graph')
|
LOG.info(VITRAGE_TITLE)
|
||||||
db_connection = storage.get_connection_from_config(conf)
|
|
||||||
VitrageGraphInit(conf, e_graph, db_connection).run()
|
|
||||||
|
|
||||||
|
workers = GraphWorkersManager(conf)
|
||||||
|
spawn(init, conf, workers)
|
||||||
|
workers.run()
|
||||||
|
|
||||||
|
|
||||||
|
def init(conf, workers):
|
||||||
|
# Because fork duplicates the process memory.
|
||||||
|
# We should only create master process resources after workers are forked.
|
||||||
|
workers.wait_for_worker_start()
|
||||||
|
VitrageGraphInit(conf, workers).run()
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
sys.exit(main())
|
sys.exit(main())
|
||||||
|
@ -157,5 +157,7 @@ class ConsistencyEnforcer(object):
|
|||||||
if driver_class.should_delete_outdated_entities():
|
if driver_class.should_delete_outdated_entities():
|
||||||
self.datasources_to_mark_deleted.append(driver_name)
|
self.datasources_to_mark_deleted.append(driver_name)
|
||||||
|
|
||||||
LOG.info('Vertices of the following datasources will be deleted if '
|
if self.datasources_to_mark_deleted:
|
||||||
'they become outdated: %s', self.datasources_to_mark_deleted)
|
LOG.info('Vertices of the following datasources will be deleted if'
|
||||||
|
'they become outdated: %s',
|
||||||
|
self.datasources_to_mark_deleted)
|
||||||
|
@ -21,53 +21,60 @@ from vitrage.common.constants import VertexProperties as VProps
|
|||||||
from vitrage.common.utils import spawn
|
from vitrage.common.utils import spawn
|
||||||
from vitrage.datasources.transformer_base import TransformerBase
|
from vitrage.datasources.transformer_base import TransformerBase
|
||||||
from vitrage.entity_graph import driver_exec
|
from vitrage.entity_graph import driver_exec
|
||||||
|
from vitrage.entity_graph import get_graph_driver
|
||||||
|
|
||||||
from vitrage.entity_graph import EVALUATOR_TOPIC
|
from vitrage.entity_graph import EVALUATOR_TOPIC
|
||||||
from vitrage.entity_graph.graph_persistency import GraphPersistency
|
from vitrage.entity_graph.graph_persistency import GraphPersistency
|
||||||
from vitrage.entity_graph.processor.notifier import GraphNotifier
|
from vitrage.entity_graph.processor.notifier import GraphNotifier
|
||||||
from vitrage.entity_graph.processor.notifier import PersistNotifier
|
from vitrage.entity_graph.processor.notifier import PersistNotifier
|
||||||
from vitrage.entity_graph.processor.processor import Processor
|
from vitrage.entity_graph.processor.processor import Processor
|
||||||
from vitrage.entity_graph.scheduler import Scheduler
|
from vitrage.entity_graph.scheduler import Scheduler
|
||||||
from vitrage.entity_graph.workers import GraphWorkersManager
|
|
||||||
from vitrage.graph.driver.networkx_graph import NXGraph
|
from vitrage.graph.driver.networkx_graph import NXGraph
|
||||||
from vitrage import messaging
|
from vitrage import messaging
|
||||||
|
from vitrage import storage
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class VitrageGraphInit(object):
|
class VitrageGraphInit(object):
|
||||||
def __init__(self, conf, graph, db_connection):
|
def __init__(self, conf, workers):
|
||||||
self.conf = conf
|
self.conf = conf
|
||||||
self.graph = graph
|
self.graph = get_graph_driver(conf)('Entity Graph')
|
||||||
self.db = db_connection
|
self.db = db_connection = storage.get_connection_from_config(conf)
|
||||||
self.workers = GraphWorkersManager(conf, graph, db_connection)
|
self.workers = workers
|
||||||
self.events_coordination = EventsCoordination(conf, self.process_event)
|
self.events_coordination = EventsCoordination(conf, self.process_event)
|
||||||
self.persist = GraphPersistency(conf, db_connection, graph)
|
self.persist = GraphPersistency(conf, db_connection, self.graph)
|
||||||
self.driver_exec = driver_exec.DriverExec(
|
self.driver_exec = driver_exec.DriverExec(
|
||||||
self.conf,
|
self.conf,
|
||||||
self.events_coordination.handle_multiple_low_priority,
|
self.events_coordination.handle_multiple_low_priority,
|
||||||
self.persist)
|
self.persist)
|
||||||
self.scheduler = Scheduler(conf, graph, self.driver_exec, self.persist)
|
self.scheduler = Scheduler(conf, self.graph, self.driver_exec,
|
||||||
self.processor = Processor(conf, graph)
|
self.persist)
|
||||||
|
self.processor = Processor(conf, self.graph)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
LOG.info('Init Started')
|
LOG.info('Init Started')
|
||||||
graph_snapshot = self.persist.query_recent_snapshot()
|
graph_snapshot = self.persist.query_recent_snapshot()
|
||||||
if graph_snapshot:
|
if graph_snapshot:
|
||||||
|
t = spawn(self.workers.submit_read_db_graph)
|
||||||
self._restart_from_stored_graph(graph_snapshot)
|
self._restart_from_stored_graph(graph_snapshot)
|
||||||
|
t.join()
|
||||||
|
self.workers.submit_enable_evaluations()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
self._start_from_scratch()
|
self._start_from_scratch()
|
||||||
self.workers.run()
|
self.workers.submit_read_db_graph()
|
||||||
|
self.workers.submit_start_evaluations()
|
||||||
|
self._init_finale()
|
||||||
|
|
||||||
def _restart_from_stored_graph(self, graph_snapshot):
|
def _restart_from_stored_graph(self, graph_snapshot):
|
||||||
LOG.info('Initializing graph from database snapshot (%sKb)',
|
LOG.info('Main process - loading graph from database snapshot (%sKb)',
|
||||||
len(graph_snapshot.graph_snapshot) / 1024)
|
len(graph_snapshot.graph_snapshot) / 1024)
|
||||||
NXGraph.read_gpickle(graph_snapshot.graph_snapshot, self.graph)
|
NXGraph.read_gpickle(graph_snapshot.graph_snapshot, self.graph)
|
||||||
self.persist.replay_events(self.graph, graph_snapshot.event_id)
|
self.persist.replay_events(self.graph, graph_snapshot.event_id)
|
||||||
self._recreate_transformers_id_cache()
|
self._recreate_transformers_id_cache()
|
||||||
LOG.info("%s vertices loaded", self.graph.num_vertices())
|
LOG.info("%s vertices loaded", self.graph.num_vertices())
|
||||||
self.subscribe_presist_notifier()
|
self.subscribe_presist_notifier()
|
||||||
spawn(self._start_all_workers, is_snapshot=True)
|
|
||||||
|
|
||||||
def _start_from_scratch(self):
|
def _start_from_scratch(self):
|
||||||
LOG.info('Starting for the first time')
|
LOG.info('Starting for the first time')
|
||||||
@ -78,13 +85,8 @@ class VitrageGraphInit(object):
|
|||||||
self.subscribe_presist_notifier()
|
self.subscribe_presist_notifier()
|
||||||
self.driver_exec.snapshot_get_all()
|
self.driver_exec.snapshot_get_all()
|
||||||
LOG.info("%s vertices loaded", self.graph.num_vertices())
|
LOG.info("%s vertices loaded", self.graph.num_vertices())
|
||||||
spawn(self._start_all_workers, is_snapshot=False)
|
|
||||||
|
|
||||||
def _start_all_workers(self, is_snapshot):
|
def _init_finale(self):
|
||||||
if is_snapshot:
|
|
||||||
self.workers.submit_enable_evaluations()
|
|
||||||
else:
|
|
||||||
self.workers.submit_start_evaluations()
|
|
||||||
self._add_graph_subscriptions()
|
self._add_graph_subscriptions()
|
||||||
self.scheduler.start_periodic_tasks()
|
self.scheduler.start_periodic_tasks()
|
||||||
LOG.info('Init Finished')
|
LOG.info('Init Finished')
|
||||||
|
@ -48,9 +48,13 @@ class GraphPersistency(object):
|
|||||||
|
|
||||||
def replay_events(self, graph, event_id):
|
def replay_events(self, graph, event_id):
|
||||||
LOG.info('Getting events from database')
|
LOG.info('Getting events from database')
|
||||||
events = self.db.events.get_replay_events(
|
count = self.do_replay_events(self.db, graph, event_id)
|
||||||
|
LOG.info('%s database events applied ', count)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def do_replay_events(db, graph, event_id):
|
||||||
|
events = db.events.get_replay_events(
|
||||||
event_id=event_id)
|
event_id=event_id)
|
||||||
LOG.info('Applying %s database events', len(events))
|
|
||||||
|
|
||||||
for event in events:
|
for event in events:
|
||||||
if event.is_vertex:
|
if event.is_vertex:
|
||||||
@ -67,6 +71,7 @@ class GraphPersistency(object):
|
|||||||
del event.payload['label']
|
del event.payload['label']
|
||||||
e = Edge(source_id, target_id, label, event.payload)
|
e = Edge(source_id, target_id, label, event.payload)
|
||||||
graph.update_edge(e)
|
graph.update_edge(e)
|
||||||
|
return len(events)
|
||||||
|
|
||||||
def persist_event(self, before, current, is_vertex, graph, event_id=None):
|
def persist_event(self, before, current, is_vertex, graph, event_id=None):
|
||||||
"""Callback subscribed to driver.graph updates"""
|
"""Callback subscribed to driver.graph updates"""
|
||||||
|
@ -14,12 +14,15 @@
|
|||||||
import abc
|
import abc
|
||||||
import cotyledon
|
import cotyledon
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
|
import os
|
||||||
from oslo_concurrency import processutils as ps
|
from oslo_concurrency import processutils as ps
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from oslo_utils import uuidutils
|
from oslo_utils import uuidutils
|
||||||
|
|
||||||
|
from vitrage.api_handler.apis.operational import OperationalApis
|
||||||
|
from vitrage.entity_graph.graph_persistency import GraphPersistency
|
||||||
|
|
||||||
from vitrage.api_handler.apis.alarm import AlarmApis
|
from vitrage.api_handler.apis.alarm import AlarmApis
|
||||||
from vitrage.api_handler.apis.event import EventApis
|
from vitrage.api_handler.apis.event import EventApis
|
||||||
from vitrage.api_handler.apis.rca import RcaApis
|
from vitrage.api_handler.apis.rca import RcaApis
|
||||||
@ -34,6 +37,7 @@ from vitrage.entity_graph import EVALUATOR_TOPIC
|
|||||||
from vitrage.evaluator.actions.base import ActionMode
|
from vitrage.evaluator.actions.base import ActionMode
|
||||||
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
|
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
|
||||||
from vitrage.evaluator.scenario_repository import ScenarioRepository
|
from vitrage.evaluator.scenario_repository import ScenarioRepository
|
||||||
|
from vitrage.graph.driver.networkx_graph import NXGraph
|
||||||
from vitrage import messaging
|
from vitrage import messaging
|
||||||
from vitrage import rpc as vitrage_rpc
|
from vitrage import rpc as vitrage_rpc
|
||||||
from vitrage import storage
|
from vitrage import storage
|
||||||
@ -41,6 +45,8 @@ from vitrage import storage
|
|||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
# Supported message types
|
# Supported message types
|
||||||
|
WAIT_FOR_WORKER_START = 'wait_for_worker_start'
|
||||||
|
READ_DB_GRAPH = 'read_db_graph'
|
||||||
GRAPH_UPDATE = 'graph_update'
|
GRAPH_UPDATE = 'graph_update'
|
||||||
ENABLE_EVALUATION = 'enable_evaluation'
|
ENABLE_EVALUATION = 'enable_evaluation'
|
||||||
START_EVALUATION = 'start_evaluation'
|
START_EVALUATION = 'start_evaluation'
|
||||||
@ -58,16 +64,15 @@ class GraphWorkersManager(cotyledon.ServiceManager):
|
|||||||
- the queues used to communicate with these workers
|
- the queues used to communicate with these workers
|
||||||
- methods interface to submit tasks to workers
|
- methods interface to submit tasks to workers
|
||||||
"""
|
"""
|
||||||
def __init__(self, conf, entity_graph, db):
|
def __init__(self, conf):
|
||||||
super(GraphWorkersManager, self).__init__()
|
super(GraphWorkersManager, self).__init__()
|
||||||
self._conf = conf
|
self._conf = conf
|
||||||
self._entity_graph = entity_graph
|
self._db = None
|
||||||
self._db = db
|
|
||||||
self._evaluator_queues = []
|
self._evaluator_queues = []
|
||||||
self._template_queues = []
|
self._template_queues = []
|
||||||
self._api_queues = []
|
self._api_queues = []
|
||||||
self._all_queues = []
|
self._all_queues = []
|
||||||
self.register_hooks(on_terminate=self._stop)
|
self.register_hooks(on_terminate=self._force_stop)
|
||||||
self.add_evaluator_workers()
|
self.add_evaluator_workers()
|
||||||
self.add_api_workers()
|
self.add_api_workers()
|
||||||
|
|
||||||
@ -88,7 +93,7 @@ class GraphWorkersManager(cotyledon.ServiceManager):
|
|||||||
workers = self._conf.evaluator.workers or ps.get_worker_count()
|
workers = self._conf.evaluator.workers or ps.get_worker_count()
|
||||||
queues = [multiprocessing.JoinableQueue() for i in range(workers)]
|
queues = [multiprocessing.JoinableQueue() for i in range(workers)]
|
||||||
self.add(EvaluatorWorker,
|
self.add(EvaluatorWorker,
|
||||||
args=(self._conf, queues, self._entity_graph, workers),
|
args=(self._conf, queues, workers),
|
||||||
workers=workers)
|
workers=workers)
|
||||||
self._evaluator_queues = queues
|
self._evaluator_queues = queues
|
||||||
self._all_queues.extend(queues)
|
self._all_queues.extend(queues)
|
||||||
@ -105,9 +110,7 @@ class GraphWorkersManager(cotyledon.ServiceManager):
|
|||||||
raise VitrageError('add_api_workers called more than once')
|
raise VitrageError('add_api_workers called more than once')
|
||||||
workers = self._conf.api.workers
|
workers = self._conf.api.workers
|
||||||
queues = [multiprocessing.JoinableQueue() for i in range(workers)]
|
queues = [multiprocessing.JoinableQueue() for i in range(workers)]
|
||||||
self.add(ApiWorker,
|
self.add(ApiWorker, args=(self._conf, queues), workers=workers)
|
||||||
args=(self._conf, queues, self._entity_graph),
|
|
||||||
workers=workers)
|
|
||||||
self._api_queues = queues
|
self._api_queues = queues
|
||||||
self._all_queues.extend(queues)
|
self._all_queues.extend(queues)
|
||||||
|
|
||||||
@ -143,6 +146,24 @@ class GraphWorkersManager(cotyledon.ServiceManager):
|
|||||||
"""
|
"""
|
||||||
self._submit_and_wait(self._evaluator_queues, (RELOAD_TEMPLATES,))
|
self._submit_and_wait(self._evaluator_queues, (RELOAD_TEMPLATES,))
|
||||||
|
|
||||||
|
def submit_read_db_graph(self):
|
||||||
|
"""Initialize the worker graph from database snapshot
|
||||||
|
|
||||||
|
So that new/deleted templates are added/removed
|
||||||
|
"""
|
||||||
|
LOG.info("Worker processes - loading graph...")
|
||||||
|
self._submit_and_wait(self._all_queues, (READ_DB_GRAPH,))
|
||||||
|
LOG.info("Worker processes - graph is ready")
|
||||||
|
|
||||||
|
def wait_for_worker_start(self):
|
||||||
|
"""Wait for responses from all workers
|
||||||
|
|
||||||
|
So that new/deleted templates are added/removed
|
||||||
|
"""
|
||||||
|
LOG.info("Worker processes - starting...")
|
||||||
|
self._submit_and_wait(self._all_queues, (WAIT_FOR_WORKER_START,))
|
||||||
|
LOG.info("Worker processes - ready!")
|
||||||
|
|
||||||
def submit_template_event(self, event):
|
def submit_template_event(self, event):
|
||||||
"""Template worker to load the new/deleted template
|
"""Template worker to load the new/deleted template
|
||||||
|
|
||||||
@ -150,6 +171,9 @@ class GraphWorkersManager(cotyledon.ServiceManager):
|
|||||||
"""
|
"""
|
||||||
template_action = event.get(TEMPLATE_ACTION)
|
template_action = event.get(TEMPLATE_ACTION)
|
||||||
|
|
||||||
|
if not self._db:
|
||||||
|
self._db = storage.get_connection_from_config(self._conf)
|
||||||
|
|
||||||
if template_action == ADD:
|
if template_action == ADD:
|
||||||
templates = self._db.templates.query(status=TStatus.LOADING)
|
templates = self._db.templates.query(status=TStatus.LOADING)
|
||||||
new_status = TStatus.ACTIVE
|
new_status = TStatus.ACTIVE
|
||||||
@ -182,20 +206,19 @@ class GraphWorkersManager(cotyledon.ServiceManager):
|
|||||||
q.join()
|
q.join()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _stop():
|
def _force_stop():
|
||||||
raise SystemExit(0)
|
os._exit(0)
|
||||||
|
|
||||||
|
|
||||||
class GraphCloneWorkerBase(cotyledon.Service):
|
class GraphCloneWorkerBase(cotyledon.Service):
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
worker_id,
|
worker_id,
|
||||||
conf,
|
conf,
|
||||||
task_queues,
|
task_queues):
|
||||||
entity_graph):
|
|
||||||
super(GraphCloneWorkerBase, self).__init__(worker_id)
|
super(GraphCloneWorkerBase, self).__init__(worker_id)
|
||||||
self._conf = conf
|
self._conf = conf
|
||||||
self._task_queue = task_queues[worker_id]
|
self._task_queue = task_queues[worker_id]
|
||||||
self._entity_graph = entity_graph
|
self._entity_graph = NXGraph()
|
||||||
|
|
||||||
name = 'GraphCloneWorkerBase'
|
name = 'GraphCloneWorkerBase'
|
||||||
|
|
||||||
@ -205,9 +228,14 @@ class GraphCloneWorkerBase(cotyledon.Service):
|
|||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
LOG.info("%s - Starting %s", self.__class__.__name__, self.worker_id)
|
|
||||||
self._entity_graph.notifier._subscriptions = [] # Quick n dirty
|
self._entity_graph.notifier._subscriptions = [] # Quick n dirty
|
||||||
self._init_instance()
|
self._init_instance()
|
||||||
|
if self._entity_graph.num_vertices():
|
||||||
|
LOG.info("%s - Started %s (%s vertices)", self.__class__.__name__,
|
||||||
|
self.worker_id, self._entity_graph.num_vertices())
|
||||||
|
else:
|
||||||
|
LOG.info("%s - Started empty %s", self.__class__.__name__,
|
||||||
|
self.worker_id)
|
||||||
self._read_queue()
|
self._read_queue()
|
||||||
|
|
||||||
def _read_queue(self):
|
def _read_queue(self):
|
||||||
@ -226,6 +254,11 @@ class GraphCloneWorkerBase(cotyledon.Service):
|
|||||||
if action == GRAPH_UPDATE:
|
if action == GRAPH_UPDATE:
|
||||||
(action, before, current, is_vertex) = task
|
(action, before, current, is_vertex) = task
|
||||||
self._graph_update(before, current, is_vertex)
|
self._graph_update(before, current, is_vertex)
|
||||||
|
elif action == READ_DB_GRAPH:
|
||||||
|
self._read_db_graph()
|
||||||
|
elif action == WAIT_FOR_WORKER_START:
|
||||||
|
# Nothing to do, manager is just verifying this worker is alive
|
||||||
|
pass
|
||||||
|
|
||||||
def _graph_update(self, before, current, is_vertex):
|
def _graph_update(self, before, current, is_vertex):
|
||||||
if current:
|
if current:
|
||||||
@ -239,16 +272,23 @@ class GraphCloneWorkerBase(cotyledon.Service):
|
|||||||
else:
|
else:
|
||||||
self._entity_graph.remove_edge(before)
|
self._entity_graph.remove_edge(before)
|
||||||
|
|
||||||
|
def _read_db_graph(self):
|
||||||
|
db = storage.get_connection_from_config(self._conf)
|
||||||
|
graph_snapshot = db.graph_snapshots.query()
|
||||||
|
NXGraph.read_gpickle(graph_snapshot.graph_snapshot, self._entity_graph)
|
||||||
|
GraphPersistency.do_replay_events(db, self._entity_graph,
|
||||||
|
graph_snapshot.event_id)
|
||||||
|
self._entity_graph.ready = True
|
||||||
|
|
||||||
|
|
||||||
class EvaluatorWorker(GraphCloneWorkerBase):
|
class EvaluatorWorker(GraphCloneWorkerBase):
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
worker_id,
|
worker_id,
|
||||||
conf,
|
conf,
|
||||||
task_queues,
|
task_queues,
|
||||||
e_graph,
|
|
||||||
workers_num):
|
workers_num):
|
||||||
super(EvaluatorWorker, self).__init__(
|
super(EvaluatorWorker, self).__init__(
|
||||||
worker_id, conf, task_queues, e_graph)
|
worker_id, conf, task_queues)
|
||||||
self._workers_num = workers_num
|
self._workers_num = workers_num
|
||||||
self._evaluator = None
|
self._evaluator = None
|
||||||
|
|
||||||
@ -312,7 +352,6 @@ class ApiWorker(GraphCloneWorkerBase):
|
|||||||
|
|
||||||
def _init_instance(self):
|
def _init_instance(self):
|
||||||
conf = self._conf
|
conf = self._conf
|
||||||
LOG.info("Vitrage Api Handler Service - Starting...")
|
|
||||||
notifier = messaging.VitrageNotifier(conf, "vitrage.api",
|
notifier = messaging.VitrageNotifier(conf, "vitrage.api",
|
||||||
[EVALUATOR_TOPIC])
|
[EVALUATOR_TOPIC])
|
||||||
db = storage.get_connection_from_config(conf)
|
db = storage.get_connection_from_config(conf)
|
||||||
@ -326,10 +365,9 @@ class ApiWorker(GraphCloneWorkerBase):
|
|||||||
TemplateApis(notifier, db),
|
TemplateApis(notifier, db),
|
||||||
EventApis(conf),
|
EventApis(conf),
|
||||||
ResourceApis(self._entity_graph, conf),
|
ResourceApis(self._entity_graph, conf),
|
||||||
WebhookApis(conf)]
|
WebhookApis(conf),
|
||||||
|
OperationalApis(conf, self._entity_graph)]
|
||||||
|
|
||||||
server = vitrage_rpc.get_server(target, endpoints, transport)
|
server = vitrage_rpc.get_server(target, endpoints, transport)
|
||||||
|
|
||||||
server.start()
|
server.start()
|
||||||
|
|
||||||
LOG.info("Vitrage Api Handler Service - Started!")
|
|
||||||
|
@ -217,4 +217,5 @@ class ScenarioRepository(object):
|
|||||||
|
|
||||||
def log_enabled_scenarios(self):
|
def log_enabled_scenarios(self):
|
||||||
scenarios = [s for s in self._all_scenarios if s.enabled]
|
scenarios = [s for s in self._all_scenarios if s.enabled]
|
||||||
LOG.info("Scenarios:\n%s", sorted([s.id for s in scenarios]))
|
if scenarios:
|
||||||
|
LOG.info("Scenarios:\n%s", sorted([s.id for s in scenarios]))
|
||||||
|
@ -55,6 +55,7 @@ class NXGraph(Graph):
|
|||||||
self._g = nx.MultiDiGraph()
|
self._g = nx.MultiDiGraph()
|
||||||
self.add_vertices(vertices)
|
self.add_vertices(vertices)
|
||||||
self.add_edges(edges)
|
self.add_edges(edges)
|
||||||
|
self.ready = False
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
return len(self._g)
|
return len(self._g)
|
||||||
|
@ -102,8 +102,6 @@ def _normalize_path_to_datasource_name(path_list, top=os.getcwd()):
|
|||||||
def register_opts(conf, package_name, paths):
|
def register_opts(conf, package_name, paths):
|
||||||
"""register opts of package package_name, with base path in paths"""
|
"""register opts of package package_name, with base path in paths"""
|
||||||
for path in paths:
|
for path in paths:
|
||||||
LOG.info("package name: %s" % package_name)
|
|
||||||
LOG.info("path: % s" % path)
|
|
||||||
try:
|
try:
|
||||||
opt = importutils.import_module(
|
opt = importutils.import_module(
|
||||||
"%s.%s" % (path, package_name)).OPTS
|
"%s.%s" % (path, package_name)).OPTS
|
||||||
@ -113,5 +111,6 @@ def register_opts(conf, package_name, paths):
|
|||||||
)
|
)
|
||||||
return
|
return
|
||||||
except ImportError:
|
except ImportError:
|
||||||
LOG.error("Failed to register config options for %s" %
|
pass
|
||||||
package_name)
|
LOG.error("Failed to import config options for %s. Not found in %s",
|
||||||
|
package_name, str(paths))
|
||||||
|
@ -90,7 +90,7 @@ def get_server(target, endpoints, transport, serializer=None):
|
|||||||
assert transport is not None
|
assert transport is not None
|
||||||
|
|
||||||
if profiler:
|
if profiler:
|
||||||
LOG.info('profiler enabled for RPC server')
|
LOG.debug('profiler enabled for RPC server')
|
||||||
serializer = ProfilerContextSerializer(serializer=serializer)
|
serializer = ProfilerContextSerializer(serializer=serializer)
|
||||||
|
|
||||||
access_policy = dispatcher.DefaultRPCAccessPolicy
|
access_policy = dispatcher.DefaultRPCAccessPolicy
|
||||||
|
Loading…
x
Reference in New Issue
Block a user