Message support in SQlite.

* The JSON serialization is switched to msgpack;
 * list() on messages and stats() on queues fail loudly.

Implements: blueprint message-pagination

Change-Id: Iadb1ef348a53e0de28c6b45782ef686a3f1bf1e2
This commit is contained in:
Zhihao Yuan 2013-03-21 09:26:10 -04:00 committed by Zhihao Yuan
parent b24d01b709
commit fb5a45f0c4
4 changed files with 344 additions and 100 deletions

View File

@ -14,8 +14,6 @@
# limitations under the License. # limitations under the License.
import json
from marconi.storage import base from marconi.storage import base
from marconi.storage import exceptions from marconi.storage import exceptions
@ -23,63 +21,67 @@ from marconi.storage import exceptions
class Queue(base.QueueBase): class Queue(base.QueueBase):
def __init__(self, driver): def __init__(self, driver):
self.driver = driver self.driver = driver
self.driver._run('''create table if not exists Queues ( self.driver.run('''
id INTEGER, create table
tenant TEXT, if not exists
name TEXT, Queues (
metadata TEXT, id INTEGER,
PRIMARY KEY(id), tenant TEXT,
UNIQUE(tenant, name) name TEXT,
)''') metadata DOCUMENT,
self.driver._run('''create unique index if not exists Paths on Queues ( PRIMARY KEY(id),
tenant, name UNIQUE(tenant, name)
)''') )
''')
def list(self, tenant): def list(self, tenant):
records = self.driver._run('''select name, metadata from Queues where records = self.driver.run('''
tenant = ?''', tenant) select name, metadata from Queues
where tenant = ?''', tenant)
for k, v in records: for k, v in records:
yield {'name': k, 'metadata': v} yield {'name': k, 'metadata': v}
def get(self, name, tenant): def get(self, name, tenant):
sql = '''select metadata from Queues where
tenant = ? and name = ?'''
try: try:
return json.loads(self.driver._get(sql, tenant, name)[0]) return self.driver.get('''
except TypeError: select metadata from Queues
msg = (_("Queue %(name)s does not exist for tenant %(tenant)s") where tenant = ? and name = ?''', tenant, name)[0]
% dict(name=name, tenant=tenant))
raise exceptions.DoesNotExist(msg) except _NoResult:
_queue_doesnotexist(name, tenant)
def upsert(self, name, metadata, tenant): def upsert(self, name, metadata, tenant):
with self.driver: with self.driver('immediate'):
sql_select = '''select metadata from Queues where previous_record = self.driver.run('''
tenant = ? and name = ?''' select id from Queues
previous_record = self.driver._get(sql_select, tenant, name) where tenant = ? and name = ?
''', tenant, name).fetchone()
sql_replace = '''replace into Queues self.driver.run('''
values (null, ?, ?, ?)''' replace into Queues
doc = json.dumps(metadata, ensure_ascii=False) values (null, ?, ?, ?)
self.driver._run(sql_replace, tenant, name, doc) ''', tenant, name, self.driver.pack(metadata))
return previous_record is None return previous_record is None
def delete(self, name, tenant): def delete(self, name, tenant):
self.driver._run('''delete from Queues where self.driver.run('''
tenant = ? and name = ?''', delete from Queues
tenant, name) where tenant = ? and name = ?''', tenant, name)
def stats(self, name, tenant): def stats(self, name, tenant):
sql = '''select count(id) qid, messages = self.driver.get('''
from Messages where select Q.id, count(M.id)
qid = (select id from Queues where from Queues as Q join Messages as M
tenant = ? and name = ?)''' on qid = Q.id
where tenant = ? and name = ?''', tenant, name)
if qid is None:
_queue_doesnotexist(name, tenant)
return { return {
'messages': self.driver._get(sql, tenant, name)[0], 'messages': messages,
'actions': 0, 'actions': 0,
} }
@ -90,55 +92,181 @@ class Queue(base.QueueBase):
class Message(base.MessageBase): class Message(base.MessageBase):
def __init__(self, driver): def __init__(self, driver):
self.driver = driver self.driver = driver
self.driver._run(''' self.driver.run('''
create table if not exists Messages ( create table
if not exists
Messages (
id INTEGER, id INTEGER,
qid INTEGER, qid INTEGER,
ttl INTEGER, ttl INTEGER,
content TEXT, content DOCUMENT,
created DATETIME, client TEXT,
created DATETIME, -- seconds since the Julian day
PRIMARY KEY(id), PRIMARY KEY(id),
FOREIGN KEY(qid) references Queues(id) on delete cascade FOREIGN KEY(qid) references Queues(id) on delete cascade
) )
''') ''')
def get(self, queue, tenant=None, message_id=None, def get(self, queue, message_id, tenant):
marker=None, echo=False, client_uuid=None): try:
pass content, ttl, age = self.driver.get('''
select content, ttl, julianday() * 86400.0 - created
from Queues as Q join Messages as M
on qid = Q.id
where ttl > julianday() * 86400.0 - created
and M.id = ? and tenant = ? and name = ?
''', _msgid_decode(message_id), tenant, queue)
def post(self, queue, messages, tenant): return {
with self.driver: 'id': message_id,
'ttl': ttl,
'age': int(age),
'body': content,
}
except (_NoResult, _BadID):
_msg_doesnotexist(message_id)
def list(self, queue, tenant, marker=None,
limit=10, echo=False, client_uuid=None):
with self.driver('deferred'):
try: try:
qid, = self.driver._get('''select id from Queues where sql = '''
tenant = ? and name = ?''', select id, content, ttl, julianday() * 86400.0 - created
tenant, queue) from Messages
except TypeError: where ttl > julianday() * 86400.0 - created
msg = (_("Queue %(name)s does not exist for tenant %(tenant)s") and qid = ?'''
% dict(name=queue, tenant=tenant)) args = [_get_qid(self.driver, queue, tenant)]
raise exceptions.DoesNotExist(msg) if not echo:
sql += '''
and client != ?'''
args += [client_uuid]
if marker:
sql += '''
and id > ?'''
args += [_marker_decode(marker)]
sql += '''
limit ?'''
args += [limit]
iter = self.driver.run(sql, *args)
for id, content, ttl, age in iter:
yield {
'id': _msgid_encode(id),
'ttl': ttl,
'age': int(age),
'marker': _marker_encode(id),
'body': content,
}
except _BadID:
return
def post(self, queue, messages, tenant, client_uuid):
with self.driver('immediate'):
qid = _get_qid(self.driver, queue, tenant)
# cleanup all expired messages in this queue
self.driver.run('''
delete from Messages
where ttl <= julianday() * 86400.0 - created
and qid = ?''', qid)
# executemany() sets lastrowid to None, so no matter we manually # executemany() sets lastrowid to None, so no matter we manually
# generate the IDs or not, we still need to query for it. # generate the IDs or not, we still need to query for it.
try:
unused, = self.driver._get('''select id + 1 from Messages unused = self.driver.get('''
where id = (select max(id) select max(id) + 1 from Messages''')[0] or 1001
from Messages)''')
except TypeError:
unused, = 1001,
def it(newid): def it(newid):
for m in messages: for m in messages:
yield (newid, qid, m['ttl'], yield (newid, qid, m['ttl'],
json.dumps(m, ensure_ascii=False)) self.driver.pack(m['body']), client_uuid)
newid += 1 newid += 1
self.driver._run_multiple('''insert into Messages values self.driver.run_multiple('''
(?, ?, ?, ?, datetime())''', insert into Messages
it(unused)) values (?, ?, ?, ?, ?, julianday() * 86400.0)''', it(unused))
return [str(x) for x in range(unused, unused + len(messages))] return map(_msgid_encode, range(unused, unused + len(messages)))
def delete(self, queue, message_id, tenant=None, claim=None): def delete(self, queue, message_id, tenant, claim=None):
pass try:
self.driver.run('''
delete from Messages
where id = ?
and qid = (select id from Queues
where tenant = ? and name = ?)
''', _msgid_decode(message_id), tenant, queue)
except _BadID:
pass
class _NoResult(Exception):
pass
class _BadID(Exception):
pass
def _queue_doesnotexist(name, tenant):
msg = (_("Queue %(name)s does not exist for tenant %(tenant)s")
% dict(name=name, tenant=tenant))
raise exceptions.DoesNotExist(msg)
def _msg_doesnotexist(id):
msg = (_("Message %(id)s does not exist")
% dict(id=id))
raise exceptions.DoesNotExist(msg)
def _get_qid(driver, queue, tenant):
try:
return driver.get('''
select id from Queues
where tenant = ? and name = ?''', tenant, queue)[0]
except _NoResult:
_queue_doesnotexist(queue, tenant)
# The utilities below make the database IDs opaque to the users
# of Marconi API. The only purpose is to advise the users NOT to
# make assumptions on the implementation of and/or relationship
# between the message IDs, the markers, and claim IDs.
#
# The magic numbers are arbitrarily picked; the numbers themselves
# come with no special functionalities.
def _msgid_encode(id):
return hex(id ^ 0x5c693a53)[2:]
def _msgid_decode(id):
try:
return int(id, 16) ^ 0x5c693a53
except ValueError:
raise _BadID
def _marker_encode(id):
return oct(id ^ 0x3c96a355)[1:]
def _marker_decode(id):
try:
return int(id, 8) ^ 0x3c96a355
except ValueError:
raise _BadID

View File

@ -14,8 +14,11 @@
# limitations under the License. # limitations under the License.
import contextlib
import sqlite3 import sqlite3
import msgpack
from marconi.common import config from marconi.common import config
from marconi import storage from marconi import storage
from marconi.storage.sqlite import controllers from marconi.storage.sqlite import controllers
@ -26,26 +29,70 @@ cfg = config.namespace('drivers:storage:sqlite').from_options(
class Driver(storage.DriverBase): class Driver(storage.DriverBase):
def __init__(self): def __init__(self):
self.__path = cfg.database self.__path = cfg.database
self.__conn = sqlite3.connect(self.__path) self.__conn = sqlite3.connect(self.__path,
detect_types=sqlite3.PARSE_DECLTYPES)
self.__db = self.__conn.cursor() self.__db = self.__conn.cursor()
self._run('''PRAGMA foreign_keys = ON''') self.run('''PRAGMA foreign_keys = ON''')
def _run(self, sql, *args): @staticmethod
def pack(o):
"""
Convert a Python variable to a SQlite variable
with the customized type `DOCUMENT`.
:param o: a Python str, unicode, int, long, float, bool, None
or a dict or list of %o
"""
return buffer(msgpack.dumps(o))
sqlite3.register_converter('DOCUMENT', lambda s:
msgpack.loads(s, encoding='utf-8'))
def run(self, sql, *args):
"""
Perform a SQL query.
:param sql: a query string with the '?' placeholders
:param args: the arguments to substitute the placeholders
"""
return self.__db.execute(sql, args) return self.__db.execute(sql, args)
def _run_multiple(self, sql, it): def run_multiple(self, sql, it):
"""
Iteratively perform multiple SQL queries.
:param sql: a query string with the '?' placeholders
:param it: an iterator which yields a sequence of arguments to
substitute the placeholders
"""
self.__db.executemany(sql, it) self.__db.executemany(sql, it)
def _get(self, sql, *args): def get(self, sql, *args):
return self._run(sql, *args).fetchone() """
Get one entry from the query result.
def __enter__(self): :param sql: a query string with the '?' placeholders
self._run('begin immediate') :param args: the arguments to substitute the placeholders
:raises: _NoResult if the result set is empty
"""
try:
return self.run(sql, *args).next()
def __exit__(self, exc_type, exc_value, traceback): except StopIteration:
self.__conn.commit() raise controllers._NoResult
@contextlib.contextmanager
def __call__(self, isolation):
self.run('begin ' + isolation)
try:
yield
self.__conn.commit()
except Exception:
self.__conn.rollback()
raise
@property @property
def queue_controller(self): def queue_controller(self):

View File

@ -23,13 +23,16 @@ from marconi.tests import util as testing
#TODO(zyuan): let tests/storage/base.py handle these #TODO(zyuan): let tests/storage/base.py handle these
class TestSqlite(testing.TestBase): class TestSqlite(testing.TestBase):
def test_some_messages(self): def setUp(self):
storage = sqlite.Driver() super(TestSqlite, self).setUp()
q = storage.queue_controller
q.upsert('fizbit', {'_message_ttl': 40}, '480924')
m = storage.message_controller
d = [ storage = sqlite.Driver()
self.queue_ctrl = storage.queue_controller
self.queue_ctrl.upsert('fizbit', {'_message_ttl': 40}, '480924')
self.msg_ctrl = storage.message_controller
def test_some_messages(self):
doc = [
{ {
'body': { 'body': {
'event': 'BackupStarted', 'event': 'BackupStarted',
@ -37,21 +40,86 @@ class TestSqlite(testing.TestBase):
}, },
'ttl': 30, 'ttl': 30,
}, },
{
'body': {
'event': 'BackupProgress',
'currentBytes': '0',
'totalBytes': '99614720',
},
'ttl': 10
},
] ]
n = q.stats('fizbit', '480924')['messages'] for _ in range(10):
l1 = m.post('fizbit', d, '480924') self.msg_ctrl.post('fizbit', doc, '480924',
l2 = m.post('fizbit', d, '480924') client_uuid='30387f00')
self.assertEquals([int(v) + 2 for v in l1], map(int, l2)) msgid = self.msg_ctrl.post('fizbit', doc, '480924',
self.assertEquals(q.stats('fizbit', '480924')['messages'] - n, 4) client_uuid='79ed56f8')[0]
q.delete('fizbit', '480924')
self.assertEquals(
self.queue_ctrl.stats('fizbit', '480924')['messages'], 11)
msgs = list(self.msg_ctrl.list('fizbit', '480924',
client_uuid='30387f00'))
self.assertEquals(len(msgs), 1)
#TODO(zyuan): move this to tests/storage/test_impl_sqlite.py
msgs = list(self.msg_ctrl.list('fizbit', '480924',
marker='illformed'))
self.assertEquals(len(msgs), 0)
cnt = 0
marker = None
while True:
nomsg = True
for msg in self.msg_ctrl.list('fizbit', '480924',
limit=3, marker=marker,
client_uuid='79ed56f8'):
nomsg = False
if nomsg:
break
marker = msg['marker']
cnt += 1
self.assertEquals(cnt, 4)
self.assertIn(
'body', self.msg_ctrl.get('fizbit', msgid, '480924'))
self.msg_ctrl.delete('fizbit', msgid, '480924')
with testtools.ExpectedException(exceptions.DoesNotExist): with testtools.ExpectedException(exceptions.DoesNotExist):
m.post('fizbit', d, '480924') self.msg_ctrl.get('fizbit', msgid, '480924')
def test_expired_messages(self):
doc = [
{'body': {}, 'ttl': 0},
]
msgid = self.msg_ctrl.post('fizbit', doc, '480924',
client_uuid='unused')[0]
with testtools.ExpectedException(exceptions.DoesNotExist):
self.msg_ctrl.get('fizbit', msgid, '480924')
def test_nonexsitent(self):
with testtools.ExpectedException(exceptions.DoesNotExist):
self.msg_ctrl.post('nonexistent', [], '480924',
client_uuid='30387f00')
with testtools.ExpectedException(exceptions.DoesNotExist):
for _ in self.msg_ctrl.list('nonexistent', '480924'):
pass
with testtools.ExpectedException(exceptions.DoesNotExist):
self.queue_ctrl.stats('nonexistent', '480924')
#TODO(zyuan): move this to tests/storage/test_impl_sqlite.py
def test_illformed_id(self):
# SQlite-specific tests. Since all IDs exposed in APIs are opaque,
# any ill-formed IDs should be regarded as non-existing ones.
with testtools.ExpectedException(exceptions.DoesNotExist):
self.msg_ctrl.get('nonexistent', 'illformed', '480924')
self.msg_ctrl.delete('nonexistent', 'illformed', '480924')
def tearDown(self):
self.queue_ctrl.delete('fizbit', '480924')
super(TestSqlite, self).tearDown()

View File

@ -1,5 +1,6 @@
cliff cliff
falcon falcon
msgpack-python
oslo.config>=1.1.0 oslo.config>=1.1.0
PasteDeploy PasteDeploy
pymongo pymongo