From 53e887fbdf5794f4c43336478c04aee7dec8a62c Mon Sep 17 00:00:00 2001 From: gecong1973 Date: Thu, 29 Jun 2017 23:28:28 -0400 Subject: [PATCH] Support dead letter queue for swift Implement blueprint dead-letter-queue Change-Id: I1ee88a8963e2bc80172710da5ab60313952495e4 --- zaqar/storage/swift/claims.py | 70 +++++++++++++++++++++++++++------ zaqar/storage/swift/messages.py | 9 +++-- zaqar/storage/swift/utils.py | 4 +- 3 files changed, 67 insertions(+), 16 deletions(-) diff --git a/zaqar/storage/swift/claims.py b/zaqar/storage/swift/claims.py index cef13f195..54945d066 100644 --- a/zaqar/storage/swift/claims.py +++ b/zaqar/storage/swift/claims.py @@ -93,28 +93,59 @@ class ClaimController(storage.Claim): def create(self, queue, metadata, project=None, limit=storage.DEFAULT_MESSAGES_PER_CLAIM): message_ctrl = self.driver.message_controller + queue_ctrl = self.driver.queue_controller + queue_meta = queue_ctrl.get(queue, project=project) ttl = metadata['ttl'] grace = metadata['grace'] msg_ts = ttl + grace claim_id = uuidutils.generate_uuid() + dlq = True if ('_max_claim_count' in queue_meta and + '_dead_letter_queue' in queue_meta) else False + messages, marker = message_ctrl._list(queue, project, limit=limit, include_claimed=False) claimed = [] for msg in messages: + claim_count = msg.get('claim_count', 0) md5 = hashlib.md5() md5.update( jsonutils.dumps( {'body': msg['body'], 'claim_id': None, - 'ttl': msg['ttl']})) + 'ttl': msg['ttl'], + 'claim_count': claim_count})) md5 = md5.hexdigest() msg_ttl = max(msg['ttl'], msg_ts) + move_to_dlq = False + if dlq: + if claim_count < queue_meta['_max_claim_count']: + # Check if the message's claim count has exceeded the + # max claim count defined in the queue, if not , + # Save the new max claim count for message + claim_count = claim_count + 1 + else: + # if the message's claim count has exceeded the + # max claim count defined in the queue, move the + # message to the dead letter queue. + # NOTE: We're moving message by changing the + # project info directly. That means, the queue and dead + # letter queue must be created on the same pool. + dlq_ttl = queue_meta.get("_dead_letter_queue_messages_ttl") + move_to_dlq = True + if dlq_ttl: + msg_ttl = dlq_ttl + content = jsonutils.dumps( - {'body': msg['body'], 'claim_id': claim_id, 'ttl': msg_ttl}) - try: - self._client.put_object( - utils._message_container(queue, project), + {'body': msg['body'], 'claim_id': claim_id, + 'ttl': msg_ttl, + 'claim_count': claim_count}) + + if move_to_dlq: + dead_letter_queue = queue_meta.get("_dead_letter_queue") + utils._put_or_create_container( + self._client, + utils._message_container(dead_letter_queue, project), msg['id'], content, content_type='application/json', @@ -122,14 +153,29 @@ class ClaimController(storage.Claim): 'if-match': md5, 'x-object-meta-claimid': claim_id, 'x-delete-after': msg_ttl}) - except swiftclient.ClientException as exc: - if exc.http_status == 412: - continue - raise + + message_ctrl._delete(queue, msg['id'], project) + else: - msg['claim_id'] = claim_id - msg['ttl'] = msg_ttl - claimed.append(msg) + try: + self._client.put_object( + utils._message_container(queue, project), + msg['id'], + content, + content_type='application/json', + headers={'x-object-meta-clientid': msg['client_uuid'], + 'if-match': md5, + 'x-object-meta-claimid': claim_id, + 'x-delete-after': msg_ttl}) + except swiftclient.ClientException as exc: + if exc.http_status == 412: + continue + raise + else: + msg['claim_id'] = claim_id + msg['ttl'] = msg_ttl + msg['claim_count'] = claim_count + claimed.append(msg) utils._put_or_create_container( self._client, diff --git a/zaqar/storage/swift/messages.py b/zaqar/storage/swift/messages.py index dffa09c2c..f6aba9004 100644 --- a/zaqar/storage/swift/messages.py +++ b/zaqar/storage/swift/messages.py @@ -121,9 +121,11 @@ class MessageController(storage.Message): raise def is_claimed(msg, headers): - if include_claimed: + if include_claimed or msg['claim_id'] is None: return False - return msg['claim_id'] is not None + claim_obj = self.driver.claim_controller._get( + queue, msg['claim_id'], project) + return claim_obj is not None and claim_obj['ttl'] > 0 def is_echo(msg, headers): if echo: @@ -210,7 +212,8 @@ class MessageController(storage.Message): def _create_msg(self, queue, msg, client_uuid, project): slug = str(uuid.uuid1()) contents = jsonutils.dumps( - {'body': msg.get('body', {}), 'claim_id': None, 'ttl': msg['ttl']}) + {'body': msg.get('body', {}), 'claim_id': None, + 'ttl': msg['ttl'], 'claim_count': 0}) try: self._client.put_object( utils._message_container(queue, project), diff --git a/zaqar/storage/swift/utils.py b/zaqar/storage/swift/utils.py index ea5032235..c48f004b4 100644 --- a/zaqar/storage/swift/utils.py +++ b/zaqar/storage/swift/utils.py @@ -56,7 +56,8 @@ def _message_to_json(message_id, msg, headers, now): 'age': now - float(headers['x-timestamp']), 'ttl': msg['ttl'], 'body': msg['body'], - 'claim_id': msg['claim_id'] + 'claim_id': msg['claim_id'], + 'claim_count': msg.get('claim_count', 0) } @@ -105,6 +106,7 @@ def _filter_messages(messages, filters, marker, get_object, list_objects, 'body': obj['body'], 'age': now - float(headers['x-timestamp']), 'claim_id': obj['claim_id'], + 'claim_count': obj.get('claim_count', 0), } if limit <= 0: break