Owner information and middleware for context

This commit is contained in:
John Bresnahan 2013-05-20 11:16:19 -10:00
parent 05acef2cbe
commit 76e191efeb
12 changed files with 357 additions and 200 deletions

View File

@ -1,5 +1,5 @@
[pipeline:staccato-api]
pipeline = rootapp
pipeline = unauthenticated-context rootapp
[app:rootapp]
use = egg:Paste#urlmap
@ -12,4 +12,8 @@ openstack.app_factory = staccato.api.versions:VersionApp
[app:apiv1app]
paste.app_factory = staccato.openstack.common.pastedeploy:app_factory
openstack.app_factory = staccato.api.v1.router:API
openstack.app_factory = staccato.api.v1.xfer:API
[filter:unauthenticated-context]
paste.filter_factory = staccato.openstack.common.pastedeploy:filter_factory
openstack.filter_factory = staccato.api.v1.xfer:UnauthTestMiddleware

View File

@ -1,5 +0,0 @@
import paste.urlmap
def root_app_factory(loader, global_conf, **local_conf):
return paste.urlmap.urlmap_factory(loader, global_conf, **local_conf)

View File

@ -1,139 +0,0 @@
import json
import urlparse
import routes
import staccato.openstack.common.wsgi as os_wsgi
import staccato.xfer.executor as executor
import staccato.xfer.events as xfer_events
from staccato import db
from staccato.xfer.constants import Events
from staccato.common import config
from staccato.common import utils
class XferController(object):
def __init__(self, conf, sm):
self.sm = sm
self.conf = conf
self.db_con = db.StaccatoDB(conf)
def _xfer_from_db(self, xfer_id):
return self.db_con.lookup_xfer_request_by_id(xfer_id)
def _to_state_machine(self, event, xfer_request):
self.sm.event_occurred(event,
conf=self.conf,
xfer_request=xfer_request,
db=self.db_con)
def urlxfer(self, request, srcurl, dsturl, dstopts=None, srcopts=None,
start_ndx=0, end_ndx=None):
srcurl_parts = urlparse.urlparse(srcurl)
dsturl_parts = urlparse.urlparse(dsturl)
plugin_policy = config.get_protocol_policy(self.conf)
src_module_name = utils.find_protocol_module_name(plugin_policy,
srcurl_parts)
dst_module_name = utils.find_protocol_module_name(plugin_policy,
dsturl_parts)
src_module = utils.load_protocol_module(src_module_name, self.conf)
dst_module = utils.load_protocol_module(dst_module_name, self.conf)
write_info = dst_module.new_write(dsturl_parts, dstopts)
read_info = src_module.new_write(srcurl_parts, srcopts)
db_con = db.StaccatoDB(self.conf)
xfer = db_con.get_new_xfer(srcurl,
dsturl,
src_module_name,
dst_module_name,
start_ndx=start_ndx,
end_ndx=end_ndx,
read_info=read_info,
write_info=write_info)
return xfer
def status(self, request, xfer_id):
xfer = self._xfer_from_db(xfer_id)
return self._xfer_to_dict(xfer)
def _xfer_to_dict(self, x):
d = {}
d['id'] = x.id
d['srcurl'] = x.srcurl
d['dsturl'] = x.dsturl
d['state'] = x.state
d['progress'] = x.next_ndx
return d
def delete(self, request, xfer_id):
xfer_request = self._xfer_from_db(xfer_id)
self._to_state_machine(Events.EVENT_DELETE, xfer_request)
def cancel(self, request, xfer_id):
xfer_request = self._xfer_from_db(xfer_id)
self._to_state_machine(Events.EVENT_CANCEL, xfer_request)
class LDeserial(os_wsgi.RequestHeadersDeserializer):
"""Default request headers deserializer"""
meta_string = 'x-xfer-'
def _pullout_xxfers(self, request):
d = {}
for h in request.headers:
if h.lower().startswith(self.meta_string):
key = h[len(self.meta_string):].lower().replace("-", "_")
val = request.headers[h]
d[key] = val
return d
def default(self, request):
return self._pullout_xxfers(request)
class LSerial(os_wsgi.DictSerializer):
def serialize(self, data, action='default', *args):
return super(LSerial, self).serialize(data, args[0])
def urlxfer(self, data):
x = data
d = {}
d['id'] = x.id
d['srcurl'] = x.srcurl
d['dsturl'] = x.dsturl
d['state'] = x.state
d['progress'] = x.next_ndx
return json.dumps(d)
class API(os_wsgi.Router):
def __init__(self, conf):
self.conf = conf
self.executor = executor.SimpleThreadExecutor(self.conf)
self.sm = xfer_events.XferStateMachine(self.executor)
controller = XferController(self.conf, self.sm)
mapper = routes.Mapper()
deserializer = os_wsgi.RequestDeserializer(
headers_deserializer=LDeserial())
serializer = LSerial()
sc = os_wsgi.Resource(controller,
deserializer=deserializer,
serializer=serializer)
mapper.connect(None, "/urlxfer", controller=sc, action="urlxfer")
mapper.connect(None, "/status", controller=sc, action="status")
# Actions are all implicitly defined
#mapper.resource("server", "servers", controller=controller)
super(API, self).__init__(mapper)

View File

@ -1,33 +1,222 @@
import httplib
import json
import logging
import urlparse
import webob.dec
import webob.exc
import routes
from webob.exc import (HTTPError,
HTTPNotFound,
HTTPConflict,
HTTPBadRequest,
HTTPForbidden,
HTTPRequestEntityTooLarge,
HTTPInternalServerError,
HTTPServiceUnavailable)
import staccato.openstack.common.wsgi as os_wsgi
import staccato.db as db
import staccato.openstack.common.middleware.context as os_context
import staccato.xfer.executor as executor
import staccato.xfer.events as xfer_events
from staccato import db
from staccato.xfer.constants import Events
from staccato.common import config, exceptions
from staccato.common import utils
class XferApp(object):
"""
A single WSGI application that just returns version information
"""
def __init__(self, conf):
class UnauthTestMiddleware(os_context.ContextMiddleware):
def __init__(self, app, options):
self.options = options
super(UnauthTestMiddleware, self).__init__(app, options)
def process_request(self, req):
req.context = self.make_context(is_admin=True,
user='admin')
req.context.owner = 'admin'
class XferController(object):
def __init__(self, db_con, sm, conf):
self.sm = sm
self.db_con = db_con
self.log = logging
self.conf = conf
self.db = db.StaccatoDB(self.conf)
@webob.dec.wsgify(RequestClass=os_wsgi.Request)
def __call__(self, req):
version_info = {
'id': self.conf.id,
'version': self.conf.version,
'status': 'active'
}
version_objs = [version_info]
def _xfer_from_db(self, xfer_id, request):
try:
return self.db_con.lookup_xfer_request_by_id(
xfer_id, owner=request.context.owner)
except exceptions.StaccatoNotFoundInDBException, db_ex:
raise HTTPNotFound(explanation="No such ID %s" % xfer_id,
content_type="text/plain")
response = webob.Response(request=req,
status=httplib.MULTIPLE_CHOICES,
content_type='application/json')
response.body = json.dumps(dict(versions=version_objs))
return response
def _to_state_machine(self, event, xfer_request, name):
try:
self.sm.event_occurred(event,
xfer_request=xfer_request,
db=self.db_con)
except exceptions.StaccatoInvalidStateTransitionException, ex:
msg = _('You cannot %s a transfer that is in the %s '
'state. %s' % (name, xfer_request.state, ex))
self._log_request(logging.INFO, msg)
raise HTTPBadRequest(explanation=msg,
content_type="text/plain")
def urlxfer(self, request, srcurl, dsturl, dstopts=None, srcopts=None,
start_ndx=0, end_ndx=None):
srcurl_parts = urlparse.urlparse(srcurl)
dsturl_parts = urlparse.urlparse(dsturl)
plugin_policy = config.get_protocol_policy(self.conf)
src_module_name = utils.find_protocol_module_name(plugin_policy,
srcurl_parts)
dst_module_name = utils.find_protocol_module_name(plugin_policy,
dsturl_parts)
src_module = utils.load_protocol_module(src_module_name, self.conf)
dst_module = utils.load_protocol_module(dst_module_name, self.conf)
write_info = dst_module.new_write(dsturl_parts, dstopts)
read_info = src_module.new_write(srcurl_parts, srcopts)
db_con = db.StaccatoDB(self.conf)
xfer = db_con.get_new_xfer(request.context.owner,
srcurl,
dsturl,
src_module_name,
dst_module_name,
start_ndx=start_ndx,
end_ndx=end_ndx,
read_info=read_info,
write_info=write_info)
return xfer
def status(self, request, xfer_id):
xfer = self._xfer_from_db(xfer_id, request)
return xfer
def list(self, request, limit=None):
return self.db_con.lookup_xfer_request_all(owner=request.context.owner)
def _xfer_to_dict(self, x):
d = {}
d['id'] = x.id
d['srcurl'] = x.srcurl
d['dsturl'] = x.dsturl
d['state'] = x.state
d['progress'] = x.next_ndx
return d
def delete(self, request, xfer_id):
xfer_request = self._xfer_from_db(xfer_id, request)
self._to_state_machine(Events.EVENT_DELETE,
xfer_request,
'delete')
def cancel(self, request, xfer_id):
xfer_request = self._xfer_from_db(xfer_id, request)
self._to_state_machine(Events.EVENT_CANCEL,
xfer_request,
'cancel')
def _log_request(self, level, msg, ex=None):
# reformat the exception with context, user info, etc
if ex:
self.log.exception(msg)
self.log.log(level, msg)
class XferDeserializer(os_wsgi.RequestHeadersDeserializer):
"""Default request headers deserializer"""
meta_string = 'x-xfer-'
def _pullout_xxfers(self, request):
d = {}
for h in request.headers:
if h.lower().startswith(self.meta_string):
key = h[len(self.meta_string):].lower().replace("-", "_")
val = request.headers[h]
d[key] = val
return d
def default(self, request):
return self._pullout_xxfers(request)
class XferSerializer(os_wsgi.DictSerializer):
def serialize(self, data, action='default', *args):
return super(XferSerializer, self).serialize(data, args[0])
def status(self, data):
return self._xfer_to_json(data)
def cancel(self, data):
return self._xfer_to_json(data)
def delete(self, data):
return self._xfer_to_json(data)
def urlxfer(self, data):
return self._xfer_to_json(data)
def _xfer_to_json(self, data):
x = data
d = {}
d['id'] = x.id
d['srcurl'] = x.srcurl
d['dsturl'] = x.dsturl
d['state'] = x.state
d['progress'] = x.next_ndx
return json.dumps(d)
def list(self, data):
xfer_list = []
for xfer in data:
xfer_list.append(xfer.id)
return json.dumps(xfer_list)
class API(os_wsgi.Router):
def __init__(self, conf):
self.conf = conf
self.db_con = db.StaccatoDB(conf)
self.executor = executor.SimpleThreadExecutor(self.conf)
self.sm = xfer_events.XferStateMachine(self.executor)
controller = XferController(self.db_con, self.sm, self.conf)
mapper = routes.Mapper()
deserializer = os_wsgi.RequestDeserializer(
headers_deserializer=XferDeserializer())
serializer = XferSerializer()
sc = os_wsgi.Resource(controller,
deserializer=deserializer,
serializer=serializer)
mapper.connect(None,
"/urlxfer",
controller=sc,
action="urlxfer")
mapper.connect(None,
"/status/{xfer_id}",
controller=sc,
action="status")
mapper.connect(None,
"/cancel/{xfer_id}",
controller=sc,
action="cancel")
mapper.connect(None,
"/delete/{xfer_id}",
controller=sc,
action="delete")
mapper.connect(None,
"/list",
controller=sc,
action="list")
super(API, self).__init__(mapper)

View File

@ -1,7 +1,6 @@
import eventlet
import gettext
import sys
import time
from staccato.common import config
import staccato.openstack.common.wsgi as os_wsgi
@ -20,7 +19,6 @@ def fail(returncode, e):
def main():
try:
#config.parse_args(sys.argv)
conf = config.get_config_object()
paste_file = conf.paste_deploy.config_file
wsgi_app = os_pastedeploy.paste_deploy_app(paste_file,
@ -32,5 +30,4 @@ def main():
except RuntimeError as e:
fail(1, e)
main()

View File

@ -38,3 +38,11 @@ class StaccatoEventException(StaccatoBaseException):
class StaccatoInvalidStateTransitionException(StaccatoEventException):
pass
class StaccatoDatabaseException(StaccatoBaseException):
pass
class StaccatoNotFoundInDBException(StaccatoDataBaseException):
pass

View File

@ -7,10 +7,16 @@ class StateMachine(object):
# set up the transition table
self._transitions = {}
self._state_funcs = {}
self._state_observer_funcs = {}
def set_state_func(self, state, func):
self._state_funcs[state] = func
def set_state_observer(self, state, func):
if state not in self._state_observer_funcs:
self._state_observer_funcs[state] = []
self._state_observer_funcs[state].append(func)
def set_mapping(self, state, event, next_state, func=None):
if state not in self._transitions:
self._transitions[state] = {}
@ -44,6 +50,12 @@ class StateMachine(object):
next_state, function = state_ent[event]
self._state_changed(current_state, event, next_state, **kwvals)
# call all observors. They are not allowed to effect state change
for f in self._state_observer_funcs:
try:
f(current_state, event, next_state, **kwvals)
except Exception, ex:
raise
# log the change
if function:
try:

View File

@ -1,8 +1,8 @@
import logging
import time
import sqlalchemy
import sqlalchemy.orm as sa_orm
import sqlalchemy.orm.exc as orm_exc
import sqlalchemy.sql.expression as sql_expression
from staccato.db import migration, models
@ -28,6 +28,7 @@ class StaccatoDB(object):
return self.maker()
def get_new_xfer(self,
owner,
srcurl,
dsturl,
src_module_name,
@ -43,6 +44,7 @@ class StaccatoDB(object):
with session.begin():
xfer_request = models.XferRequest()
xfer_request.owner = owner
xfer_request.srcurl = srcurl
xfer_request.dsturl = dsturl
xfer_request.src_module_name = src_module_name
@ -65,31 +67,94 @@ class StaccatoDB(object):
session.add(db_obj)
session.flush()
def lookup_xfer_request_by_id(self, xfer_id, session=None):
def lookup_xfer_request_by_id(self, xfer_id, owner=None, session=None):
try:
if session is None:
session = self.get_sessions()
with session.begin():
query = session.query(models.XferRequest)
if owner is not None:
query = query.filter(sql_expression.and_(
models.XferRequest.owner == owner,
models.XferRequest.id == xfer_id))
else:
query = query.filter(models.XferRequest.id == xfer_id)
xfer_request = query.one()
return xfer_request
except orm_exc.NoResultFound, nf_ex:
raise exceptions.StaccatoNotFoundInDBException(nf_ex)
except Exception, ex:
raise exceptions.StaccatoDataBaseException(ex)
def lookup_xfer_request_all(self, owner=None, session=None):
try:
if session is None:
session = self.get_sessions()
with session.begin():
query = session.query(models.XferRequest)
if owner is not None:
query = query.filter(models.XferRequest.owner == owner)
xfer_requests = query.all()
return xfer_requests
except orm_exc.NoResultFound, nf_ex:
raise exceptions.StaccatoNotFoundInDBException(nf_ex)
except Exception, ex:
raise exceptions.StaccatoDataBaseException(ex)
def get_all_ready(self, owner=None, limit=None, session=None):
if session is None:
session = self.get_sessions()
with session.begin():
query = session.query(models.XferRequest)\
.filter(models.XferRequest.id == xfer_id)
xfer_request = query.one()
query = session.query(models.XferRequest)
if owner is not None:
query = query.filter(sql_expression.and_(
models.XferRequest.owner == owner,
sql_expression.or_(
models.XferRequest.state == constants.States.STATE_NEW,
models.XferRequest.state == constants.States.STATE_ERROR)))
else:
query = query.filter(sql_expression.or_(
models.XferRequest.state == constants.States.STATE_NEW,
models.XferRequest.state == constants.States.STATE_ERROR))
return xfer_request
def get_all_ready(self, limit=None, session=None):
if session is None:
session = self.get_sessions()
with session.begin():
query = session.query(models.XferRequest)\
.filter(
sql_expression.or_(models.XferRequest.state == constants.State.STATE_NEW,
models.XferRequest.state == constants.State.STATE_ERROR))
if limit is not None:
query = query.limit(limit)
xfer_requests = query.all()
return xfer_requests
def get_all_running(self, owner=None, limit=None, session=None):
if session is None:
session = self.get_sessions()
with session.begin():
query = session.query(models.XferRequest)
if owner is not None:
query = query.filter(sql_expression.and_(
models.XferRequest.owner == owner,
models.XferRequest.state == constants.States.STATE_RUNNING))
if limit is not None:
query = query.limit(limit)
xfer_requests = query.all()
return xfer_requests
def get_xfer_requests(self, ids, owner=None, session=None):
if session is None:
session = self.get_sessions()
with session.begin():
query = session.query(models.XferRequest)
if owner is not None:
query = query.filter(
sql_expression.and_(models.XferRequest.owner == owner,
models.XferRequest.id.in_(ids)))
else:
query = query.filter(models.XferRequest.id.in_(ids))
xfer_requests = query.all()
return xfer_requests
def delete_db_obj(self, db_obj, session=None):
if session is None:
session = self.get_sessions()

View File

@ -34,6 +34,7 @@ class XferRequest(BASE, ModelBase):
id = Column(String(36), primary_key=True, default=uuidutils.generate_uuid)
srcurl = Column(String(2048), nullable=False)
dsturl = Column(String(2048), nullable=False)
owner = Column(String(128), nullable=False)
src_module_name = Column(String(512), nullable=False)
dst_module_name = Column(String(512), nullable=False)
state = Column(Integer(), nullable=False)

View File

@ -1 +0,0 @@
__author__ = 'jbresnah'

View File

@ -1,24 +1,49 @@
import time
import staccato.openstack.common.service as os_service
import staccato.xfer.events as s_events
import staccato.xfer.executor as s_executor
import staccato.xfer.constants as s_constants
from staccato.xfer.constants import Events
import staccato.db as s_db
class SimpleCountSchedler(object):
class SimpleCountSchedler(os_service.Service):
def __init__(self, db_obj, max_at_once=4):
self.max_at_once = max_at_once
self.db_obj = db_obj
def __init__(self, conf):
super(SimpleCountSchedler, self).__init__()
self.max_at_once = 1
self.db_obj = s_db.StaccatoDB(conf)
self.executor = s_executor.SimpleThreadExecutor(conf) # todo, pull from conf
self.state_machine = s_events.XferStateMachine(self.executor)
self.running = 0
self.done = False
self._started_ids = []
def _poll_db(self):
while not self.done:
time.sleep(1)
self._check_for_transfers()
def _new_transfer(self, request):
self.running += 1
# todo start the transfer
self._started_ids.append(request.id)
self.state_machine.event_occurred(Events.EVENT_START,
xfer_request=request,
db=self.db_obj)
def _transfer_complete(self):
self.running -= 1
def _check_for_transfers(self):
avail = self.max_at_once - self.running
requests = self.db_obj.get_xfer_requests(self._started_ids)
for r in requests:
if s_constants.is_state_done_running(r.state) :
self._started_ids.remove(r.id)
avail = self.max_at_once - len(self._started_ids)
xfer_request_ready = self.db_obj.get_all_ready(limit=avail)
for request in xfer_request_ready:
self._new_transfer(request)
def poll(self):
self._check_for_transfers()
def start(self):
self.tg.add_thread(self._poll_db)

View File

@ -29,20 +29,19 @@ class XferStateMachine(state_machine.StateMachine):
current_state,
event,
new_state,
conf,
db,
xfer_request,
**kwvals):
"""
This handler just allows for the DB change.
"""
pass
def state_running_handler(
self,
current_state,
event,
new_state,
conf,
db,
xfer_request,
**kwvals):
@ -54,7 +53,6 @@ class XferStateMachine(state_machine.StateMachine):
current_state,
event,
new_state,
conf,
db,
xfer_request,
**kwvals):
@ -85,6 +83,9 @@ class XferStateMachine(state_machine.StateMachine):
self.set_mapping(constants.States.STATE_NEW,
constants.Events.EVENT_CANCEL,
constants.States.STATE_CANCELED)
self.set_mapping(constants.States.STATE_NEW,
constants.Events.EVENT_DELETE,
constants.States.STATE_DELETED)
self.set_mapping(constants.States.STATE_CANCELED,
constants.Events.EVENT_DELETE,