Add ability to purge streams on completion.

default to deleting streams once finished with them (if no error)
This commit is contained in:
Monsyne Dragon 2014-09-10 21:43:45 +00:00
parent 2d47fa6f6e
commit db434c24c4
7 changed files with 75 additions and 8 deletions

View File

@ -1,3 +1,9 @@
commit 2d47fa6f6e0de0a54975ff92fc87785a052d4371
Author: Monsyne Dragon <mdragon@rackspace.com>
Date: Mon Sep 8 23:02:52 2014 +0000
Add reset stream method.
commit ca0d09f7bc017ef9e372ae29a0c19bf20b68aca5
Author: Monsyne Dragon <mdragon@rackspace.com>
Date: Mon Sep 8 19:57:24 2014 +0000

View File

@ -439,3 +439,11 @@ class TestDB(unittest.TestCase):
stream = self.db.get_stream_by_id(6) stream = self.db.get_stream_by_id(6)
stream = self.db.reset_stream(stream) stream = self.db.reset_stream(stream)
self.assertEqual(stream.state, models.StreamState.retry_expire) self.assertEqual(stream.state, models.StreamState.retry_expire)
def test_purge_stream(self):
    """Purging a stream must make subsequent lookups by id fail."""
    doomed = self.db.get_stream_by_id(1)
    self.db.purge_stream(doomed)
    # The row is gone, so the same lookup now raises.
    with self.assertRaises(db.NoSuchStreamError):
        self.db.get_stream_by_id(1)

View File

@ -201,13 +201,23 @@ class TestPipelineManager(unittest.TestCase):
super(TestPipelineManager, self).setUp() super(TestPipelineManager, self).setUp()
@mock.patch.object(pipeline_manager.ConfigManager, 'wrap') @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
def test_complete_stream(self, mock_config_wrap): def test_complete_stream_nopurge(self, mock_config_wrap):
pm = pipeline_manager.PipelineManager('test') pm = pipeline_manager.PipelineManager('test')
pm.db = mock.MagicMock(spec=pm.db) pm.db = mock.MagicMock(spec=pm.db)
pm.purge_completed_streams = False
stream = "test stream" stream = "test stream"
pm._complete_stream(stream) pm._complete_stream(stream)
pm.db.set_stream_state.assert_called_once_with(stream, StreamState.completed) pm.db.set_stream_state.assert_called_once_with(stream, StreamState.completed)
@mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
def test_complete_stream_purge(self, mock_config_wrap):
    """With purging enabled, completing a stream deletes it outright."""
    manager = pipeline_manager.PipelineManager('test')
    manager.db = mock.MagicMock(spec=manager.db)
    manager.purge_completed_streams = True
    fake_stream = "test stream"
    manager._complete_stream(fake_stream)
    # The stream is removed rather than having its state updated.
    manager.db.purge_stream.assert_called_once_with(fake_stream)
@mock.patch.object(pipeline_manager.ConfigManager, 'wrap') @mock.patch.object(pipeline_manager.ConfigManager, 'wrap')
def test_error_stream(self, mock_config_wrap): def test_error_stream(self, mock_config_wrap):
pm = pipeline_manager.PipelineManager('test') pm = pipeline_manager.PipelineManager('test')

View File

@ -1,4 +1,5 @@
from winchester.db.interface import DuplicateError, LockError from winchester.db.interface import DuplicateError, LockError
from winchester.db.interface import NoSuchEventError, NoSuchStreamError
from winchester.db.interface import DBInterface from winchester.db.interface import DBInterface

View File

@ -4,6 +4,8 @@ import sqlalchemy
from sqlalchemy import and_, or_ from sqlalchemy import and_, or_
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm.exc import MultipleResultsFound
from winchester import models from winchester import models
from winchester.config import ConfigManager, ConfigSection, ConfigItem from winchester.config import ConfigManager, ConfigSection, ConfigItem
@ -23,6 +25,14 @@ class LockError(models.DBException):
pass pass
class NoSuchEventError(models.DBException):
    """Raised when an event lookup (e.g. by message_id) matches no row."""
    pass
class NoSuchStreamError(models.DBException):
    """Raised when a stream lookup by id matches no row."""
    pass
def sessioned(func): def sessioned(func):
def with_session(self, *args, **kw): def with_session(self, *args, **kw):
if 'session' in kw: if 'session' in kw:
@ -108,14 +118,20 @@ class DBInterface(object):
@sessioned @sessioned
def get_event_by_message_id(self, message_id, session=None): def get_event_by_message_id(self, message_id, session=None):
e = session.query(models.Event).\ try:
filter(models.Event.message_id == message_id).one() e = session.query(models.Event).\
filter(models.Event.message_id == message_id).one()
except NoResultFound:
raise NoSuchEventError("No event found with message_id %s!" % message_id)
return e.as_dict return e.as_dict
@sessioned @sessioned
def get_stream_by_id(self, stream_id, session=None): def get_stream_by_id(self, stream_id, session=None):
s = session.query(models.Stream).\ try:
filter(models.Stream.id == stream_id).one() s = session.query(models.Stream).\
filter(models.Stream.id == stream_id).one()
except NoResultFound:
raise NoSuchStreamError("No stream found with id %s!" % stream_id)
return s return s
@sessioned @sessioned
@ -217,3 +233,10 @@ class DBInterface(object):
if stream.state == models.StreamState.expire_error: if stream.state == models.StreamState.expire_error:
return self.set_stream_state(stream, models.StreamState.retry_expire) return self.set_stream_state(stream, models.StreamState.retry_expire)
return stream return stream
@sessioned
def purge_stream(self, stream, session=None):
    """Permanently delete a stream row from the database.

    :param stream: the models.Stream instance to delete.
    :param session: SQLAlchemy session; supplied by @sessioned when None.

    NOTE(review): related DistinguishingTrait rows are presumably removed
    by the delete cascade on Stream.distinguished_by -- confirm against
    the model definition.
    """
    # A detached instance must be re-attached to this session before
    # session.delete() can operate on it.
    if stream not in session:
        session.add(stream)
    session.delete(stream)

View File

@ -342,6 +342,7 @@ class Stream(ProxiedDictMixin, Base):
state_serial_no = Column(Integer, default=0, nullable=False) state_serial_no = Column(Integer, default=0, nullable=False)
distinguished_by = relationship("DistinguishingTrait", distinguished_by = relationship("DistinguishingTrait",
cascade="save-update, merge, delete, delete-orphan",
collection_class=attribute_mapped_collection('name')) collection_class=attribute_mapped_collection('name'))
_proxied = association_proxy("distinguished_by", "value", _proxied = association_proxy("distinguished_by", "value",
creator=lambda name, value: DistinguishingTrait(name=name, value=value)) creator=lambda name, value: DistinguishingTrait(name=name, value=value))

View File

@ -112,6 +112,8 @@ class PipelineManager(object):
pipeline_config=ConfigItem(required=True, pipeline_config=ConfigItem(required=True,
help="Name of pipeline config file " help="Name of pipeline config file "
"defining the handlers for each pipeline."), "defining the handlers for each pipeline."),
purge_completed_streams=ConfigItem(help="Delete successfully proccessed "
"streams when finished?", default=True),
) )
def __init__(self, config, db=None, pipeline_handlers=None, pipeline_config=None, trigger_defs=None): def __init__(self, config, db=None, pipeline_handlers=None, pipeline_config=None, trigger_defs=None):
@ -156,6 +158,7 @@ class PipelineManager(object):
self.pipeline_worker_batch_size = config['pipeline_worker_batch_size'] self.pipeline_worker_batch_size = config['pipeline_worker_batch_size']
self.pipeline_worker_delay = config['pipeline_worker_delay'] self.pipeline_worker_delay = config['pipeline_worker_delay']
self.statistics_period = config['statistics_period'] self.statistics_period = config['statistics_period']
self.purge_completed_streams = config['purge_completed_streams']
self.streams_fired = 0 self.streams_fired = 0
self.streams_expired = 0 self.streams_expired = 0
self.streams_loaded = 0 self.streams_loaded = 0
@ -209,13 +212,28 @@ class PipelineManager(object):
return True return True
def _complete_stream(self, stream): def _complete_stream(self, stream):
self.db.set_stream_state(stream, StreamState.completed) if self.purge_completed_streams:
self.db.purge_stream(stream)
else:
try:
self.db.set_stream_state(stream, StreamState.completed)
except LockError:
logger.error("Stream %s locked while trying to set 'complete' state! "
"This should not happen." % stream.id)
def _error_stream(self, stream): def _error_stream(self, stream):
self.db.set_stream_state(stream, StreamState.error) try:
self.db.set_stream_state(stream, StreamState.error)
except LockError:
logger.error("Stream %s locked while trying to set 'error' state! "
"This should not happen." % stream.id)
def _expire_error_stream(self, stream): def _expire_error_stream(self, stream):
self.db.set_stream_state(stream, StreamState.expire_error) try:
self.db.set_stream_state(stream, StreamState.expire_error)
except LockError:
logger.error("Stream %s locked while trying to set 'expire_error' state! "
"This should not happen." % stream.id)
def fire_stream(self, stream): def fire_stream(self, stream):
try: try: