bug# 1198890

internal api tidy

Change-Id: I1b6d843eba5bc9f3a793f28392d0b675333cc17a
This commit is contained in:
Simon McCartney 2013-07-12 10:58:15 +01:00
parent b142a2ed6d
commit 496461b18a
24 changed files with 230 additions and 369 deletions

View File

@ -55,7 +55,7 @@ def create_domain():
def get_domains():
context = flask.request.environ.get('context')
domains = central_api.get_domains(context)
domains = central_api.find_domains(context)
return flask.jsonify(domains_schema.filter({'domains': domains}))

View File

@ -26,7 +26,7 @@ blueprint = flask.Blueprint('reports', __name__)
def reports_tenants():
context = flask.request.environ.get('context')
tenants = central_api.get_tenants(context)
tenants = central_api.find_tenants(context)
return flask.jsonify(tenants=tenants)

View File

@ -55,7 +55,7 @@ def create_record(domain_id):
def get_records(domain_id):
context = flask.request.environ.get('context')
records = central_api.get_records(context, domain_id)
records = central_api.find_records(context, domain_id)
return flask.jsonify(records_schema.filter({'records': records}))

View File

@ -54,7 +54,7 @@ def create_server():
def get_servers():
context = flask.request.environ.get('context')
servers = central_api.get_servers(context)
servers = central_api.find_servers(context)
return flask.jsonify(servers_schema.filter({'servers': servers}))

View File

@ -54,7 +54,7 @@ def create_tsigkey():
def get_tsigkeys():
context = flask.request.environ.get('context')
tsigkeys = central_api.get_tsigkeys(context)
tsigkeys = central_api.find_tsigkeys(context)
return flask.jsonify(tsigkeys_schema.filter({'tsigkeys': tsigkeys}))

View File

@ -41,7 +41,7 @@ class Bind9Backend(base.Backend):
super(Bind9Backend, self).start()
# TODO(kiall): This is a hack to ensure the data dir is 100% up to date
domains = self.central_service.get_domains(self.admin_context)
domains = self.central_service.find_domains(self.admin_context)
for domain in domains:
self._sync_domain(domain)
@ -77,7 +77,7 @@ class Bind9Backend(base.Backend):
# TODO(kiall): Rewrite this entire thing ASAP
LOG.debug('Synchronising domains')
domains = self.central_service.get_domains(self.admin_context)
domains = self.central_service.find_domains(self.admin_context)
output_folder = os.path.join(os.path.abspath(cfg.CONF.state_path),
'bind9')
@ -132,10 +132,10 @@ class Bind9Backend(base.Backend):
""" Sync a single domain's zone file """
LOG.debug('Synchronising Domain: %s' % domain['id'])
servers = self.central_service.get_servers(self.admin_context)
servers = self.central_service.find_servers(self.admin_context)
records = self.central_service.get_records(self.admin_context,
domain['id'])
records = self.central_service.find_records(self.admin_context,
domain['id'])
output_folder = os.path.join(os.path.abspath(cfg.CONF.state_path),
'bind9')

View File

@ -43,7 +43,7 @@ class DnsmasqBackend(base.Backend):
def _sync_domains_hack(self):
# TODO(Andrey): This is a hack to ensure the data dir is 100% up to
# date
domains = self.central_service.get_domains(self.admin_context)
domains = self.central_service.find_domains(self.admin_context)
for domain in domains:
self._sync_domain(domain)
@ -93,8 +93,8 @@ class DnsmasqBackend(base.Backend):
self._reload_dnsmasq()
def _write_zonefile(self, domain):
records = self.central_service.get_records(self.admin_context,
domain['id'])
records = self.central_service.find_records(self.admin_context,
domain['id'])
filename = os.path.join(self.output_folder, '%s.zone' % domain['id'])

View File

@ -245,7 +245,7 @@ class MySQLBind9Backend(base.Backend):
LOG.debug('create_domain()')
if cfg.CONF[self.name].write_database:
servers = self.central_service.get_servers(self.admin_context)
servers = self.central_service.find_servers(self.admin_context)
self._add_soa_record(domain, servers)
self._add_ns_records(domain, servers)
@ -256,7 +256,7 @@ class MySQLBind9Backend(base.Backend):
LOG.debug('update_domain()')
if cfg.CONF[self.name].write_database:
servers = self.central_service.get_servers(self.admin_context)
servers = self.central_service.find_servers(self.admin_context)
self._update_soa_record(domain, servers)
self._update_ns_records(domain, servers)
@ -298,7 +298,7 @@ class MySQLBind9Backend(base.Backend):
"""
LOG.debug('Synchronising domains')
domains = self.central_service.get_domains(self.admin_context)
domains = self.central_service.find_domains(self.admin_context)
output_folder = os.path.join(os.path.abspath(cfg.CONF.state_path),
'bind9')

View File

@ -116,7 +116,7 @@ class PowerDNSBackend(base.Backend):
# Domain Methods
def create_domain(self, context, domain):
servers = self.central_service.get_servers(self.admin_context)
servers = self.central_service.find_servers(self.admin_context)
domain_m = models.Domain()
domain_m.update({
@ -224,7 +224,7 @@ class PowerDNSBackend(base.Backend):
# Internal Methods
def _update_soa(self, domain):
servers = self.central_service.get_servers(self.admin_context)
servers = self.central_service.find_servers(self.admin_context)
domain_m = self._get_domain(domain['id'])
record_m = self._get_record(domain=domain_m, type='SOA')

View File

@ -30,17 +30,18 @@ class CentralAPI(rpc_proxy.RpcProxy):
1.1 - Add new finder methods
1.2 - Add get_tenant and get_tenants
1.3 - Add get_absolute_limits
2.0 - Renamed most get_resources to find_resources
"""
def __init__(self, topic=None):
topic = topic if topic else cfg.CONF.central_topic
super(CentralAPI, self).__init__(topic=topic, default_version='1.0')
super(CentralAPI, self).__init__(topic=topic, default_version='2.0')
# Misc Methods
def get_absolute_limits(self, context):
msg = self.make_msg('get_absolute_limits')
return self.call(context, msg, version='1.3')
return self.call(context, msg)
# Server Methods
def create_server(self, context, values):
@ -48,8 +49,8 @@ class CentralAPI(rpc_proxy.RpcProxy):
return self.call(context, msg)
def get_servers(self, context, criterion=None):
msg = self.make_msg('get_servers', criterion=criterion)
def find_servers(self, context, criterion=None):
msg = self.make_msg('find_servers', criterion=criterion)
return self.call(context, msg)
@ -75,8 +76,8 @@ class CentralAPI(rpc_proxy.RpcProxy):
return self.call(context, msg)
def get_tsigkeys(self, context, criterion=None):
msg = self.make_msg('get_tsigkeys', criterion=criterion)
def find_tsigkeys(self, context, criterion=None):
msg = self.make_msg('find_tsigkeys', criterion=criterion)
return self.call(context, msg)
@ -97,15 +98,15 @@ class CentralAPI(rpc_proxy.RpcProxy):
return self.call(context, msg)
# Tenant Methods
def get_tenants(self, context):
msg = self.make_msg('get_tenants')
def find_tenants(self, context):
msg = self.make_msg('find_tenants')
return self.call(context, msg, version='1.2')
return self.call(context, msg)
def get_tenant(self, context, tenant_id):
msg = self.make_msg('get_tenant', tenant_id=tenant_id)
return self.call(context, msg, version='1.2')
return self.call(context, msg)
def count_tenants(self, context):
msg = self.make_msg('count_tenants')
@ -118,11 +119,6 @@ class CentralAPI(rpc_proxy.RpcProxy):
return self.call(context, msg)
def get_domains(self, context, criterion=None):
msg = self.make_msg('get_domains', criterion=criterion)
return self.call(context, msg)
def get_domain(self, context, domain_id):
msg = self.make_msg('get_domain', domain_id=domain_id)
@ -133,15 +129,15 @@ class CentralAPI(rpc_proxy.RpcProxy):
return self.call(context, msg)
def find_domains(self, context, criterion):
def find_domains(self, context, criterion=None):
msg = self.make_msg('find_domains', criterion=criterion)
return self.call(context, msg, version='1.1')
return self.call(context, msg)
def find_domain(self, context, criterion):
msg = self.make_msg('find_domain', criterion=criterion)
return self.call(context, msg, version='1.1')
return self.call(context, msg)
def update_domain(self, context, domain_id, values, increment_serial=True):
msg = self.make_msg('update_domain',
@ -175,13 +171,6 @@ class CentralAPI(rpc_proxy.RpcProxy):
return self.call(context, msg)
def get_records(self, context, domain_id, criterion=None):
msg = self.make_msg('get_records',
domain_id=domain_id,
criterion=criterion)
return self.call(context, msg)
def get_record(self, context, domain_id, record_id):
msg = self.make_msg('get_record',
domain_id=domain_id,
@ -189,15 +178,17 @@ class CentralAPI(rpc_proxy.RpcProxy):
return self.call(context, msg)
def find_records(self, context, criterion):
msg = self.make_msg('find_records', criterion=criterion)
def find_records(self, context, domain_id, criterion=None):
msg = self.make_msg('find_records',
domain_id=domain_id,
criterion=criterion)
return self.call(context, msg, version='1.1')
return self.call(context, msg)
def find_record(self, context, criterion):
msg = self.make_msg('find_record', criterion=criterion)
return self.call(context, msg, version='1.1')
return self.call(context, msg)
def update_record(self, context, domain_id, record_id, values,
increment_serial=True):

View File

@ -43,7 +43,7 @@ def wrap_backend_call():
class Service(rpc_service.Service):
RPC_API_VERSION = '1.3'
RPC_API_VERSION = '2.0'
def __init__(self, *args, **kwargs):
backend_driver = cfg.CONF['service:central'].backend_driver
@ -148,8 +148,8 @@ class Service(rpc_service.Service):
if record_type != 'CNAME':
criterion['type'] = 'CNAME'
records = self.storage_api.get_records(context, domain['id'],
criterion=criterion)
records = self.storage_api.find_records(context, domain['id'],
criterion=criterion)
if ((len(records) == 1 and records[0]['id'] != record_id)
or len(records) > 1):
raise exceptions.InvalidRecordLocation('CNAME records may not '
@ -159,8 +159,8 @@ class Service(rpc_service.Service):
if record_type == 'CNAME':
# CNAME's may not have children. Ever.
criterion = {'name': '%%.%s' % record_name}
records = self.storage_api.get_records(context, domain['id'],
criterion=criterion)
records = self.storage_api.find_records(context, domain['id'],
criterion=criterion)
if len(records) > 0:
raise exceptions.InvalidRecordLocation('CNAME records may not '
@ -178,8 +178,8 @@ class Service(rpc_service.Service):
# Duplicate PTR's with the same name are not allowed
if record_type == 'PTR':
criterion = {'name': record_name, 'type': 'PTR'}
records = self.storage_api.get_records(context, domain['id'],
criterion=criterion)
records = self.storage_api.find_records(context, domain['id'],
criterion=criterion)
if ((len(records) == 1 and records[0]['id'] != record_id)
or len(records) > 1):
raise exceptions.DuplicateRecord()
@ -229,8 +229,8 @@ class Service(rpc_service.Service):
while (i <= j):
criterion['name'] = '.'.join(record_labels[i:])
records = self.storage_api.get_records(context, domain['id'],
criterion)
records = self.storage_api.find_records(context, domain['id'],
criterion)
if len(records) == 0:
i += 1
@ -268,10 +268,10 @@ class Service(rpc_service.Service):
return server
def get_servers(self, context, criterion=None):
policy.check('get_servers', context)
def find_servers(self, context, criterion=None):
policy.check('find_servers', context)
return self.storage_api.get_servers(context, criterion)
return self.storage_api.find_servers(context, criterion)
def get_server(self, context, server_id):
policy.check('get_server', context, {'server_id': server_id})
@ -311,10 +311,10 @@ class Service(rpc_service.Service):
return tsigkey
def get_tsigkeys(self, context, criterion=None):
policy.check('get_tsigkeys', context)
def find_tsigkeys(self, context, criterion=None):
policy.check('find_tsigkeys', context)
return self.storage_api.get_tsigkeys(context, criterion)
return self.storage_api.find_tsigkeys(context, criterion)
def get_tsigkey(self, context, tsigkey_id):
policy.check('get_tsigkey', context, {'tsigkey_id': tsigkey_id})
@ -343,9 +343,9 @@ class Service(rpc_service.Service):
utils.notify(context, 'central', 'tsigkey.delete', tsigkey)
# Tenant Methods
def get_tenants(self, context):
policy.check('get_tenants', context)
return self.storage_api.get_tenants(context)
def find_tenants(self, context):
policy.check('find_tenants', context)
return self.storage_api.find_tenants(context)
def get_tenant(self, context, tenant_id):
target = {
@ -398,7 +398,7 @@ class Service(rpc_service.Service):
# NOTE(kiall): Fetch the servers before creating the domain, this way
# we can prevent domain creation if no servers are
# configured.
servers = self.storage_api.get_servers(context)
servers = self.storage_api.find_servers(context)
if len(servers) == 0:
LOG.critical('No servers configured. Please create at least one '
@ -416,18 +416,6 @@ class Service(rpc_service.Service):
return domain
def get_domains(self, context, criterion=None):
target = {'tenant_id': context.tenant_id}
policy.check('get_domains', context, target)
if criterion is None:
criterion = {}
if not context.is_admin:
criterion['tenant_id'] = context.tenant_id
return self.storage_api.get_domains(context, criterion)
def get_domain(self, context, domain_id):
domain = self.storage_api.get_domain(context, domain_id)
@ -456,12 +444,15 @@ class Service(rpc_service.Service):
# TODO(kiall): Once we allow domains to be allocated on 1 of N server
# pools, return the filtered list here.
return self.storage_api.get_servers(context, criterion)
return self.storage_api.find_servers(context, criterion)
def find_domains(self, context, criterion):
def find_domains(self, context, criterion=None):
target = {'tenant_id': context.tenant_id}
policy.check('find_domains', context, target)
if criterion is None:
criterion = {}
if not context.is_admin:
criterion['tenant_id'] = context.tenant_id
@ -606,19 +597,6 @@ class Service(rpc_service.Service):
return record
def get_records(self, context, domain_id, criterion=None):
domain = self.storage_api.get_domain(context, domain_id)
target = {
'domain_id': domain_id,
'domain_name': domain['name'],
'tenant_id': domain['tenant_id']
}
policy.check('get_records', context, target)
return self.storage_api.get_records(context, domain_id, criterion)
def get_record(self, context, domain_id, record_id):
domain = self.storage_api.get_domain(context, domain_id)
record = self.storage_api.get_record(context, record_id)
@ -638,14 +616,21 @@ class Service(rpc_service.Service):
return record
def find_records(self, context, criterion):
target = {'tenant_id': context.tenant_id}
def find_records(self, context, domain_id, criterion=None):
domain = self.storage_api.get_domain(context, domain_id)
target = {
'domain_id': domain_id,
'domain_name': domain['name'],
'tenant_id': domain['tenant_id']
}
policy.check('find_records', context, target)
if not context.is_admin:
criterion['tenant_id'] = context.tenant_id
return self.storage_api.find_records(context, criterion)
return self.storage_api.find_records(context, domain_id, criterion)
def find_record(self, context, criterion):
target = {'tenant_id': context.tenant_id}
@ -739,12 +724,12 @@ class Service(rpc_service.Service):
def sync_domains(self, context):
policy.check('diagnostics_sync_domains', context)
domains = self.storage_api.get_domains(context)
domains = self.storage_api.find_domains(context)
results = {}
for domain in domains:
servers = self.storage_api.get_servers(context)
records = self.storage_api.get_records(context, domain['id'])
servers = self.storage_api.find_servers(context)
records = self.storage_api.find_records(context, domain['id'])
with wrap_backend_call():
results[domain['id']] = self.backend.sync_domain(context,
@ -765,7 +750,7 @@ class Service(rpc_service.Service):
policy.check('diagnostics_sync_domain', context, target)
records = self.storage_api.get_records(context, domain_id)
records = self.storage_api.find_records(context, domain_id)
with wrap_backend_call():
return self.backend.sync_domain(context, domain, records)

View File

@ -136,9 +136,9 @@ class BaseAddressHandler(Handler):
'managed_resource_type': resource_type
})
records = central_api.get_records(context,
cfg.CONF[self.name].domain_id,
criterion)
records = central_api.find_records(context,
cfg.CONF[self.name].domain_id,
criterion)
for record in records:
LOG.debug('Deleting record %s' % record['id'])

View File

@ -42,15 +42,6 @@ class StorageAPI(object):
self.storage.delete_quota(context, quota['id'])
raise
def get_quotas(self, context, criterion=None):
"""
Get Quotas.
:param context: RPC Context.
:param criterion: Criteria to filter by.
"""
return self.storage.get_quotas(context, criterion)
def get_quota(self, context, quota_id):
"""
Get a Quota via ID.
@ -124,15 +115,6 @@ class StorageAPI(object):
self.storage.delete_server(context, server['id'])
raise
def get_servers(self, context, criterion=None):
"""
Get Servers.
:param context: RPC Context.
:param criterion: Criteria to filter by.
"""
return self.storage.get_servers(context, criterion)
def get_server(self, context, server_id):
"""
Get a Server via ID.
@ -142,7 +124,7 @@ class StorageAPI(object):
"""
return self.storage.get_server(context, server_id)
def find_servers(self, context, criterion):
def find_servers(self, context, criterion=None):
"""
Find Servers
@ -205,15 +187,6 @@ class StorageAPI(object):
self.storage.delete_tsigkey(context, tsigkey['id'])
raise
def get_tsigkeys(self, context, criterion=None):
"""
Get TSIG Keys.
:param context: RPC Context.
:param criterion: Criteria to filter by.
"""
return self.storage.get_tsigkeys(context, criterion)
def get_tsigkey(self, context, tsigkey_id):
"""
Get a TSIG Key via ID.
@ -223,7 +196,7 @@ class StorageAPI(object):
"""
return self.storage.get_tsigkey(context, tsigkey_id)
def find_tsigkeys(self, context, criterion):
def find_tsigkeys(self, context, criterion=None):
"""
Find Tsigkey
@ -271,13 +244,13 @@ class StorageAPI(object):
yield self.storage.get_tsigkey(context, tsigkey_id)
self.storage.delete_tsigkey(context, tsigkey_id)
def get_tenants(self, context):
def find_tenants(self, context):
"""
Get all Tenants.
Find all Tenants.
:param context: RPC Context.
"""
return self.storage.get_tenants(context)
return self.storage.find_tenants(context)
def get_tenant(self, context, tenant_id):
"""
@ -312,15 +285,6 @@ class StorageAPI(object):
self.storage.delete_domain(context, domain['id'])
raise
def get_domains(self, context, criterion=None):
"""
Get all Domains.
:param context: RPC Context.
:param criterion: Criteria to filter by.
"""
return self.storage.get_domains(context, criterion)
def get_domain(self, context, domain_id):
"""
Get a Domain via its ID.
@ -330,7 +294,7 @@ class StorageAPI(object):
"""
return self.storage.get_domain(context, domain_id)
def find_domains(self, context, criterion):
def find_domains(self, context, criterion=None):
"""
Find Domains
@ -404,16 +368,6 @@ class StorageAPI(object):
self.storage.delete_record(context, record['id'])
raise
def get_records(self, context, domain_id, criterion=None):
"""
Get a list of records via a Domain's ID
:param context: RPC Context.
:param domain_id: Domain ID where the records reside.
:param criterion: Criteria to filter by.
"""
return self.storage.get_records(context, domain_id, criterion)
def get_record(self, context, record_id):
"""
Get a record via ID
@ -423,14 +377,15 @@ class StorageAPI(object):
"""
return self.storage.get_record(context, record_id)
def find_records(self, context, criterion):
def find_records(self, context, domain_id, criterion=None):
"""
Find Records.
:param context: RPC Context.
:param domain_id: Domain ID to find in
:param criterion: Criteria to filter by.
"""
return self.storage.find_records(context, criterion)
return self.storage.find_records(context, domain_id, criterion)
def find_record(self, context, criterion):
"""

View File

@ -32,15 +32,6 @@ class Storage(Plugin):
:param values: Values to create the new Quota from.
"""
@abc.abstractmethod
def get_quotas(self, context, criterion=None):
"""
Get Quotas.
:param context: RPC Context.
:param criterion: Criteria to filter by.
"""
@abc.abstractmethod
def get_quota(self, context, quota_id):
"""
@ -97,9 +88,9 @@ class Storage(Plugin):
"""
@abc.abstractmethod
def get_servers(self, context, criterion=None):
def find_servers(self, context, criterion=None):
"""
Get Servers.
Find Servers.
:param context: RPC Context.
:param criterion: Criteria to filter by.
@ -142,9 +133,9 @@ class Storage(Plugin):
"""
@abc.abstractmethod
def get_tsigkeys(self, context, criterion=None):
def find_tsigkeys(self, context, criterion=None):
"""
Get TSIG Keys.
Find TSIG Keys.
:param context: RPC Context.
:param criterion: Criteria to filter by.
@ -179,9 +170,9 @@ class Storage(Plugin):
"""
@abc.abstractmethod
def get_tenants(self, context):
def find_tenants(self, context):
"""
Get all Tenants.
Find all Tenants.
:param context: RPC Context.
"""
@ -212,15 +203,6 @@ class Storage(Plugin):
:param values: Values to create the new Domain from.
"""
@abc.abstractmethod
def get_domains(self, context, criterion=None):
"""
Get all Domains.
:param context: RPC Context.
:param criterion: Criteria to filter by.
"""
@abc.abstractmethod
def get_domain(self, context, domain_id):
"""
@ -231,7 +213,7 @@ class Storage(Plugin):
"""
@abc.abstractmethod
def find_domains(self, context, criterion):
def find_domains(self, context, criterion=None):
"""
Find Domains
@ -286,16 +268,6 @@ class Storage(Plugin):
:param values: Values to create the new Record from.
"""
@abc.abstractmethod
def get_records(self, context, domain_id, criterion=None):
"""
Get a list of records via a Domain's ID
:param context: RPC Context.
:param domain_id: Domain ID where the records reside.
:param criterion: Criteria to filter by.
"""
@abc.abstractmethod
def get_record(self, context, record_id):
"""
@ -306,11 +278,12 @@ class Storage(Plugin):
"""
@abc.abstractmethod
def find_records(self, context, criterion):
def find_records(self, context, domain_id, criterion=None):
"""
Find Records.
:param context: RPC Context.
:param domain_id: Domain ID where the records reside.
:param criterion: Criteria to filter by.
"""

View File

@ -75,6 +75,16 @@ class SQLAlchemyStorage(base.Storage):
return query
## CRUD for our resources (quota, server, tsigkey, tenant, domain & record)
## R - get_*, get_*s, find_*s (get_*s should be removed)
##
## Standard Arguments
## self - python object for the class
## context - a dictionary of details about the request (http etc),
## provided by flask.
## criterion - dictionary of filters to be applied
##
# Quota Methods
def create_quota(self, context, values):
quota = models.Quota()
@ -88,18 +98,6 @@ class SQLAlchemyStorage(base.Storage):
return dict(quota)
def get_quotas(self, context, criterion=None):
query = self.session.query(models.Quota)
query = self._apply_criterion(models.Quota, query, criterion)
try:
result = query.all()
except exc.NoResultFound:
LOG.debug('No results found')
return []
else:
return [dict(o) for o in result]
def _get_quota(self, context, quota_id):
query = self.session.query(models.Quota)
@ -129,7 +127,7 @@ class SQLAlchemyStorage(base.Storage):
quotas = query.all()
return [dict(q) for q in quotas]
def find_quotas(self, context, criterion):
def find_quotas(self, context, criterion=None):
return self._find_quotas(context, criterion)
def find_quota(self, context, criterion):
@ -165,7 +163,7 @@ class SQLAlchemyStorage(base.Storage):
return dict(server)
def get_servers(self, context, criterion=None):
def find_servers(self, context, criterion=None):
query = self.session.query(models.Server)
query = self._apply_criterion(models.Server, query, criterion)
@ -222,7 +220,7 @@ class SQLAlchemyStorage(base.Storage):
return dict(tsigkey)
def get_tsigkeys(self, context, criterion=None):
def find_tsigkeys(self, context, criterion=None):
query = self.session.query(models.TsigKey)
query = self._apply_criterion(models.TsigKey, query, criterion)
@ -266,8 +264,11 @@ class SQLAlchemyStorage(base.Storage):
tsigkey.delete(self.session)
# Tenant Methods
def get_tenants(self, context):
##
## Tenant Methods
##
# returns an array of tenant_id & count of their domains
def find_tenants(self, context):
query = self.session.query(models.Domain.tenant_id,
func.count(models.Domain.id))
query = self._apply_deleted_criteria(context, models.Domain, query)
@ -275,6 +276,7 @@ class SQLAlchemyStorage(base.Storage):
return [{'id': t[0], 'domain_count': t[1]} for t in query.all()]
# get list list & count of all domains owned by given tenant_id
def get_tenant(self, context, tenant_id):
query = self.session.query(models.Domain.name)
query = query.filter(models.Domain.tenant_id == tenant_id)
@ -288,15 +290,18 @@ class SQLAlchemyStorage(base.Storage):
'domains': [r[0] for r in result]
}
# tenants are the owner of domains, count the number of unique tenants
# select count(distinct tenant_id) from domains
def count_tenants(self, context):
# tenants are the owner of domains, count the number of unique tenants
# select count(distinct tenant_id) from domains
query = self.session.query(distinct(models.Domain.tenant_id))
query = self._apply_deleted_criteria(context, models.Domain, query)
return query.count()
# Domain Methods
##
## Domain Methods
##
def _find_domains(self, context, criterion, one=False):
query = self.session.query(models.Domain)
query = self._apply_criterion(models.Domain, query, criterion)
@ -322,17 +327,12 @@ class SQLAlchemyStorage(base.Storage):
return dict(domain)
def get_domains(self, context, criterion=None):
domains = self._find_domains(context, criterion)
return [dict(d) for d in domains]
def get_domain(self, context, domain_id):
domain = self._find_domains(context, {'id': domain_id}, True)
return dict(domain)
def find_domains(self, context, criterion):
def find_domains(self, context, criterion=None):
domains = self._find_domains(context, criterion)
return [dict(d) for d in domains]
@ -379,7 +379,7 @@ class SQLAlchemyStorage(base.Storage):
return dict(record)
def get_records(self, context, domain_id, criterion=None):
def find_records(self, context, domain_id, criterion=None):
query = self.session.query(models.Record)
query = query.filter_by(domain_id=domain_id)
query = self._apply_criterion(models.Record, query, criterion)
@ -415,9 +415,6 @@ class SQLAlchemyStorage(base.Storage):
records = query.all()
return [dict(r) for r in records]
def find_records(self, context, criterion):
return self._find_records(context, criterion)
def find_record(self, context, criterion):
return self._find_records(context, criterion, one=True)

View File

@ -102,7 +102,7 @@ class ApiV1DomainsTest(ApiV1Test):
self.assertIn('domains', response.json)
self.assertEqual(2, len(response.json['domains']))
@patch.object(central_service.Service, 'get_domains',
@patch.object(central_service.Service, 'find_domains',
side_effect=rpc_common.Timeout())
def test_get_domains_timeout(self, _):
self.get('domains', status_code=504)

View File

@ -179,7 +179,7 @@ class ApiV1RecordsTest(ApiV1Test):
self.assertIn('records', response.json)
self.assertEqual(2, len(response.json['records']))
@patch.object(central_service.Service, 'get_records',
@patch.object(central_service.Service, 'find_records',
side_effect=rpc_common.Timeout())
def test_get_records_timeout(self, _):
self.get('domains/%s/records' % self.domain['id'],

View File

@ -85,7 +85,7 @@ class ApiV1ServersTest(ApiV1Test):
self.assertIn('servers', response.json)
self.assertEqual(2, len(response.json['servers']))
@patch.object(central_service.Service, 'get_servers',
@patch.object(central_service.Service, 'find_servers',
side_effect=rpc_common.Timeout())
def test_get_servers_timeout(self, _):
self.get('servers', status_code=504)

View File

@ -133,18 +133,18 @@ class CentralServiceTest(CentralTestCase):
self.assertIsNotNone(server['id'])
self.assertEqual(server['name'], values['name'])
def test_get_servers(self):
def test_find_servers(self):
context = self.get_admin_context()
# Ensure we have no servers to start with.
servers = self.central_service.get_servers(context)
servers = self.central_service.find_servers(context)
self.assertEqual(len(servers), 0)
# Create a single server (using default values)
self.create_server()
# Ensure we can retrieve the newly created server
servers = self.central_service.get_servers(context)
servers = self.central_service.find_servers(context)
self.assertEqual(len(servers), 1)
self.assertEqual(servers[0]['name'], 'ns1.example.org.')
@ -152,7 +152,7 @@ class CentralServiceTest(CentralTestCase):
self.create_server(name='ns2.example.org.')
# Ensure we can retrieve both servers
servers = self.central_service.get_servers(context)
servers = self.central_service.find_servers(context)
self.assertEqual(len(servers), 2)
self.assertEqual(servers[0]['name'], 'ns1.example.org.')
self.assertEqual(servers[1]['name'], 'ns2.example.org.')
@ -216,18 +216,18 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(tsigkey['algorithm'], values['algorithm'])
self.assertEqual(tsigkey['secret'], values['secret'])
def test_get_tsigkeys(self):
def test_find_tsigkeys(self):
context = self.get_admin_context()
# Ensure we have no tsigkeys to start with.
tsigkeys = self.central_service.get_tsigkeys(context)
tsigkeys = self.central_service.find_tsigkeys(context)
self.assertEqual(len(tsigkeys), 0)
# Create a single tsigkey (using default values)
tsigkey_one = self.create_tsigkey()
# Ensure we can retrieve the newly created tsigkey
tsigkeys = self.central_service.get_tsigkeys(context)
tsigkeys = self.central_service.find_tsigkeys(context)
self.assertEqual(len(tsigkeys), 1)
self.assertEqual(tsigkeys[0]['name'], tsigkey_one['name'])
@ -235,7 +235,7 @@ class CentralServiceTest(CentralTestCase):
tsigkey_two = self.create_tsigkey(fixture=1)
# Ensure we can retrieve both tsigkeys
tsigkeys = self.central_service.get_tsigkeys(context)
tsigkeys = self.central_service.find_tsigkeys(context)
self.assertEqual(len(tsigkeys), 2)
self.assertEqual(tsigkeys[0]['name'], tsigkey_one['name'])
self.assertEqual(tsigkeys[1]['name'], tsigkey_two['name'])
@ -466,18 +466,18 @@ class CentralServiceTest(CentralTestCase):
# Create an invalid domain
self.central_service.create_domain(context, values=values)
def test_get_domains(self):
def test_find_domains(self):
context = self.get_admin_context()
# Ensure we have no domains to start with.
domains = self.central_service.get_domains(context)
domains = self.central_service.find_domains(context)
self.assertEqual(len(domains), 0)
# Create a single domain (using default values)
self.create_domain()
# Ensure we can retrieve the newly created domain
domains = self.central_service.get_domains(context)
domains = self.central_service.find_domains(context)
self.assertEqual(len(domains), 1)
self.assertEqual(domains[0]['name'], 'example.com.')
@ -485,35 +485,49 @@ class CentralServiceTest(CentralTestCase):
self.create_domain(name='example.net.')
# Ensure we can retrieve both domain
domains = self.central_service.get_domains(context)
domains = self.central_service.find_domains(context)
self.assertEqual(len(domains), 2)
self.assertEqual(domains[0]['name'], 'example.com.')
self.assertEqual(domains[1]['name'], 'example.net.')
def test_get_domains_tenant_restrictions(self):
def test_find_domains_criteria(self):
context = self.get_admin_context()
# Create a domain
domain_name = '%d.example.com.' % random.randint(10, 1000)
expected_domain = self.create_domain(name=domain_name)
# Retrieve it, and ensure it's the same
criterion = {'name': domain_name}
domains = self.central_service.find_domains(context, criterion)
self.assertEqual(domains[0]['id'], expected_domain['id'])
self.assertEqual(domains[0]['name'], expected_domain['name'])
self.assertEqual(domains[0]['email'], expected_domain['email'])
def test_find_domains_tenant_restrictions(self):
admin_context = self.get_admin_context()
tenant_one_context = self.get_context(tenant=1)
tenant_two_context = self.get_context(tenant=2)
# Ensure we have no domains to start with.
domains = self.central_service.get_domains(admin_context)
domains = self.central_service.find_domains(admin_context)
self.assertEqual(len(domains), 0)
# Create a single domain (using default values)
self.create_domain(context=tenant_one_context)
# Ensure admins can retrieve the newly created domain
domains = self.central_service.get_domains(admin_context)
domains = self.central_service.find_domains(admin_context)
self.assertEqual(len(domains), 1)
self.assertEqual(domains[0]['name'], 'example.com.')
# Ensure tenant=1 can retrieve the newly created domain
domains = self.central_service.get_domains(tenant_one_context)
domains = self.central_service.find_domains(tenant_one_context)
self.assertEqual(len(domains), 1)
self.assertEqual(domains[0]['name'], 'example.com.')
# Ensure tenant=2 can NOT retrieve the newly created domain
domains = self.central_service.get_domains(tenant_two_context)
domains = self.central_service.find_domains(tenant_two_context)
self.assertEqual(len(domains), 0)
def test_get_domain(self):
@ -906,19 +920,19 @@ class CentralServiceTest(CentralTestCase):
self.central_service.create_record(context, domain['id'],
values=values)
def test_get_records(self):
def test_find_records(self):
context = self.get_admin_context()
domain = self.create_domain()
# Ensure we have no records to start with.
records = self.central_service.get_records(context, domain['id'])
records = self.central_service.find_records(context, domain['id'])
self.assertEqual(len(records), 0)
# Create a single record (using default values)
self.create_record(domain)
# Ensure we can retrieve the newly created record
records = self.central_service.get_records(context, domain['id'])
records = self.central_service.find_records(context, domain['id'])
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['name'], 'www.%s' % domain['name'])
@ -926,7 +940,7 @@ class CentralServiceTest(CentralTestCase):
self.create_record(domain, name='mail.%s' % domain['name'])
# Ensure we can retrieve both records
records = self.central_service.get_records(context, domain['id'])
records = self.central_service.find_records(context, domain['id'])
self.assertEqual(len(records), 2)
self.assertEqual(records[0]['name'], 'www.%s' % domain['name'])
self.assertEqual(records[1]['name'], 'mail.%s' % domain['name'])

View File

@ -40,16 +40,16 @@ class NovaFixedHandlerTest(NotificationHandlerTestCase):
self.assertIn(event_type, self.plugin.get_event_types())
# Ensure we start with 0 records
records = self.central_service.get_records(self.admin_context,
self.domain_id)
records = self.central_service.find_records(self.admin_context,
self.domain_id)
self.assertEqual(0, len(records))
self.plugin.process_notification(event_type, fixture['payload'])
# Ensure we now have exactly 1 record
records = self.central_service.get_records(self.admin_context,
self.domain_id)
records = self.central_service.find_records(self.admin_context,
self.domain_id)
self.assertEqual(len(records), 1)
@ -68,15 +68,15 @@ class NovaFixedHandlerTest(NotificationHandlerTestCase):
self.assertIn(event_type, self.plugin.get_event_types())
# Ensure we start with at least 1 record
records = self.central_service.get_records(self.admin_context,
self.domain_id)
records = self.central_service.find_records(self.admin_context,
self.domain_id)
self.assertGreaterEqual(len(records), 1)
self.plugin.process_notification(event_type, fixture['payload'])
# Ensure we now have exactly 0 records
records = self.central_service.get_records(self.admin_context,
self.domain_id)
records = self.central_service.find_records(self.admin_context,
self.domain_id)
self.assertEqual(0, len(records))

View File

@ -41,16 +41,16 @@ class QuantumFloatingHandlerTest(NotificationHandlerTestCase):
self.assertIn(event_type, self.plugin.get_event_types())
# Ensure we start with 0 records
records = self.central_service.get_records(self.admin_context,
self.domain_id)
records = self.central_service.find_records(self.admin_context,
self.domain_id)
self.assertEqual(0, len(records))
self.plugin.process_notification(event_type, fixture['payload'])
# Ensure we now have exactly 1 record
records = self.central_service.get_records(self.admin_context,
self.domain_id)
records = self.central_service.find_records(self.admin_context,
self.domain_id)
self.assertEqual(len(records), 1)
@ -68,14 +68,14 @@ class QuantumFloatingHandlerTest(NotificationHandlerTestCase):
self.assertIn(event_type, self.plugin.get_event_types())
# Ensure we start with at least 1 record
records = self.central_service.get_records(self.admin_context,
self.domain_id)
records = self.central_service.find_records(self.admin_context,
self.domain_id)
self.assertGreaterEqual(len(records), 1)
self.plugin.process_notification(event_type, fixture['payload'])
records = self.central_service.get_records(self.admin_context,
self.domain_id)
records = self.central_service.find_records(self.admin_context,
self.domain_id)
self.assertEqual(0, len(records))

View File

@ -73,14 +73,14 @@ class StorageTestCase(TestCase):
with self.assertRaises(exceptions.DuplicateQuota):
self.create_quota()
def test_get_quotas(self):
actual = self.storage.get_quotas(self.admin_context)
def test_find_quotas(self):
actual = self.storage.find_quotas(self.admin_context)
self.assertEqual(actual, [])
# Create a single quota
_, quota_one = self.create_quota()
actual = self.storage.get_quotas(self.admin_context)
actual = self.storage.find_quotas(self.admin_context)
self.assertEqual(len(actual), 1)
self.assertEqual(actual[0]['tenant_id'], quota_one['tenant_id'])
@ -90,14 +90,14 @@ class StorageTestCase(TestCase):
# Create a second quota
_, quota_two = self.create_quota(fixture=1)
actual = self.storage.get_quotas(self.admin_context)
actual = self.storage.find_quotas(self.admin_context)
self.assertEqual(len(actual), 2)
self.assertEqual(actual[1]['tenant_id'], quota_two['tenant_id'])
self.assertEqual(actual[1]['resource'], quota_two['resource'])
self.assertEqual(actual[1]['hard_limit'], quota_two['hard_limit'])
def test_get_quotas_criterion(self):
def test_find_quotas_criterion(self):
_, quota_one = self.create_quota(0)
_, quota_two = self.create_quota(1)
@ -106,7 +106,7 @@ class StorageTestCase(TestCase):
resource=quota_one['resource']
)
results = self.storage.get_quotas(self.admin_context, criterion)
results = self.storage.find_quotas(self.admin_context, criterion)
self.assertEqual(len(results), 1)
@ -119,7 +119,7 @@ class StorageTestCase(TestCase):
resource=quota_two['resource']
)
results = self.storage.get_quotas(self.admin_context, criterion)
results = self.storage.find_quotas(self.admin_context, criterion)
self.assertEqual(len(results), 1)
@ -238,14 +238,14 @@ class StorageTestCase(TestCase):
with self.assertRaises(exceptions.DuplicateServer):
self.create_server()
def test_get_servers(self):
actual = self.storage.get_servers(self.admin_context)
def test_find_servers(self):
actual = self.storage.find_servers(self.admin_context)
self.assertEqual(actual, [])
# Create a single server
_, server_one = self.create_server()
actual = self.storage.get_servers(self.admin_context)
actual = self.storage.find_servers(self.admin_context)
self.assertEqual(len(actual), 1)
self.assertEqual(str(actual[0]['name']), str(server_one['name']))
@ -253,12 +253,12 @@ class StorageTestCase(TestCase):
# Create a second server
_, server_two = self.create_server(fixture=1)
actual = self.storage.get_servers(self.admin_context)
actual = self.storage.find_servers(self.admin_context)
self.assertEqual(len(actual), 2)
self.assertEqual(str(actual[1]['name']), str(server_two['name']))
def test_get_servers_criterion(self):
def test_find_servers_criterion(self):
_, server_one = self.create_server(0)
_, server_two = self.create_server(1)
@ -266,7 +266,7 @@ class StorageTestCase(TestCase):
name=server_one['name']
)
results = self.storage.get_servers(self.admin_context, criterion)
results = self.storage.find_servers(self.admin_context, criterion)
self.assertEqual(len(results), 1)
@ -276,7 +276,7 @@ class StorageTestCase(TestCase):
name=server_two['name']
)
results = self.storage.get_servers(self.admin_context, criterion)
results = self.storage.find_servers(self.admin_context, criterion)
self.assertEqual(len(results), 1)
@ -356,14 +356,14 @@ class StorageTestCase(TestCase):
with self.assertRaises(exceptions.DuplicateTsigKey):
self.create_tsigkey(values=values)
def test_get_tsigkeys(self):
actual = self.storage.get_tsigkeys(self.admin_context)
def test_find_tsigkeys(self):
actual = self.storage.find_tsigkeys(self.admin_context)
self.assertEqual(actual, [])
# Create a single tsigkey
_, tsigkey_one = self.create_tsigkey()
actual = self.storage.get_tsigkeys(self.admin_context)
actual = self.storage.find_tsigkeys(self.admin_context)
self.assertEqual(len(actual), 1)
self.assertEqual(actual[0]['name'], tsigkey_one['name'])
@ -373,14 +373,14 @@ class StorageTestCase(TestCase):
# Create a second tsigkey
_, tsigkey_two = self.create_tsigkey(fixture=1)
actual = self.storage.get_tsigkeys(self.admin_context)
actual = self.storage.find_tsigkeys(self.admin_context)
self.assertEqual(len(actual), 2)
self.assertEqual(actual[1]['name'], tsigkey_two['name'])
self.assertEqual(actual[1]['algorithm'], tsigkey_two['algorithm'])
self.assertEqual(actual[1]['secret'], tsigkey_two['secret'])
def test_get_tsigkeys_criterion(self):
def test_find_tsigkeys_criterion(self):
_, tsigkey_one = self.create_tsigkey(fixture=0)
_, tsigkey_two = self.create_tsigkey(fixture=1)
@ -388,7 +388,7 @@ class StorageTestCase(TestCase):
name=tsigkey_one['name']
)
results = self.storage.get_tsigkeys(self.admin_context, criterion)
results = self.storage.find_tsigkeys(self.admin_context, criterion)
self.assertEqual(len(results), 1)
@ -398,7 +398,7 @@ class StorageTestCase(TestCase):
name=tsigkey_two['name']
)
results = self.storage.get_tsigkeys(self.admin_context, criterion)
results = self.storage.find_tsigkeys(self.admin_context, criterion)
self.assertEqual(len(results), 1)
@ -461,7 +461,7 @@ class StorageTestCase(TestCase):
self.storage.delete_tsigkey(self.admin_context, uuid)
# Tenant Tests
def test_get_tenants(self):
def test_find_tenants(self):
# create 3 domains in 2 tenants
self.create_domain(fixture=0, values={'tenant_id': 'One'})
_, domain = self.create_domain(fixture=1, values={'tenant_id': 'One'})
@ -471,7 +471,7 @@ class StorageTestCase(TestCase):
self.storage.delete_domain(self.admin_context, domain['id'])
# Ensure we get accurate results
result = self.storage.get_tenants(self.admin_context)
result = self.storage.find_tenants(self.admin_context)
expected = [{
'id': 'One',
@ -538,14 +538,14 @@ class StorageTestCase(TestCase):
with self.assertRaises(exceptions.DuplicateDomain):
self.create_domain()
def test_get_domains(self):
actual = self.storage.get_domains(self.admin_context)
def test_find_domains(self):
actual = self.storage.find_domains(self.admin_context)
self.assertEqual(actual, [])
# Create a single domain
fixture_one, domain_one = self.create_domain()
actual = self.storage.get_domains(self.admin_context)
actual = self.storage.find_domains(self.admin_context)
self.assertEqual(len(actual), 1)
self.assertEqual(actual[0]['name'], domain_one['name'])
@ -554,10 +554,10 @@ class StorageTestCase(TestCase):
# Create a second domain
self.create_domain(fixture=1)
actual = self.storage.get_domains(self.admin_context)
actual = self.storage.find_domains(self.admin_context)
self.assertEqual(len(actual), 2)
def test_get_domains_criterion(self):
def test_find_domains_criterion(self):
_, domain_one = self.create_domain(0)
_, domain_two = self.create_domain(1)
@ -565,7 +565,7 @@ class StorageTestCase(TestCase):
name=domain_one['name']
)
results = self.storage.get_domains(self.admin_context, criterion)
results = self.storage.find_domains(self.admin_context, criterion)
self.assertEqual(len(results), 1)
@ -576,7 +576,7 @@ class StorageTestCase(TestCase):
name=domain_two['name']
)
results = self.storage.get_domains(self.admin_context, criterion)
results = self.storage.find_domains(self.admin_context, criterion)
self.assertEqual(len(results), 1)
@ -719,15 +719,15 @@ class StorageTestCase(TestCase):
# Attempt to create the second/duplicate record
self.create_record(domain)
def test_get_records(self):
def test_find_records(self):
_, domain = self.create_domain()
actual = self.storage.get_records(self.admin_context, domain['id'])
actual = self.storage.find_records(self.admin_context, domain['id'])
self.assertEqual(actual, [])
# Create a single record
_, record_one = self.create_record(domain, fixture=0)
actual = self.storage.get_records(self.admin_context, domain['id'])
actual = self.storage.find_records(self.admin_context, domain['id'])
self.assertEqual(len(actual), 1)
self.assertEqual(actual[0]['name'], record_one['name'])
@ -737,14 +737,14 @@ class StorageTestCase(TestCase):
# Create a second record
_, record_two = self.create_record(domain, fixture=1)
actual = self.storage.get_records(self.admin_context, domain['id'])
actual = self.storage.find_records(self.admin_context, domain['id'])
self.assertEqual(len(actual), 2)
self.assertEqual(actual[1]['name'], record_two['name'])
self.assertEqual(actual[1]['type'], record_two['type'])
self.assertEqual(actual[1]['data'], record_two['data'])
def test_get_records_criterion(self):
def test_find_records_criterion(self):
_, domain = self.create_domain()
_, record_one = self.create_record(domain, fixture=0)
@ -754,8 +754,8 @@ class StorageTestCase(TestCase):
data=record_one['data']
)
results = self.storage.get_records(self.admin_context, domain['id'],
criterion)
results = self.storage.find_records(self.admin_context, domain['id'],
criterion)
self.assertEqual(len(results), 1)
@ -763,12 +763,12 @@ class StorageTestCase(TestCase):
type='A'
)
results = self.storage.get_records(self.admin_context, domain['id'],
criterion)
results = self.storage.find_records(self.admin_context, domain['id'],
criterion)
self.assertEqual(len(results), 2)
def test_get_records_criterion_wildcard(self):
def test_find_records_criterion_wildcard(self):
_, domain = self.create_domain()
values = {'name': 'one.%s' % domain['name']}
@ -778,8 +778,8 @@ class StorageTestCase(TestCase):
name="%%%s" % domain['name']
)
results = self.storage.get_records(self.admin_context, domain['id'],
criterion)
results = self.storage.find_records(self.admin_context, domain['id'],
criterion)
self.assertEqual(len(results), 1)

View File

@ -76,17 +76,6 @@ class StorageAPITest(TestCase):
self._assert_called_with('create_quota', context, values)
self._assert_called_with('delete_quota', context, 12345)
def test_get_quotas(self):
context = mock.sentinel.context
criterion = mock.sentinel.criterion
quota = mock.sentinel.quota
self._set_side_effect('get_quotas', [[quota]])
result = self.storage_api.get_quotas(context, criterion)
self._assert_called_with('get_quotas', context, criterion)
self.assertEquals([quota], result)
def test_get_quota(self):
context = mock.sentinel.context
quota_id = mock.sentinel.quota_id
@ -193,17 +182,6 @@ class StorageAPITest(TestCase):
self._assert_called_with('create_server', context, values)
self._assert_called_with('delete_server', context, 12345)
def test_get_servers(self):
context = mock.sentinel.context
criterion = mock.sentinel.criterion
server = mock.sentinel.server
self._set_side_effect('get_servers', [[server]])
result = self.storage_api.get_servers(context, criterion)
self._assert_called_with('get_servers', context, criterion)
self.assertEquals([server], result)
def test_get_server(self):
context = mock.sentinel.context
server_id = mock.sentinel.server_id
@ -310,17 +288,6 @@ class StorageAPITest(TestCase):
self._assert_called_with('create_tsigkey', context, values)
self._assert_called_with('delete_tsigkey', context, 12345)
def test_get_tsigkeys(self):
context = mock.sentinel.context
criterion = mock.sentinel.criterion
tsigkey = mock.sentinel.tsigkey
self._set_side_effect('get_tsigkeys', [[tsigkey]])
result = self.storage_api.get_tsigkeys(context, criterion)
self._assert_called_with('get_tsigkeys', context, criterion)
self.assertEquals([tsigkey], result)
def test_get_tsigkey(self):
context = mock.sentinel.context
tsigkey_id = mock.sentinel.tsigkey_id
@ -402,14 +369,14 @@ class StorageAPITest(TestCase):
self._assert_call_count('delete_tsigkey', 0)
# Tenant Tests
def test_get_tenants(self):
def test_find_tenants(self):
context = mock.sentinel.context
tenant = mock.sentinel.tenant
self._set_side_effect('get_tenants', [[tenant]])
self._set_side_effect('find_tenants', [[tenant]])
result = self.storage_api.get_tenants(context)
self._assert_called_with('get_tenants', context)
result = self.storage_api.find_tenants(context)
self._assert_called_with('find_tenants', context)
self.assertEquals([tenant], result)
def test_get_tenant(self):
@ -457,17 +424,6 @@ class StorageAPITest(TestCase):
self._assert_called_with('create_domain', context, values)
self._assert_called_with('delete_domain', context, 12345)
def test_get_domains(self):
context = mock.sentinel.context
criterion = mock.sentinel.criterion
domain = mock.sentinel.domain
self._set_side_effect('get_domains', [[domain]])
result = self.storage_api.get_domains(context, criterion)
self._assert_called_with('get_domains', context, criterion)
self.assertEquals([domain], result)
def test_get_domain(self):
context = mock.sentinel.context
domain_id = mock.sentinel.domain_id
@ -574,17 +530,6 @@ class StorageAPITest(TestCase):
self._assert_called_with('create_record', context, 123, values)
self._assert_called_with('delete_record', context, 12345)
def test_get_records(self):
context = mock.sentinel.context
criterion = mock.sentinel.criterion
record = mock.sentinel.record
self._set_side_effect('get_records', [[record]])
result = self.storage_api.get_records(context, 123, criterion)
self._assert_called_with('get_records', context, 123, criterion)
self.assertEquals([record], result)
def test_get_record(self):
context = mock.sentinel.context
record_id = mock.sentinel.record_id
@ -598,13 +543,14 @@ class StorageAPITest(TestCase):
def test_find_records(self):
context = mock.sentinel.context
domain_id = mock.sentinel.domain_id
criterion = mock.sentinel.criterion
record = mock.sentinel.record
self._set_side_effect('find_records', [[record]])
result = self.storage_api.find_records(context, criterion)
self._assert_called_with('find_records', context, criterion)
result = self.storage_api.find_records(context, domain_id, criterion)
self._assert_called_with('find_records', context, domain_id, criterion)
self.assertEquals([record], result)
def test_find_record(self):

View File

@ -6,18 +6,18 @@
"default": "rule:admin_or_owner",
"create_server": "rule:admin",
"get_servers": "rule:admin",
"find_servers": "rule:admin",
"get_server": "rule:admin",
"update_server": "rule:admin",
"delete_server": "rule:admin",
"create_tsigkey": "rule:admin",
"get_tsigkeys": "rule:admin",
"find_tsigkeys": "rule:admin",
"get_tsigkey": "rule:admin",
"update_tsigkey": "rule:admin",
"delete_tsigkey": "rule:admin",
"get_tenants": "rule:admin",
"find_tenants": "rule:admin",
"get_tenant": "rule:admin",
"count_tenants": "rule:admin",