MessageController implementation for mongodb storage

This patch implements MessageController for the mongodb storage. In
order to test the changes made here, it was necessary to modify current
base classes.

This patch also adds more granular exceptions (QueueDoesNotExist and
MessageDoesNotExist) both based on the existing exception DoesNotExist.

Implements blueprint storage-base
Implements blueprint storage-mongodb

Change-Id: I0752c5d797104a9cc58efd8e866b3b627646bce5
This commit is contained in:
Flaper Fesp 2013-03-21 10:51:17 +01:00
parent 9cde7d8c95
commit 1810ffa8c9
9 changed files with 471 additions and 18 deletions

View File

@ -0,0 +1,186 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Time related utilities and helper functions.
"""
import calendar
import datetime
import iso8601
# ISO 8601 extended time format with microseconds
_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
def isotime(at=None, subsecond=False):
"""Stringify time in ISO 8601 format"""
if not at:
at = utcnow()
st = at.strftime(_ISO8601_TIME_FORMAT
if not subsecond
else _ISO8601_TIME_FORMAT_SUBSECOND)
tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
st += ('Z' if tz == 'UTC' else tz)
return st
def parse_isotime(timestr):
"""Parse time from ISO 8601 format"""
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(e.message)
except TypeError as e:
raise ValueError(e.message)
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
"""Returns formatted utcnow."""
if not at:
at = utcnow()
return at.strftime(fmt)
def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
"""Turn a formatted time back into a datetime."""
return datetime.datetime.strptime(timestr, fmt)
def normalize_time(timestamp):
"""Normalize time in arbitrary timezone to UTC naive object"""
offset = timestamp.utcoffset()
if offset is None:
return timestamp
return timestamp.replace(tzinfo=None) - offset
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
if isinstance(before, basestring):
before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
if isinstance(after, basestring):
after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds)
def utcnow_ts():
"""Timestamp version of our utcnow function."""
return calendar.timegm(utcnow().timetuple())
def utcnow():
"""Overridable version of utils.utcnow."""
if utcnow.override_time:
try:
return utcnow.override_time.pop(0)
except AttributeError:
return utcnow.override_time
return datetime.datetime.utcnow()
def iso8601_from_timestamp(timestamp):
"""Returns a iso8601 formated date from timestamp"""
return isotime(datetime.datetime.utcfromtimestamp(timestamp))
utcnow.override_time = None
def set_time_override(override_time=datetime.datetime.utcnow()):
"""
Override utils.utcnow to return a constant time or a list thereof,
one at a time.
"""
utcnow.override_time = override_time
def advance_time_delta(timedelta):
"""Advance overridden time using a datetime.timedelta."""
assert(not utcnow.override_time is None)
try:
for dt in utcnow.override_time:
dt += timedelta
except TypeError:
utcnow.override_time += timedelta
def advance_time_seconds(seconds):
"""Advance overridden time by seconds."""
advance_time_delta(datetime.timedelta(0, seconds))
def clear_time_override():
"""Remove the overridden time."""
utcnow.override_time = None
def marshall_now(now=None):
"""Make an rpc-safe datetime with microseconds.
Note: tzinfo is stripped, but not required for relative times."""
if not now:
now = utcnow()
return dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
minute=now.minute, second=now.second,
microsecond=now.microsecond)
def unmarshall_time(tyme):
"""Unmarshall a datetime dict."""
return datetime.datetime(day=tyme['day'],
month=tyme['month'],
year=tyme['year'],
hour=tyme['hour'],
minute=tyme['minute'],
second=tyme['second'],
microsecond=tyme['microsecond'])
def delta_seconds(before, after):
"""
Compute the difference in seconds between two date, time, or
datetime objects (as a float, to microsecond resolution).
"""
delta = after - before
try:
return delta.total_seconds()
except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6))
def is_soon(dt, window):
"""
Determines if time is going to happen in the next window seconds.
:params dt: the time
:params window: minimum seconds to remain to consider the time not soon
:return: True if expiration is within the given duration
"""
soon = (utcnow() + datetime.timedelta(seconds=window))
return normalize_time(dt) <= soon

View File

@ -20,3 +20,20 @@ class DoesNotExist(Exception):
class NotPermitted(Exception):
pass
class QueueDoesNotExist(DoesNotExist):
def __init__(self, name, tenant):
msg = (_("Queue %(name)s does not exist for tenant %(tenant)s") %
dict(name=name, tenant=tenant))
super(QueueDoesNotExist, self).__init__(msg)
class MessageDoesNotExist(DoesNotExist):
def __init__(self, mid, queue, tenant):
msg = (_("Message %(mid)s does not exist in "
"queue %(queue)s of tenant %(tenant)s") %
dict(mid=mid, queue=queue, tenant=tenant))
super(MessageDoesNotExist, self).__init__(msg)

View File

@ -22,8 +22,12 @@ Fields Mapping:
letter of their long name. Fields mapping will be
updated and documented in each class.
"""
from bson import objectid
from marconi.openstack.common import timeutils
from marconi import storage
from marconi.storage import exceptions
from marconi.storage.mongodb import utils
class QueueController(storage.QueueBase):
@ -40,6 +44,7 @@ class QueueController(storage.QueueBase):
def __init__(self, *args, **kwargs):
super(QueueController, self).__init__(*args, **kwargs)
self._col = self.driver.db["queues"]
# NOTE(flaper87): This creates a unique compound index for
# tenant and name. Using tenant as the first field of the
# index allows for querying by tenant and tenant+name.
@ -47,23 +52,30 @@ class QueueController(storage.QueueBase):
# as specific tenant, for example. Order Matters!
self._col.ensure_index([("t", 1), ("n", 1)], unique=True)
@property
def _col(self):
return self.driver.db["queues"]
def list(self, tenant=None):
cursor = self._col.find({"t": tenant}, fields=["n", "m"])
cursor = self._col.find({"t": tenant}, fields=dict(n=1, m=1, _id=0))
for queue in cursor:
queue["name"] = queue.pop("n")
queue["metadata"] = queue.pop("m", {})
yield queue
def get(self, name, tenant=None):
queue = self._col.find_one({"t": tenant, "n": name}, fields=["m"])
def _get(self, name, tenant=None, fields={"m": 1, "_id": 0}):
queue = self._col.find_one({"t": tenant, "n": name}, fields=fields)
if queue is None:
msg = (_("Queue %(name)s does not exist for tenant %(tenant)s") %
dict(name=name, tenant=tenant))
raise storage.exceptions.DoesNotExist(msg)
raise exceptions.QueueDoesNotExist(name, tenant)
return queue
def get_id(self, name, tenant=None):
"""
Just like `get` method but returns the queue's id
:returns: Queue's `ObjectId`
"""
queue = self._get(name, tenant, fields=["_id"])
return queue.get("_id")
def get(self, name, tenant=None):
queue = self._get(name, tenant)
return queue.get("m", {})
def upsert(self, name, metadata, tenant=None):
@ -86,3 +98,121 @@ class QueueController(storage.QueueBase):
def actions(self, name, tenant=None, marker=None, limit=10):
raise NotImplementedError
class MessageController(storage.MessageBase):
"""
Messages:
Name Field
----------------
queue -> q
expires -> e
ttl -> t
uuid -> u
"""
def __init__(self, *args, **kwargs):
super(MessageController, self).__init__(*args, **kwargs)
# Make sure indexes exist before,
# doing anything.
self._col = self.driver.db["messages"]
self._col.ensure_index("q", 1)
self._col.ensure_index("u", 1)
self._col.ensure_index([("e", -1)])
def _get_queue_id(self, queue, tenant):
queue_controller = self.driver.queue_controller
return queue_controller.get_id(queue, tenant)
def list(self, queue, tenant=None, marker=None,
limit=10, echo=False, client_uuid=None):
query = {"e": {"$gt": timeutils.utcnow_ts()}}
query["q"] = self._get_queue_id(queue, tenant)
if not echo and client_uuid:
query["u"] = {"$ne": client_uuid}
if marker:
try:
query["_id"] = {"$gt": utils.to_oid(marker)}
except ValueError:
raise StopIteration
messages = self._col.find(query, limit=limit,
sort=[("_id", 1)])
now = timeutils.utcnow_ts()
for msg in messages:
oid = msg.get("_id")
age = now - utils.oid_ts(oid)
yield {
"id": str(oid),
"age": age,
"ttl": msg["t"],
"body": msg["b"],
"marker": str(oid),
}
def get(self, queue, message_id, tenant=None):
# Check whether the queue exists or not
self._get_queue_id(queue, tenant)
# Base query, always check expire time
query = {"e": {"$gt": timeutils.utcnow_ts()}}
mid = utils.to_oid(message_id)
#NOTE(flaper87): Not adding the queue filter
# since we already verified that it exists.
# Since mid is unique, it doesn't make
# sense to add an extra filter. This also
# reduces index hits and query time.
query["_id"] = mid
message = self._col.find_one(query)
if message is None:
raise exceptions.MessageDoesNotExist(mid, queue, tenant)
oid = message.get("_id")
age = timeutils.utcnow_ts() - utils.oid_ts(oid)
return {
"id": oid,
"age": age,
"ttl": message["t"],
"body": message["b"],
}
def post(self, queue, messages, tenant=None, client_uuid=None):
qid = self._get_queue_id(queue, tenant)
ids = []
def denormalizer(messages):
for msg in messages:
ttl = int(msg["ttl"])
oid = objectid.ObjectId()
ids.append(str(oid))
# Lets remove the timezone, we want it to be plain
# utc
expires = utils.oid_ts(oid) + ttl
yield {
"_id": oid,
"t": ttl,
"q": qid,
"e": expires,
"u": client_uuid,
"b": msg['body'] if 'body' in msg else {}
}
self._col.insert(denormalizer(messages), manipulate=False)
return ids
def delete(self, queue, message_id, tenant=None, claim=None):
self._get_queue_id(queue, tenant)
mid = utils.to_oid(message_id)
self._col.remove(mid)

View File

@ -0,0 +1,50 @@
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import calendar
from bson import errors as berrors
from bson import objectid
from marconi.openstack.common import timeutils
def to_oid(obj):
"""
Creates a new ObjectId based on the input
and raises ValueError whenever a TypeError
or InvalidId error is raised by the ObjectID
class.
:param obj: Anything that can be passed as an
input to `objectid.ObjectId`
"""
try:
return objectid.ObjectId(obj)
except (TypeError, berrors.InvalidId):
msg = _("Wrong id %s") % obj
raise ValueError(msg)
def oid_ts(oid):
"""
Creates a non-tz-aware timestamp based on
the incoming object id datetime information.
"""
try:
norm_time = timeutils.normalize_time(oid.generation_time)
return calendar.timegm(norm_time.timetuple())
except AttributeError:
raise TypeError(_("Expected ObjectId and got %s") % type(oid))

View File

@ -1,6 +1,6 @@
[drivers]
transport = wsgi
storage = mongodb
transport = marconi.transport.wsgi
storage = marconi.storage.mongodb
[drivers:transport:wsgi]
port = 8888

View File

@ -53,6 +53,7 @@ class QueueControllerTest(ControllerBaseTest):
counter = 0
for queue in queues:
self.assertEqual(len(queue), 2)
self.assertIn("name", queue)
self.assertIn("metadata", queue)
counter += 1
@ -101,18 +102,32 @@ class MessageControllerTest(ControllerBaseTest):
super(MessageControllerTest, self).setUp()
# Lets create a queue
self.queue_controller = self.driver.queue_controller()
self.queue_controller.create(self.queue_name)
self.queue_controller = self.driver.queue_controller
self.queue_controller.upsert(self.queue_name, {},
tenant=self.tenant)
def tearDown(self):
self.queue_controller.delete(self.queue_name)
super(MessageControllerTest, self).tearDown()
def insert_fixtures(self, client_uuid=None, num=4):
def messages():
for n in xrange(num):
yield {
"ttl": 60,
"body": {
"event": "Event number %s" % n
}}
self.controller.post(self.queue_name, messages(),
tenant=self.tenant, client_uuid=client_uuid)
def test_message_lifecycle(self):
queue_name = self.queue_name
messages = [
{
"ttl": 60,
"body": {
"event": "BackupStarted",
"backupId": "c378813c-3f0b-11e2-ad92-7823d2b0f3ce"
@ -121,8 +136,8 @@ class MessageControllerTest(ControllerBaseTest):
]
# Test Message Creation
created = self.controller.post(queue_name, messages,
tenant=self.tenant)
created = list(self.controller.post(queue_name, messages,
tenant=self.tenant))
self.assertEqual(len(created), 1)
# Test Message Get
@ -134,4 +149,35 @@ class MessageControllerTest(ControllerBaseTest):
# Test DoesNotExist
self.assertRaises(storage.exceptions.DoesNotExist,
self.controller.get,
queue_name, created[0], tenant=self.tenant)
queue_name, message_id=created[0],
tenant=self.tenant)
def test_qet_multi(self):
self.insert_fixtures(client_uuid="my_uuid", num=20)
def load_messages(expected, *args, **kwargs):
msgs = list(self.controller.list(*args, **kwargs))
self.assertEqual(len(msgs), expected)
return msgs
# Test all messages, echo False and no uuid
load_messages(10, self.queue_name, tenant=self.tenant)
# Test all messages, echo False and uuid
load_messages(0, self.queue_name, tenant=self.tenant,
client_uuid="my_uuid")
# Test all messages and limit
load_messages(20, self.queue_name, tenant=self.tenant, limit=20)
# Test all messages, echo True, and uuid
msgs = load_messages(10, self.queue_name, echo=True,
tenant=self.tenant, client_uuid="my_uuid")
# Test all messages, echo False, no uuid and marker
msgs = load_messages(10, self.queue_name, tenant=self.tenant,
marker=msgs[4]["marker"])
# Test all messages, echo True, uuid and marker
load_messages(5, self.queue_name, echo=True, tenant=self.tenant,
marker=msgs[9]["marker"], client_uuid="my_uuid")

View File

@ -60,3 +60,26 @@ class MongodbQueueTests(base.QueueControllerTest):
col = self.controller._col
indexes = col.index_information()
self.assertIn("t_1_n_1", indexes)
class MongodbMessageTests(base.MessageControllerTest):
driver_class = mongodb.Driver
controller_class = controllers.MessageController
def setUp(self):
if not os.environ.get("MONGODB_TEST_LIVE"):
self.skipTest("No MongoDB instance running")
super(MongodbMessageTests, self).setUp()
self.load_conf("wsgi_mongodb.conf")
def tearDown(self):
self.controller._col.drop()
super(MongodbMessageTests, self).tearDown()
def test_indexes(self):
col = self.controller._col
indexes = col.index_information()
self.assertIn("q_1", indexes)
self.assertIn("e_-1", indexes)

View File

@ -1,3 +1,3 @@
[DEFAULT]
modules=importutils,setup,version
modules=importutils,setup,timeutils,version
base=marconi

View File

@ -6,3 +6,4 @@ PasteDeploy
pymongo
python-keystoneclient
WebOb
iso8601>=0.1.4