From 460c34529868d2dc330afc12b4662702bf1f982a Mon Sep 17 00:00:00 2001
From: Fei Long Wang <flwang@catalyst.net.nz>
Date: Tue, 22 Nov 2016 14:12:18 +1300
Subject: [PATCH] Support purge queue -- wsgi

A new endpoint /v2/queues/myqueue/purge is added to support purge
a queue, which accepts a POST body like:
{"resource_types": ["messages", "subscriptions"]} to allow user
purge particular resource of the queue. Test cases are added as
well.

APIImpact
DocImpact

Partially Implements: blueprint purge-queue

Change-Id: Ie82713fce7cb0db6612693cee81be8c3170d292a
---
 etc/policy.json.sample                        |   1 +
 .../notes/purge-queue-6788a249ee59d55a.yaml   |   4 +
 zaqar/tests/etc/policy.json                   |   1 +
 .../unit/transport/wsgi/v2_0/test_purge.py    | 119 ++++++++++++++++++
 .../transport/wsgi/v2_0/test_validation.py    |  22 ++++
 zaqar/transport/validation.py                 |  17 +++
 zaqar/transport/wsgi/v2_0/__init__.py         |   4 +-
 zaqar/transport/wsgi/v2_0/homedoc.py          |  13 ++
 zaqar/transport/wsgi/v2_0/purge.py            |  81 ++++++++++++
 9 files changed, 261 insertions(+), 1 deletion(-)
 create mode 100644 releasenotes/notes/purge-queue-6788a249ee59d55a.yaml
 create mode 100644 zaqar/tests/unit/transport/wsgi/v2_0/test_purge.py
 create mode 100644 zaqar/transport/wsgi/v2_0/purge.py

diff --git a/etc/policy.json.sample b/etc/policy.json.sample
index df6c63832..83a6bd5d4 100644
--- a/etc/policy.json.sample
+++ b/etc/policy.json.sample
@@ -10,6 +10,7 @@
     "queues:update": "",
     "queues:stats": "",
     "queues:share": "",
+    "queues:purge": "",
 
     "messages:get_all": "",
     "messages:create": "",
diff --git a/releasenotes/notes/purge-queue-6788a249ee59d55a.yaml b/releasenotes/notes/purge-queue-6788a249ee59d55a.yaml
new file mode 100644
index 000000000..f98d89ca2
--- /dev/null
+++ b/releasenotes/notes/purge-queue-6788a249ee59d55a.yaml
@@ -0,0 +1,4 @@
+features:
+  - A new queue action is added so that users can purge a queue
+    quickly. That means all the messages and subscriptions will be deleted
+    automatically but the metadata of the queue will be kept.
diff --git a/zaqar/tests/etc/policy.json b/zaqar/tests/etc/policy.json
index df6c63832..83a6bd5d4 100644
--- a/zaqar/tests/etc/policy.json
+++ b/zaqar/tests/etc/policy.json
@@ -10,6 +10,7 @@
     "queues:update": "",
     "queues:stats": "",
     "queues:share": "",
+    "queues:purge": "",
 
     "messages:get_all": "",
     "messages:create": "",
diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_purge.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_purge.py
new file mode 100644
index 000000000..42a0b06c2
--- /dev/null
+++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_purge.py
@@ -0,0 +1,119 @@
+# Copyright 2016 Catalyst IT Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import falcon
+import uuid
+
+from oslo_serialization import jsonutils
+
+from zaqar.tests.unit.transport.wsgi import base
+
+
+class TestPurge(base.V2Base):
+
+    config_file = 'wsgi_mongodb.conf'
+
+    def setUp(self):
+        super(TestPurge, self).setUp()
+
+        self.headers = {
+            'Client-ID': str(uuid.uuid4())
+        }
+        self.queue_path = self.url_prefix + '/queues/myqueue'
+        self.messages_path = self.queue_path + '/messages'
+        self.subscription_path = (self.queue_path + '/subscriptions')
+
+        self.messages = {'messages': [{'body': 'A', 'ttl': 300},
+                                      {'body': 'B', 'ttl': 400},
+                                      {'body': 'C', 'ttl': 500}]}
+        self.subscriptions = {"subscriber": "http://ping.me", "ttl": 3600,
+                              "options": {"key": "value"}}
+
+    def tearDown(self):
+        self.simulate_delete(self.queue_path, headers=self.headers)
+        super(TestPurge, self).tearDown()
+
+    def _get_msg_id(self, headers):
+        return self._get_msg_ids(headers)[0]
+
+    def _get_msg_ids(self, headers):
+        return headers['location'].rsplit('=', 1)[-1].split(',')
+
+    def test_purge_particular_resource(self):
+        # Post messages
+        messages_body = jsonutils.dumps(self.messages)
+        self.simulate_post(self.messages_path, body=messages_body,
+                           headers=self.headers)
+
+        msg_ids = self._get_msg_ids(self.srmock.headers_dict)
+        for msg_id in msg_ids:
+            target = self.messages_path + '/' + msg_id
+            self.simulate_get(target, headers=self.headers)
+            self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+        # Post subscriptions
+        sub_resp = self.simulate_post(self.subscription_path,
+                                      body=jsonutils.dumps(self.subscriptions),
+                                      headers=self.headers)
+
+        # Purge queue
+        purge_body = jsonutils.dumps({'resource_types': ['messages']})
+        self.simulate_post(self.queue_path+"/purge", body=purge_body)
+
+        for msg_id in msg_ids:
+            target = self.messages_path + '/' + msg_id
+            self.simulate_get(target, headers=self.headers)
+            self.assertEqual(falcon.HTTP_404, self.srmock.status)
+
+        # Check subscriptions are still there
+        resp_list = self.simulate_get(self.subscription_path,
+                                      headers=self.headers)
+        resp_list_doc = jsonutils.loads(resp_list[0])
+        sid = resp_list_doc['subscriptions'][0]['id']
+        sub_resp_doc = jsonutils.loads(sub_resp[0])
+        self.assertEqual(sub_resp_doc['subscription_id'], sid)
+
+    def test_purge_by_default(self):
+        # Post messages
+        messages_body = jsonutils.dumps(self.messages)
+        self.simulate_post(self.messages_path, body=messages_body,
+                           headers=self.headers)
+
+        msg_ids = self._get_msg_ids(self.srmock.headers_dict)
+        for msg_id in msg_ids:
+            target = self.messages_path + '/' + msg_id
+            self.simulate_get(target, headers=self.headers)
+            self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+        # Post subscriptions
+        sub_resp = self.simulate_post(self.subscription_path,
+                                      body=jsonutils.dumps(self.subscriptions),
+                                      headers=self.headers)
+
+        # Purge queue
+        purge_body = jsonutils.dumps({'resource_types': ['messages',
+                                                         'subscriptions']})
+        self.simulate_post(self.queue_path+"/purge", body=purge_body)
+
+        for msg_id in msg_ids:
+            target = self.messages_path + '/' + msg_id
+            self.simulate_get(target, headers=self.headers)
+            self.assertEqual(falcon.HTTP_404, self.srmock.status)
+
+        # Check subscriptions are still there
+        sub_id = jsonutils.loads(sub_resp[0])['subscription_id']
+        self.simulate_get(self.subscription_path + "/" + sub_id,
+                          headers=self.headers)
+        self.assertEqual(falcon.HTTP_404, self.srmock.status)
diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py
index 9aa04a54a..5d5df6a47 100644
--- a/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py
+++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_validation.py
@@ -179,3 +179,25 @@ class TestValidation(base.V2Base):
                             body='[{"op":"add","path":"/metadata/a",'
                             '"value":2}]')
         self.assertEqual(falcon.HTTP_200, self.srmock.status)
+
+    def test_queue_purge(self):
+        # Wrong key
+        queue_1 = self.url_prefix + '/queues/queue1/purge'
+        self.simulate_post(queue_1,
+                           self.project_id,
+                           body='{"wrong_key": ["messages"]}')
+        self.addCleanup(self.simulate_delete, queue_1, self.project_id,
+                        headers=self.headers)
+        self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+        # Wrong value
+        self.simulate_post(queue_1,
+                           self.project_id,
+                           body='{"resource_types": ["wrong_value"]}')
+        self.assertEqual(falcon.HTTP_400, self.srmock.status)
+
+        # Correct input
+        self.simulate_post(queue_1,
+                           self.project_id,
+                           body='{"resource_types": ["messages"]}')
+        self.assertEqual(falcon.HTTP_204, self.srmock.status)
diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py
index 5ef1f190d..db7d01697 100644
--- a/zaqar/transport/validation.py
+++ b/zaqar/transport/validation.py
@@ -27,6 +27,7 @@ MIN_MESSAGE_TTL = 60
 MIN_CLAIM_TTL = 60
 MIN_CLAIM_GRACE = 60
 MIN_SUBSCRIPTION_TTL = 60
+_PURGBLE_RESOURCE_TYPES = {'messages', 'subscriptions'}
 
 _TRANSPORT_LIMITS_OPTIONS = (
     cfg.IntOpt('max_queues_per_page', default=20,
@@ -320,6 +321,22 @@ class Validator(object):
                       ' and must be at least greater than 0.'),
                     self._limits_conf.max_messages_post_size)
 
+    def queue_purging(self, document):
+        """Restrictions the resource types to be purged for a queue.
+
+        :param resource_types: Type list of all resource under a queue
+        :raises: ValidationFailed if the resource types are invalid
+        """
+
+        if 'resource_types' not in document:
+            msg = _(u'Post body must contain key "resource_types".')
+            raise ValidationFailed(msg)
+
+        if (not set(document['resource_types']).issubset(
+                _PURGBLE_RESOURCE_TYPES)):
+            msg = _(u'Resource types must be a sub set of {0}.')
+            raise ValidationFailed(msg, _PURGBLE_RESOURCE_TYPES)
+
     def message_posting(self, messages):
         """Restrictions on a list of messages.
 
diff --git a/zaqar/transport/wsgi/v2_0/__init__.py b/zaqar/transport/wsgi/v2_0/__init__.py
index 46078bd6a..c621ff453 100644
--- a/zaqar/transport/wsgi/v2_0/__init__.py
+++ b/zaqar/transport/wsgi/v2_0/__init__.py
@@ -20,6 +20,7 @@ from zaqar.transport.wsgi.v2_0 import homedoc
 from zaqar.transport.wsgi.v2_0 import messages
 from zaqar.transport.wsgi.v2_0 import ping
 from zaqar.transport.wsgi.v2_0 import pools
+from zaqar.transport.wsgi.v2_0 import purge
 from zaqar.transport.wsgi.v2_0 import queues
 from zaqar.transport.wsgi.v2_0 import stats
 from zaqar.transport.wsgi.v2_0 import subscriptions
@@ -69,7 +70,8 @@ def public_endpoints(driver, conf):
                              message_controller)),
         ('/queues/{queue_name}/stats',
          stats.Resource(queue_controller)),
-
+        ('/queues/{queue_name}/purge',
+         purge.Resource(driver)),
         # Messages Endpoints
         ('/queues/{queue_name}/messages',
          messages.CollectionResource(driver._wsgi_conf,
diff --git a/zaqar/transport/wsgi/v2_0/homedoc.py b/zaqar/transport/wsgi/v2_0/homedoc.py
index 2c2ad578b..f019a2fa4 100644
--- a/zaqar/transport/wsgi/v2_0/homedoc.py
+++ b/zaqar/transport/wsgi/v2_0/homedoc.py
@@ -72,6 +72,19 @@ JSON_HOME = {
                 'accept-post': ['application/json'],
             },
         },
+        'rel/queue_purge': {
+            'href-template': '/v2/queues/{queue_name}/purge',
+            'href-vars': {
+                'queue_name': 'param/queue_name',
+            },
+            'hints': {
+                'allow': ['POST'],
+                'formats': {
+                    'application/json': {},
+                },
+                'accept-post': ['application/json'],
+            },
+        },
 
         # -----------------------------------------------------------------
         # Messages
diff --git a/zaqar/transport/wsgi/v2_0/purge.py b/zaqar/transport/wsgi/v2_0/purge.py
new file mode 100644
index 000000000..ce5876d42
--- /dev/null
+++ b/zaqar/transport/wsgi/v2_0/purge.py
@@ -0,0 +1,81 @@
+# Copyright 2016 Catalyst IT Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License.  You may obtain a copy
+# of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+import falcon
+
+from oslo_log import log as logging
+import six
+
+from zaqar.common import decorators
+from zaqar.transport import acl
+from zaqar.transport import validation
+from zaqar.transport.wsgi import errors as wsgi_errors
+from zaqar.transport.wsgi import utils as wsgi_utils
+
+LOG = logging.getLogger(__name__)
+
+
+class Resource(object):
+
+    __slots__ = ('_driver', '_conf', '_queue_ctrl',
+                 '_message_ctrl', '_subscription_ctrl', '_validate')
+
+    def __init__(self, driver):
+        self._driver = driver
+        self._conf = driver._conf
+        self._queue_ctrl = driver._storage.queue_controller
+        self._message_ctrl = driver._storage.message_controller
+        self._subscription_ctrl = driver._storage.subscription_controller
+        self._validate = driver._validate
+
+    @decorators.TransportLog("Queue item")
+    @acl.enforce("queues:purge")
+    def on_post(self, req, resp, project_id, queue_name):
+        try:
+            if req.content_length:
+                document = wsgi_utils.deserialize(req.stream,
+                                                  req.content_length)
+                self._validate.queue_purging(document)
+            else:
+                document = {'resource_types': ['messages', 'subscriptions']}
+        except ValueError as ex:
+            LOG.debug(ex)
+            raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
+        except validation.ValidationFailed as ex:
+            LOG.debug(ex)
+            raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
+
+        try:
+            if "messages" in document['resource_types']:
+                LOG.debug("Purge all messages under queue %s" % queue_name)
+                messages = self._message_ctrl.pop(queue_name, 10,
+                                                  project=project_id)
+                while messages:
+                    messages = self._message_ctrl.pop(queue_name, 10,
+                                                      project=project_id)
+
+            if "subscriptions" in document['resource_types']:
+                LOG.debug("Purge all subscriptions under queue %s" %
+                          queue_name)
+                results = self._subscription_ctrl.list(queue_name,
+                                                       project=project_id)
+                subscriptions = list(next(results))
+                for sub in subscriptions:
+                    self._subscription_ctrl.delete(queue_name,
+                                                   sub['id'],
+                                                   project=project_id)
+        except ValueError as err:
+            raise wsgi_errors.HTTPBadRequestAPI(str(err))
+
+        resp.status = falcon.HTTP_204