Support query filter in queue.
This patch will support to query queues filtered by name and metadata in mongodb backend. Other backends will support in following patchs. Co-Authored-By: gecong <ge.cong@zte.com.cn> Change-Id: I5fc6a5959e5d94942aebce9cedb22666e5577cb8 Partial-implements: blueprint queue-filter-support
This commit is contained in:
parent
ae7d88d62f
commit
b4c395c79a
@ -23,6 +23,10 @@ instead of 200, because there was no information to send back.
|
||||
This operation lists queues for the project. The queues are sorted
|
||||
alphabetically by name.
|
||||
|
||||
When queue listing , we can add filter in query string parameter
|
||||
to filter queue, like name and metadata. If metadata or name of queue is
|
||||
consistent with the filter,the queue will be listed to the user,
|
||||
otherwise the queue will be filtered.
|
||||
|
||||
Normal response codes: 200
|
||||
|
||||
|
@ -0,0 +1,6 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Support for queue filter when queue listing. With this feature, users can
|
||||
add filter of name or metadata in query string parameters in queue list
|
||||
to filter queues.
|
@ -309,8 +309,8 @@ class Queue(ControllerBase):
|
||||
numbers of queues.
|
||||
"""
|
||||
|
||||
def list(self, project=None, marker=None,
|
||||
limit=DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||
def list(self, project=None, kfilter={}, marker=None,
|
||||
limit=DEFAULT_QUEUES_PER_PAGE, detailed=False, name=None):
|
||||
"""Base method for listing queues.
|
||||
|
||||
:param project: Project id
|
||||
@ -321,7 +321,7 @@ class Queue(ControllerBase):
|
||||
:returns: An iterator giving a sequence of queues
|
||||
and the marker of the next page.
|
||||
"""
|
||||
return self._list(project, marker, limit, detailed)
|
||||
return self._list(project, kfilter, marker, limit, detailed, name)
|
||||
|
||||
_list = abc.abstractmethod(lambda x: None)
|
||||
|
||||
|
@ -196,10 +196,11 @@ class QueueController(storage.Queue):
|
||||
except errors.QueueDoesNotExist:
|
||||
return {}
|
||||
|
||||
def _list(self, project=None, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||
def _list(self, project=None, kfilter={}, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False,
|
||||
name=None):
|
||||
|
||||
query = utils.scoped_query(marker, project)
|
||||
query = utils.scoped_query(marker, project, name, kfilter)
|
||||
|
||||
projection = {'p_q': 1, '_id': 0}
|
||||
if detailed:
|
||||
|
@ -191,7 +191,7 @@ def parse_scoped_project_queue(scoped_name):
|
||||
return scoped_name.split('/')
|
||||
|
||||
|
||||
def scoped_query(queue, project):
|
||||
def scoped_query(queue, project, name=None, kfilter={}):
|
||||
"""Returns a dict usable for querying for scoped project/queues.
|
||||
|
||||
:param queue: name of queue to seek
|
||||
@ -207,14 +207,28 @@ def scoped_query(queue, project):
|
||||
|
||||
if not scoped_name.startswith('/'):
|
||||
# NOTE(kgriffs): scoped queue, e.g., 'project-id/queue-name'
|
||||
project_prefix = '^' + project + '/'
|
||||
if name:
|
||||
project_prefix = '^' + project + '/.*' + name + '.*'
|
||||
else:
|
||||
project_prefix = '^' + project + '/'
|
||||
query[key] = {'$regex': project_prefix, '$gt': scoped_name}
|
||||
elif scoped_name == '/':
|
||||
# NOTE(kgriffs): list global queues, but exclude scoped ones
|
||||
query[key] = {'$regex': '^/'}
|
||||
if name:
|
||||
query[key] = {'$regex': '^/.*' + name + '.*'}
|
||||
else:
|
||||
query[key] = {'$regex': '^/'}
|
||||
else:
|
||||
# NOTE(kgriffs): unscoped queue, e.g., '/my-global-queue'
|
||||
query[key] = {'$regex': '^/', '$gt': scoped_name}
|
||||
if name:
|
||||
query[key] = {'$regex': '^/.*' + name + '.*', '$gt': scoped_name}
|
||||
else:
|
||||
query[key] = {'$regex': '^/', '$gt': scoped_name}
|
||||
|
||||
# Handler the metadata filter in request.
|
||||
for key, value in kfilter.items():
|
||||
key = 'm.' + key
|
||||
query[key] = {'$eq': value}
|
||||
|
||||
return query
|
||||
|
||||
|
@ -172,15 +172,18 @@ class QueueController(storage.Queue):
|
||||
self._mgt_queue_ctrl = self._pool_catalog.control.queue_controller
|
||||
self._get_controller = self._pool_catalog.get_queue_controller
|
||||
|
||||
def _list(self, project=None, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||
def _list(self, project=None, kfilter={}, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False,
|
||||
name=None):
|
||||
|
||||
def all_pages():
|
||||
yield next(self._mgt_queue_ctrl.list(
|
||||
project=project,
|
||||
kfilter=kfilter,
|
||||
marker=marker,
|
||||
limit=limit,
|
||||
detailed=detailed))
|
||||
detailed=detailed,
|
||||
name=name))
|
||||
|
||||
# make a heap compared with 'name'
|
||||
ls = heapq.merge(*[
|
||||
|
@ -83,8 +83,9 @@ class QueueController(storage.Queue):
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def _list(self, project=None, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||
def _list(self, project=None, kfilter={}, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False,
|
||||
name=None):
|
||||
client = self._client
|
||||
qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project)
|
||||
marker = utils.scope_queue_name(marker, project)
|
||||
|
@ -23,8 +23,9 @@ from zaqar.storage.sqlalchemy import utils
|
||||
|
||||
class QueueController(storage.Queue):
|
||||
|
||||
def _list(self, project, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
|
||||
def _list(self, project, kfilter={}, marker=None,
|
||||
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False,
|
||||
name=None):
|
||||
|
||||
if project is None:
|
||||
project = ''
|
||||
|
@ -109,7 +109,8 @@ class PoolCatalogTest(testing.TestBase):
|
||||
flavor='fake')
|
||||
|
||||
def test_queues_list_on_multi_pools(self):
|
||||
def fake_list(project=None, marker=None, limit=10, detailed=False):
|
||||
def fake_list(project=None, kfilter={}, marker=None, limit=10,
|
||||
detailed=False, name=None):
|
||||
yield iter([{'name': 'fake_queue'}])
|
||||
|
||||
list_str = 'zaqar.storage.mongodb.queues.QueueController.list'
|
||||
|
@ -108,7 +108,8 @@ class PoolCatalogTest(testing.TestBase):
|
||||
flavor='fake')
|
||||
|
||||
def test_queues_list_on_multi_pools(self):
|
||||
def fake_list(project=None, marker=None, limit=10, detailed=False):
|
||||
def fake_list(project=None, kfilter={}, marker=None, limit=10,
|
||||
detailed=False, name=None):
|
||||
yield iter([{'name': 'fake_queue'}])
|
||||
|
||||
list_str = 'zaqar.storage.mongodb.queues.QueueController.list'
|
||||
|
@ -455,6 +455,13 @@ class TestQueueLifecycleMongoDB(base.V2Base):
|
||||
'_dead_letter_queue_messages_ttl': None,
|
||||
'_max_claim_count': None}, result_doc)
|
||||
|
||||
# queue filter
|
||||
result = self.simulate_get(self.queue_path, headers=header,
|
||||
query_string='node=34')
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(0, len(result_doc['queues']))
|
||||
|
||||
# List tail
|
||||
self.simulate_get(target, headers=header, query_string=params)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
@ -488,6 +495,50 @@ class TestQueueLifecycleMongoDB(base.V2Base):
|
||||
self.simulate_get(self.queue_path, headers=header)
|
||||
self.assertEqual(falcon.HTTP_503, self.srmock.status)
|
||||
|
||||
def test_list_with_filter(self):
|
||||
arbitrary_number = 644079696574693
|
||||
project_id = str(arbitrary_number)
|
||||
client_id = uuidutils.generate_uuid()
|
||||
header = {
|
||||
'X-Project-ID': project_id,
|
||||
'Client-ID': client_id
|
||||
}
|
||||
|
||||
# Create some
|
||||
def create_queue(name, project_id, body):
|
||||
altheader = {'Client-ID': client_id}
|
||||
if project_id is not None:
|
||||
altheader['X-Project-ID'] = project_id
|
||||
uri = self.queue_path + '/' + name
|
||||
self.simulate_put(uri, headers=altheader, body=body)
|
||||
|
||||
create_queue('q1', project_id, '{"test_metadata_key1": "value1"}')
|
||||
create_queue('q2', project_id, '{"_max_messages_post_size": 2000}')
|
||||
create_queue('q3', project_id, '{"test_metadata_key2": 30}')
|
||||
|
||||
# List (filter query)
|
||||
result = self.simulate_get(self.queue_path, headers=header,
|
||||
query_string='name=q&test_metadata_key2=30')
|
||||
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(1, len(result_doc['queues']))
|
||||
self.assertEqual('q3', result_doc['queues'][0]['name'])
|
||||
|
||||
# List (filter query)
|
||||
result = self.simulate_get(self.queue_path, headers=header,
|
||||
query_string='_max_messages_post_size=2000')
|
||||
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(1, len(result_doc['queues']))
|
||||
self.assertEqual('q2', result_doc['queues'][0]['name'])
|
||||
|
||||
# List (filter query)
|
||||
result = self.simulate_get(self.queue_path, headers=header,
|
||||
query_string='name=q')
|
||||
|
||||
result_doc = jsonutils.loads(result[0])
|
||||
self.assertEqual(3, len(result_doc['queues']))
|
||||
|
||||
|
||||
class TestQueueLifecycleFaultyDriver(base.V2BaseFaulty):
|
||||
|
||||
|
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
import falcon
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
@ -252,20 +253,11 @@ class CollectionResource(object):
|
||||
self._queue_controller = queue_controller
|
||||
self._validate = validate
|
||||
|
||||
@decorators.TransportLog("Queues collection")
|
||||
@acl.enforce("queues:get_all")
|
||||
def on_get(self, req, resp, project_id):
|
||||
kwargs = {}
|
||||
|
||||
# NOTE(kgriffs): This syntax ensures that
|
||||
# we don't clobber default values with None.
|
||||
req.get_param('marker', store=kwargs)
|
||||
req.get_param_as_int('limit', store=kwargs)
|
||||
req.get_param_as_bool('detailed', store=kwargs)
|
||||
|
||||
def _queue_list(self, project_id, path, kfilter, **kwargs):
|
||||
try:
|
||||
self._validate.queue_listing(**kwargs)
|
||||
results = self._queue_controller.list(project=project_id, **kwargs)
|
||||
results = self._queue_controller.list(project=project_id,
|
||||
kfilter=kfilter, **kwargs)
|
||||
|
||||
# Buffer list of queues
|
||||
queues = list(next(results))
|
||||
@ -283,13 +275,29 @@ class CollectionResource(object):
|
||||
kwargs['marker'] = next(results) or kwargs.get('marker', '')
|
||||
reserved_metadata = _get_reserved_metadata(self._validate).items()
|
||||
for each_queue in queues:
|
||||
each_queue['href'] = req.path + '/' + each_queue['name']
|
||||
each_queue['href'] = path + '/' + each_queue['name']
|
||||
if kwargs.get('detailed'):
|
||||
for meta, value in reserved_metadata:
|
||||
if not each_queue.get('metadata', {}).get(meta):
|
||||
each_queue['metadata'][meta] = value
|
||||
|
||||
return queues, kwargs['marker']
|
||||
|
||||
def _on_get_with_kfilter(self, req, resp, project_id, kfilter={}):
|
||||
kwargs = {}
|
||||
|
||||
# NOTE(kgriffs): This syntax ensures that
|
||||
# we don't clobber default values with None.
|
||||
req.get_param('marker', store=kwargs)
|
||||
req.get_param_as_int('limit', store=kwargs)
|
||||
req.get_param_as_bool('detailed', store=kwargs)
|
||||
req.get_param('name', store=kwargs)
|
||||
|
||||
queues, marker = self._queue_list(project_id,
|
||||
req.path, kfilter, **kwargs)
|
||||
|
||||
links = []
|
||||
kwargs['marker'] = marker
|
||||
if queues:
|
||||
links = [
|
||||
{
|
||||
@ -305,3 +313,24 @@ class CollectionResource(object):
|
||||
|
||||
resp.body = utils.to_json(response_body)
|
||||
# status defaults to 200
|
||||
|
||||
@decorators.TransportLog("Queues collection")
|
||||
@acl.enforce("queues:get_all")
|
||||
def on_get(self, req, resp, project_id):
|
||||
field = ('marker', 'limit', 'detailed', 'name')
|
||||
kfilter = copy.deepcopy(req.params)
|
||||
|
||||
for key in req.params.keys():
|
||||
if key in field:
|
||||
kfilter.pop(key)
|
||||
|
||||
kfilter = kfilter if len(kfilter) > 0 else {}
|
||||
for key in kfilter.keys():
|
||||
# Since we get the filter value from URL, so need to
|
||||
# turn the string to integer if using integer filter value.
|
||||
try:
|
||||
kfilter[key] = int(kfilter[key])
|
||||
except ValueError:
|
||||
continue
|
||||
self._on_get_with_kfilter(req, resp, project_id, kfilter)
|
||||
# status defaults to 200
|
||||
|
Loading…
x
Reference in New Issue
Block a user