Move Pool NS Records to their own table

The provides part 1 of the Pools API changes discussed at the
mid cycle. Th majority of the remaining changes are new code/
objects rather than refactoring - so this change has been
split into it's own review.

Change-Id: I2291d76d36e8fcdeb8dffca855e7292d8cb22bfb
This commit is contained in:
Kiall Mac Innes 2015-03-17 14:19:42 +00:00
parent eaa914489d
commit e0dc0e77b4
25 changed files with 711 additions and 758 deletions

View File

@ -29,17 +29,16 @@ domains_schema = schema.Schema('v1', 'domains')
servers_schema = schema.Schema('v1', 'servers')
def _poolattribute_to_server(pool_attribute):
def _pool_ns_record_to_server(pool_ns_record):
server_values = {
'id': pool_attribute.id,
'created_at': pool_attribute.created_at,
'updated_at': pool_attribute.updated_at,
'version': pool_attribute.version,
'name': pool_attribute.value
'id': pool_ns_record.id,
'created_at': pool_ns_record.created_at,
'updated_at': pool_ns_record.updated_at,
'version': pool_ns_record.version,
'name': pool_ns_record.hostname
}
server = objects.Server(**server_values)
return server
return objects.Server.from_dict(server_values)
@blueprint.route('/schemas/domain', methods=['GET'])
@ -153,6 +152,6 @@ def get_domain_servers(domain_id):
servers = objects.ServerList()
for ns in nameservers:
servers.append(_poolattribute_to_server(ns))
servers.append(_pool_ns_record_to_server(ns))
return flask.jsonify(servers_schema.filter({'servers': servers}))

View File

@ -34,17 +34,16 @@ default_pool_id = cfg.CONF['service:central'].default_pool_id
# to work
def _poolattribute_to_server(pool_attribute):
def _pool_ns_record_to_server(pool_ns_record):
server_values = {
'id': pool_attribute.id,
'created_at': pool_attribute.created_at,
'updated_at': pool_attribute.updated_at,
'version': pool_attribute.version,
'name': pool_attribute.value
'id': pool_ns_record.id,
'created_at': pool_ns_record.created_at,
'updated_at': pool_ns_record.updated_at,
'version': pool_ns_record.version,
'name': pool_ns_record.hostname
}
server = objects.Server(**server_values)
return server
return objects.Server.from_dict(server_values)
@blueprint.route('/schemas/server', methods=['GET'])
@ -65,19 +64,18 @@ def create_server():
# Validate against the original server schema
server_schema.validate(values)
# Create a PoolAttribute object
pa_values = {
'pool_id': default_pool_id,
'key': 'name_server',
'value': values['name']
# Create a PoolNsRecord object
pns_values = {
'priority': 10,
'hostname': values['name']
}
nameserver = objects.NameServer(**pa_values)
ns_record = objects.PoolNsRecord.from_dict(pns_values)
# Get the default pool
pool = central_api.get_pool(context, default_pool_id)
# Add the new PoolAttribute to the pool as a nameserver
pool.nameservers.append(nameserver)
pool.ns_records.append(ns_record)
try:
# Update the pool
@ -86,15 +84,15 @@ def create_server():
except exceptions.DuplicatePoolAttribute:
raise exceptions.DuplicateServer()
# Go through the pool.nameservers to find the right one to get the ID
for ns in updated_pool.nameservers:
if ns.value == pa_values['value']:
created_nameserver = ns
# Go through the pool.ns_records to find the right one to get the ID
for ns in updated_pool.ns_records:
if ns.hostname == pns_values['hostname']:
created_ns_record = ns
break
# Convert the PoolAttribute to a Server so we can validate with the
# original schema and display
server = _poolattribute_to_server(created_nameserver)
server = _pool_ns_record_to_server(created_ns_record)
response = flask.jsonify(server_schema.filter(server))
response.status_int = 201
@ -114,8 +112,8 @@ def get_servers():
servers = objects.ServerList()
for ns in pool.nameservers:
servers.append(_poolattribute_to_server(ns))
for ns in pool.ns_records:
servers.append(_pool_ns_record_to_server(ns))
return flask.jsonify(servers_schema.filter({'servers': servers}))
@ -129,11 +127,11 @@ def get_server(server_id):
# Get the default pool
pool = central_api.get_pool(context, default_pool_id)
# Create an empty PoolAttribute object
nameserver = objects.NameServer()
# Create an empty PoolNsRecord object
nameserver = objects.PoolNsRecord()
# Get the desired nameserver from the pool
for ns in pool.nameservers:
for ns in pool.ns_records:
if ns.id == server_id:
nameserver = ns
break
@ -142,7 +140,7 @@ def get_server(server_id):
if nameserver.id != server_id:
raise exceptions.ServerNotFound
server = _poolattribute_to_server(nameserver)
server = _pool_ns_record_to_server(nameserver)
return flask.jsonify(server_schema.filter(server))
@ -159,31 +157,31 @@ def update_server(server_id):
# Get the Nameserver from the pool
index = -1
nameservers = pool.nameservers
for ns in nameservers:
ns_records = pool.ns_records
for ns in ns_records:
if ns.id == server_id:
index = nameservers.index(ns)
index = ns_records.index(ns)
break
if index == -1:
raise exceptions.ServerNotFound
# Get the nameserver from the pool so we can update it
nameserver = nameservers.pop(index)
# Get the ns_record from the pool so we can update it
nameserver = ns_records.pop(index)
# Update it with the new values
nameserver.update({'value': values['name']})
nameserver.update({'hostname': values['name']})
# Change it to a server, so we can use the original validation. We want
# to make sure we don't change anything in v1
server = _poolattribute_to_server(nameserver)
server = _pool_ns_record_to_server(nameserver)
server_data = server_schema.filter(server)
server_data.update(values)
# Validate the new set of data
server_schema.validate(server_data)
# Now that it's been validated, add it back to the pool and persist it
pool.nameservers.append(nameserver)
pool.ns_records.append(nameserver)
central_api.update_pool(context, pool)
return flask.jsonify(server_schema.filter(server))
@ -200,17 +198,17 @@ def delete_server(server_id):
# Get the Nameserver from the pool
index = -1
nameservers = pool.nameservers
for ns in nameservers:
ns_records = pool.ns_records
for ns in ns_records:
if ns.id == server_id:
index = nameservers.index(ns)
index = ns_records.index(ns)
break
if index == -1:
raise exceptions.ServerNotFound
# Remove the nameserver from the pool so it will be deleted
nameservers.pop(index)
ns_records.pop(index)
# Update the pool without the deleted server
central_api.update_pool(context, pool)

View File

@ -1,38 +0,0 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pecan
from oslo_log import log as logging
from designate import utils
from designate.api.v2.controllers import rest
from designate.api.v2.views import nameservers as nameservers_view
LOG = logging.getLogger(__name__)
class NameServersController(rest.RestController):
_view = nameservers_view.NameServerView()
@pecan.expose(template='json:', content_type='application/json')
@utils.validate_uuid('zone_id')
def get_all(self, zone_id):
request = pecan.request
context = pecan.request.environ['context']
servers = self.central_api.get_domain_servers(context, zone_id)
return self._view.list(context, request, servers, [zone_id])

View File

@ -23,7 +23,6 @@ from designate import utils
from designate import schema
from designate import dnsutils
from designate.api.v2.controllers import rest
from designate.api.v2.controllers import nameservers
from designate.api.v2.controllers import recordsets
from designate.api.v2.controllers.zones import tasks
from designate.api.v2.views import zones as zones_view
@ -40,7 +39,6 @@ class ZonesController(rest.RestController):
SORT_KEYS = ['created_at', 'id', 'updated_at', 'name', 'tenant_id',
'serial', 'ttl', 'status']
nameservers = nameservers.NameServersController()
recordsets = recordsets.RecordSetsController()
tasks = tasks.TasksController()

View File

@ -1,43 +0,0 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from designate.api.v2.views import base as base_view
LOG = logging.getLogger(__name__)
class NameServerView(base_view.BaseView):
"""Model a NameServer API response as a python dictionary"""
_resource_name = 'nameserver'
_collection_name = 'nameservers'
def _get_base_href(self, parents=None):
assert len(parents) == 1
href = "%s/v2/zones/%s/nameservers" % (self.base_uri, parents[0])
return href.rstrip('?')
def show_basic(self, context, request, nameserver):
"""Basic view of a nameserver"""
return {
"id": nameserver["id"],
"name": nameserver["value"]
}

View File

@ -34,7 +34,8 @@ class PoolsView(base_view.BaseView):
"name": pool['name'],
"project_id": pool['tenant_id'],
"attributes": dict((r.key, r.value) for r in pool['attributes']),
"nameservers": [r.value for r in pool['nameservers']],
"ns_records": [{'priority': n.priority, 'hostname': n.hostname}
for n in pool['ns_records']],
"description": pool['description'],
"created_at": pool['created_at'],
"updated_at": pool['updated_at'],
@ -43,13 +44,14 @@ class PoolsView(base_view.BaseView):
def load(self, context, request, body):
"""Extract a "central" compatible dict from an API call"""
valid_keys = ('name', 'attributes', 'nameservers', 'description')
valid_keys = ('name', 'attributes', 'ns_records', 'description')
result = self._load(context, request, body, valid_keys)
if 'nameservers' in result:
result['nameservers'] = objects.NameServerList(
objects=[objects.NameServer(key='name_server', value=r)
for r in result['nameservers']])
if 'ns_records' in result:
result['ns_records'] = objects.PoolNsRecordList(
objects=[objects.PoolNsRecord(priority=r['priority'],
hostname=r['hostname'])
for r in result['ns_records']])
if 'attributes' in result:
result['attributes'] = objects.PoolAttributeList(

View File

@ -477,8 +477,8 @@ class Service(service.RPCService, service.Service):
return domain
# SOA Recordset Methods
def _build_soa_record(self, zone, nameservers):
return "%s %s. %d %d %d %d %d" % (nameservers[0]['value'],
def _build_soa_record(self, zone, ns_records):
return "%s %s. %d %d %d %d %d" % (ns_records[0]['hostname'],
zone['email'].replace("@", "."),
zone['serial'],
zone['refresh'],
@ -487,17 +487,14 @@ class Service(service.RPCService, service.Service):
zone['minimum'])
def _create_soa(self, context, zone):
# Need elevated context to get the servers
# Need elevated context to get the pool
elevated_context = context.elevated()
elevated_context.all_tenants = True
# Get the nameservers
nameservers = self.storage.find_pool_attributes(
context=elevated_context,
criterion={'pool_id': zone.pool_id, 'key': 'name_server'}
)
# Get the pool for it's list of ns_records
pool = self.storage.get_pool(elevated_context, zone.pool_id)
soa_values = [self._build_soa_record(zone, nameservers)]
soa_values = [self._build_soa_record(zone, pool.ns_records)]
recordlist = objects.RecordList(objects=[
objects.Record(data=r, managed=True) for r in soa_values])
values = {
@ -515,29 +512,31 @@ class Service(service.RPCService, service.Service):
if zone.type != 'PRIMARY':
return
nameservers = self.get_domain_servers(context, zone['id'])
# Need elevated context to get the pool
elevated_context = context.elevated()
elevated_context.all_tenants = True
# Get the pool for it's list of ns_records
pool = self.storage.get_pool(elevated_context, zone.pool_id)
soa = self.find_recordset(context,
criterion={'domain_id': zone['id'],
'type': "SOA"})
soa.records[0].data = self._build_soa_record(zone, nameservers)
soa.records[0].data = self._build_soa_record(zone, pool.ns_records)
self._update_recordset_in_storage(context, zone, soa,
increment_serial=False)
# NS Recordset Methods
def _create_ns(self, context, zone, nameservers):
def _create_ns(self, context, zone, ns_records):
# NOTE: We should not be creating NS records when a zone is SECONDARY.
if zone.type != 'PRIMARY':
return
# Create an NS record for each server
ns_values = []
for s in nameservers:
ns_values.append(s.value)
recordlist = objects.RecordList(objects=[
objects.Record(data=r, managed=True) for r in ns_values])
objects.Record(data=r, managed=True) for r in ns_records])
values = {
'name': zone['name'],
'type': "NS",
@ -549,51 +548,32 @@ class Service(service.RPCService, service.Service):
return ns
def _update_ns(self, context, zone, orig_name, new_name):
# NOTE: We should not be updating NS records when a zone is SECONDARY.
if zone.type != 'PRIMARY':
return
# Get the zone's NS recordset
ns = self.find_recordset(context,
criterion={'domain_id': zone['id'],
'type': "NS"})
for r in ns.records:
if r.data == orig_name:
r.data = new_name
self._update_recordset_in_storage(context, zone, ns)
def _add_ns(self, context, zone, nameserver):
def _add_ns(self, context, zone, ns_record):
# Get NS recordset
# If the zone doesn't have an NS recordset yet, create one
try:
ns = self.find_recordset(context,
criterion={'domain_id': zone['id'],
'type': "NS"})
ns_recordset = self.find_recordset(
context, criterion={'domain_id': zone['id'], 'type': "NS"})
except exceptions.RecordSetNotFound:
nameservers = objects.PoolAttributeList()
nameservers.append(nameserver)
self._create_ns(context, zone, nameservers)
self._create_ns(context, zone, [ns_record])
return
# Add new record to recordset based on the new nameserver
ns_record = objects.Record(data=nameserver.value)
ns.records.append(ns_record)
ns_recordset.records.append(
objects.Record(data=ns_record, managed=True))
self._update_recordset_in_storage(context, zone, ns)
self._update_recordset_in_storage(context, zone, ns_recordset)
def _delete_ns(self, context, zone, nameserver):
ns = self.find_recordset(context,
criterion={'domain_id': zone['id'],
'type': "NS"})
records = ns.records
def _delete_ns(self, context, zone, ns_record):
ns_recordset = self.find_recordset(
context, criterion={'domain_id': zone['id'], 'type': "NS"})
for r in records:
if r.data == nameserver.value:
ns.records.remove(r)
for record in copy.deepcopy(ns_recordset.records):
if record.data == ns_record:
ns_recordset.records.remove(record)
self._update_recordset_in_storage(context, zone, ns)
self._update_recordset_in_storage(context, zone, ns_recordset)
# Quota Enforcement Methods
def _enforce_domain_quota(self, context, tenant_id):
@ -833,12 +813,9 @@ class Service(service.RPCService, service.Service):
# configured.
elevated_context = context.elevated()
elevated_context.all_tenants = True
nameservers = self.storage.find_pool_attributes(
context=elevated_context,
criterion={'pool_id': domain.pool_id, 'key': 'name_server'}
)
pool = self.storage.get_pool(elevated_context, domain.pool_id)
if len(nameservers) == 0:
if len(pool.ns_records) == 0:
LOG.critical(_LC('No nameservers configured. '
'Please create at least one nameserver'))
raise exceptions.NoServersConfigured()
@ -871,12 +848,12 @@ class Service(service.RPCService, service.Service):
domain.status = 'PENDING'
domain = self.storage.create_domain(context, domain)
nameservers = self.get_domain_servers(context, domain['id'])
pool_ns_records = self.get_domain_servers(context, domain['id'])
# Create the SOA and NS recordsets for the new domain. The SOA
# record will always be the first 'created_at' record for a domain.
self._create_soa(context, domain)
self._create_ns(context, domain, nameservers)
self._create_ns(context, domain, [n.hostname for n in pool_ns_records])
if domain.obj_attr_is_set('recordsets'):
for rrset in domain.recordsets:
@ -913,15 +890,14 @@ class Service(service.RPCService, service.Service):
policy.check('get_domain_servers', context, target)
nameservers = self.storage.find_pool_attributes(
context=context,
criterion={
'pool_id': pool_id,
'key': 'name_server'
}
)
# Need elevated context to get the pool
elevated_context = context.elevated()
elevated_context.all_tenants = True
return nameservers
# Get the pool for it's list of ns_records
pool = self.storage.get_pool(elevated_context, pool_id)
return pool.ns_records
def find_domains(self, context, criterion=None, marker=None, limit=None,
sort_key=None, sort_dir=None):
@ -2018,53 +1994,52 @@ class Service(service.RPCService, service.Service):
# If there is a nameserver, then additional steps need to be done
# Since these are treated as mutable objects, we're only going to
# be comparing the nameserver.value which is the FQDN
if pool.obj_attr_is_set('nameservers'):
if pool.obj_attr_is_set('ns_records'):
elevated_context = context.elevated()
elevated_context.all_tenants = True
# Get all existing nameserver FQDNs and put them in a set
existing_ns = \
set([ns.value for ns in self.storage.find_pool_attributes(
context=context,
criterion={'pool_id': pool.id, 'key': 'name_server'}
)])
# Get all nameserver FQDNs from the request and put them in a set
request_ns = set([ns.value for ns in pool.nameservers.objects])
# Get the new ones to be created
# TODO(kiall): ListObjects should be able to give you their
# original set of values.
original_pool = self.storage.get_pool(elevated_context, pool.id)
# Find the current NS hostnames
existing_ns = set([n.hostname for n in original_pool.ns_records])
# Find the desired NS hostnames
request_ns = set([n.hostname for n in pool.ns_records])
# Get the NS's to be created and deleted, ignoring the ones that
# are in both sets, as those haven't changed.
# TODO(kiall): Factor in priority
create_ns = request_ns.difference(existing_ns)
# Get the ones to be deleted
delete_ns = existing_ns.difference(request_ns)
# Ignore the ones that are in both sets, as those haven't changed
updated_pool = self.storage.update_pool(context, pool)
# After the update, handle new nameservers
# After the update, handle new ns_records
for ns in create_ns:
# Create new NS recordsets for every zone
zones = self.find_domains(
context=elevated_context,
criterion={'pool_id': pool.id})
for z in zones:
self._add_ns(elevated_context, z,
objects.PoolAttribute(value=ns))
# Then handle the nameservers to delete
for z in zones:
self._add_ns(elevated_context, z, ns)
# Then handle the ns_records to delete
for ns in delete_ns:
# Cannot delete the last nameserver, so verify that first.
nameservers = pool.nameservers
if len(nameservers) == 0:
if len(pool.ns_records) == 0:
raise exceptions.LastServerDeleteNotAllowed(
"Not allowed to delete last of servers"
)
# Delete the NS recordsets for every zone
# Delete the NS record for every zone
zones = self.find_domains(
context=elevated_context,
criterion={'pool_id': pool.id})
for z in zones:
self._delete_ns(elevated_context, z,
objects.PoolAttribute(value=ns))
self._delete_ns(elevated_context, z, ns)
return updated_pool

View File

@ -252,6 +252,10 @@ class DuplicateDomainAttribute(Duplicate):
error_type = 'duplicate_domain_attribute'
class DuplicatePoolNsRecord(Duplicate):
error_type = 'duplicate_pool_ns_record'
class MethodNotAllowed(Base):
expected = True
error_code = 405
@ -324,6 +328,10 @@ class PoolAttributeNotFound(NotFound):
error_type = 'pool_attribute_not_found'
class PoolNsRecordNotFound(NotFound):
error_type = 'pool_ns_record_not_found'
class ZoneTransferRequestNotFound(NotFound):
error_type = 'zone_transfer_request_not_found'

View File

@ -26,7 +26,7 @@ from designate.objects.pool_manager_status import PoolManagerStatus, PoolManager
from designate.objects.pool_server import PoolServer, PoolServerList # noqa
from designate.objects.pool import Pool, PoolList # noqa
from designate.objects.pool_attribute import PoolAttribute, PoolAttributeList # noqa
from designate.objects.nameserver import NameServer, NameServerList # noqa
from designate.objects.pool_ns_record import PoolNsRecord, PoolNsRecordList # noqa
from designate.objects.quota import Quota, QuotaList # noqa
from designate.objects.rrdata_a import RRData_A # noqa
from designate.objects.rrdata_aaaa import RRData_AAAA # noqa

View File

@ -1,28 +0,0 @@
# Copyright (c) 2014 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.objects import base
class NameServer(base.DictObjectMixin, base.PersistentObjectMixin,
base.DesignateObject):
FIELDS = {
'pool_id': {},
'key': {},
'value': {}
}
class NameServerList(base.ListObjectMixin, base.DesignateObject):
LIST_ITEM_TYPE = NameServer

View File

@ -55,9 +55,9 @@ class Pool(base.DictObjectMixin, base.PersistentObjectMixin,
'relation_cls': 'PoolAttributeList',
'required': True
},
'nameservers': {
'ns_records': {
'relation': True,
'relation_cls': 'NameServerList',
'relation_cls': 'PoolNsRecordList',
'required': True
},
}

View File

@ -0,0 +1,51 @@
# Copyright (c) 2014 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.objects import base
class PoolNsRecord(base.DictObjectMixin, base.PersistentObjectMixin,
base.DesignateObject):
FIELDS = {
'pool_id': {
'schema': {
'type': 'string',
'description': 'Pool identifier',
'format': 'uuid',
},
'required': True
},
'priority': {
'schema': {
'type': 'integer',
'description': 'NS Record Priority Order',
'minimum': 1,
'maximum': 10000
},
},
'hostname': {
'schema': {
'type': 'string',
'description': 'NS Record Hostname',
'format': 'domainname',
'maxLength': 255,
},
'immutable': True,
'required': True
}
}
class PoolNsRecordList(base.ListObjectMixin, base.DesignateObject):
LIST_ITEM_TYPE = PoolNsRecord

View File

@ -13,7 +13,7 @@
"pool": {
"type": "object",
"additionalProperties": false,
"required": ["name", "attributes", "nameservers"],
"required": ["name", "attributes", "ns_records"],
"properties": {
"id": {
@ -42,6 +42,7 @@
"attributes": {
"type": "object",
"description": "Pool attributes as name value pairs",
"additionalProperties": true,
"properties": {
"scope": {
"type": "string",
@ -49,14 +50,26 @@
}
}
},
"nameservers": {
"ns_records": {
"type": "array",
"description": "Nameservers that would go in the NS record",
"description": "List of NS Records for Zones in this pool",
"minItems": 1,
"items": {
"type": "string",
"format": "hostname",
"maxLength": 255
"type": "object",
"additionalProperties": false,
"required": ["priority", "hostname"],
"properties": {
"priority": {
"type": "integer",
"minimum": 0,
"maximum": 1000
},
"hostname": {
"type": "string",
"format": "hostname",
"maxLength": 255
}
}
}
},
"version": {

View File

@ -706,71 +706,70 @@ class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage):
# Pool methods
def _find_pools(self, context, criterion, one=False, marker=None,
limit=None, sort_key=None, sort_dir=None):
return self._find(context, tables.pools, objects.Pool,
objects.PoolList, exceptions.PoolNotFound,
criterion, one, marker, limit, sort_key,
sort_dir)
pools = self._find(context, tables.pools, objects.Pool,
objects.PoolList, exceptions.PoolNotFound,
criterion, one, marker, limit, sort_key,
sort_dir)
# Load Relations
def _load_relations(pool):
pool.attributes = self._find_pool_attributes(
context, {'pool_id': pool.id})
pool.ns_records = self._find_pool_ns_records(
context, {'pool_id': pool.id})
pool.obj_reset_changes(['attributes', 'ns_records'])
if one:
_load_relations(pools)
else:
for pool in pools:
_load_relations(pool)
return pools
def create_pool(self, context, pool):
pool = self._create(
tables.pools, pool, exceptions.DuplicatePool,
['attributes', 'nameservers'])
['attributes', 'ns_records'])
if pool.obj_attr_is_set('attributes'):
for pool_attribute in pool.attributes:
self.create_pool_attribute(context, pool.id, pool_attribute)
else:
pool.attributes = objects.PoolAttributeList()
pool.obj_reset_changes(['attributes'])
if pool.obj_attr_is_set('nameservers'):
for nameserver in pool.nameservers:
self.create_pool_attribute(context, pool.id, nameserver)
if pool.obj_attr_is_set('ns_records'):
for ns_record in pool.ns_records:
self.create_pool_ns_record(context, pool.id, ns_record)
else:
pool.nameservers = objects.NameServerList()
pool.obj_reset_changes(['nameservers'])
pool.ns_records = objects.PoolNsRecordList()
pool.obj_reset_changes(['attributes', 'ns_records'])
return pool
def get_pool(self, context, pool_id):
pool = self._find_pools(context, {'id': pool_id}, one=True)
pool.attributes = self._find_pool_attributes(
context, {'pool_id': pool_id, 'key': '!name_server'})
pool.nameservers = self._find_pool_attributes(
context, {'pool_id': pool_id, 'key': 'name_server'})
pool.obj_reset_changes(['attributes', 'nameservers'])
return pool
return self._find_pools(context, {'id': pool_id}, one=True)
def find_pools(self, context, criterion=None, marker=None,
limit=None, sort_key=None, sort_dir=None):
pools = self._find_pools(context, criterion, marker=marker,
return self._find_pools(context, criterion, marker=marker,
limit=limit, sort_key=sort_key,
sort_dir=sort_dir)
for pool in pools:
pool.attributes = self._find_pool_attributes(
context, {'pool_id': pool.id, 'key': '!name_server'})
pool.nameservers = self._find_pool_attributes(
context, {'pool_id': pool.id, 'key': 'name_server'})
pool.obj_reset_changes(['attributes', 'nameservers'])
return pools
def find_pool(self, context, criterion):
pool = self._find_pools(context, criterion, one=True)
pool.attributes = self._find_pool_attributes(
context, {'pool_id': pool.id, 'key': '!name_server'})
pool.nameservers = self._find_pool_attributes(
context, {'pool_id': pool.id, 'key': 'name_server'})
pool.obj_reset_changes(['attributes', 'nameservers'])
return pool
return self._find_pools(context, criterion, one=True)
def update_pool(self, context, pool):
pool = self._update(context, tables.pools, pool,
exceptions.DuplicatePool, exceptions.PoolNotFound,
['attributes', 'nameservers'])
if pool.obj_attr_is_set('attributes') or \
pool.obj_attr_is_set('nameservers'):
['attributes', 'ns_records'])
# TODO(kiall): These two sections below are near identical, we should
# refactor into a single reusable method.
if pool.obj_attr_is_set('attributes'):
# Gather the pool ID's we have
have_attributes = set([r.id for r in self._find_pool_attributes(
context, {'pool_id': pool.id})])
@ -784,9 +783,6 @@ class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage):
if pool.obj_attr_is_set('attributes'):
for r in pool.attributes.objects:
attributes.append(r)
if pool.obj_attr_is_set('nameservers'):
for r in pool.nameservers.objects:
attributes.append(r)
# Determine what to change
for attribute in attributes:
@ -801,7 +797,7 @@ class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage):
# NOTE: Since we're dealing with mutable objects, the return value
# of create/update/delete attribute is not needed. The
# original item will be mutated in place on the input
# "pool.attributes" or "pool.nameservers" list.
# "pool.attributes" list.
# Delete attributes
for attribute_id in have_attributes - keep_attributes:
@ -816,7 +812,50 @@ class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage):
self.create_pool_attribute(
context, pool.id, attribute)
# Call get_pool to get the ids of all the attributes/nameservers
if pool.obj_attr_is_set('ns_records'):
# Gather the pool ID's we have
have_ns_records = set([r.id for r in self._find_pool_ns_records(
context, {'pool_id': pool.id})])
# Prep some lists of changes
keep_ns_records = set([])
create_ns_records = []
update_ns_records = []
ns_records = []
if pool.obj_attr_is_set('ns_records'):
for r in pool.ns_records.objects:
ns_records.append(r)
# Determine what to change
for ns_record in ns_records:
keep_ns_records.add(ns_record.id)
try:
ns_record.obj_get_original_value('id')
except KeyError:
create_ns_records.append(ns_record)
else:
update_ns_records.append(ns_record)
# NOTE: Since we're dealing with mutable objects, the return value
# of create/update/delete ns_record is not needed. The
# original item will be mutated in place on the input
# "pool.ns_records" list.
# Delete ns_records
for ns_record_id in have_ns_records - keep_ns_records:
self.delete_pool_ns_record(context, ns_record_id)
# Update ns_records
for ns_record in update_ns_records:
self.update_pool_ns_record(context, ns_record)
# Create ns_records
for ns_record in create_ns_records:
self.create_pool_ns_record(
context, pool.id, ns_record)
# Call get_pool to get the ids of all the attributes/ns_records
# refreshed in the pool object
updated_pool = self.get_pool(context, pool.id)
@ -869,6 +908,47 @@ class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage):
return deleted_pool_attribute
# Pool ns_record methods
def _find_pool_ns_records(self, context, criterion, one=False, marker=None,
limit=None, sort_key=None, sort_dir=None):
return self._find(context, tables.pool_ns_records,
objects.PoolNsRecord, objects.PoolNsRecordList,
exceptions.PoolNsRecordNotFound, criterion, one,
marker, limit, sort_key, sort_dir)
def create_pool_ns_record(self, context, pool_id, pool_ns_record):
pool_ns_record.pool_id = pool_id
return self._create(tables.pool_ns_records, pool_ns_record,
exceptions.DuplicatePoolNsRecord)
def get_pool_ns_record(self, context, pool_ns_record_id):
return self._find_pool_ns_records(
context, {'id': pool_ns_record_id}, one=True)
def find_pool_ns_records(self, context, criterion=None, marker=None,
limit=None, sort_key=None, sort_dir=None):
return self._find_pool_ns_records(context, criterion, marker=marker,
limit=limit, sort_key=sort_key,
sort_dir=sort_dir)
def find_pool_ns_record(self, context, criterion):
return self._find_pool_ns_records(context, criterion, one=True)
def update_pool_ns_record(self, context, pool_ns_record):
return self._update(context, tables.pool_ns_records, pool_ns_record,
exceptions.DuplicatePoolNsRecord,
exceptions.PoolNsRecordNotFound)
def delete_pool_ns_record(self, context, pool_ns_record_id):
pool_ns_record = self._find_pool_ns_records(
context, {'id': pool_ns_record_id}, one=True)
deleted_pool_ns_record = self._delete(
context, tables.pool_ns_records, pool_ns_record,
exceptions.PoolNsRecordNotFound)
return deleted_pool_ns_record
# Zone Transfer Methods
def _find_zone_transfer_requests(self, context, criterion, one=False,
marker=None, limit=None, sort_key=None,

View File

@ -0,0 +1,120 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Integer, String, DateTime, ForeignKeyConstraint
from sqlalchemy.schema import Table, Column, MetaData
from sqlalchemy.sql import select
from oslo.config import cfg
from designate import utils
from designate.sqlalchemy.types import UUID
meta = MetaData()
# Get the default pool_id from the config file
default_pool_id = cfg.CONF['service:central'].default_pool_id.replace('-', '')
pool_ns_records_table = Table('pool_ns_records', meta,
Column('id', UUID(), default=utils.generate_uuid, primary_key=True),
Column('created_at', DateTime()),
Column('updated_at', DateTime()),
Column('version', Integer(), default=1, nullable=False),
Column('pool_id', UUID(), nullable=False),
Column('priority', Integer(), nullable=False),
Column('hostname', String(255), nullable=False),
ForeignKeyConstraint(['pool_id'], ['pools.id'], ondelete='CASCADE'),
mysql_engine='INNODB',
mysql_charset='utf8')
def upgrade(migrate_engine):
meta.bind = migrate_engine
# Load the pool_attributes_table table schema
pool_attributes_table = Table('pool_attributes', meta, autoload=True)
# Create the pool_ns_records DB table
pool_ns_records_table.create()
# Find the existing name server entries
pool_ns_records = select(
columns=[
pool_attributes_table.c.id,
pool_attributes_table.c.key,
pool_attributes_table.c.value,
pool_attributes_table.c.created_at,
pool_attributes_table.c.updated_at,
pool_attributes_table.c.version
]
).where(pool_attributes_table.c.key == 'name_server').execute().fetchall()
# Create matching entries in the new table.
for pool_ns_record in pool_ns_records:
pool_ns_records_table.insert().execute(
id=pool_ns_record.id,
created_at=pool_ns_record.created_at,
updated_at=pool_ns_record.updated_at,
version=pool_ns_record.version,
pool_id=default_pool_id,
priority=1,
hostname=pool_ns_record.value,
)
# Delete the old nameserver attr rows from the Database
pool_attributes_table.delete()\
.where(pool_attributes_table.c.key == 'name_server')\
.execute()
def downgrade(migrate_engine):
meta.bind = migrate_engine
# Load the pool_attributes and pool_ns_records table schema
pool_attributes_table = Table('pool_attributes', meta, autoload=True)
pool_ns_records_table = Table('pool_ns_records', meta, autoload=True)
# Find the nameservers for the default_pool_id
pool_ns_records = select(
columns=[
pool_ns_records_table.c.id,
pool_ns_records_table.c.created_at,
pool_ns_records_table.c.updated_at,
pool_ns_records_table.c.version,
pool_ns_records_table.c.hostname,
]
).where(pool_attributes_table.c.pool_id == default_pool_id)\
.execute().fetchall()
# Create matching entries in the new table.
for pool_ns_record in pool_ns_records:
pool_attributes_table.insert().execute(
id=pool_ns_record.id,
created_at=pool_ns_record.created_at,
updated_at=pool_ns_record.updated_at,
version=pool_ns_record.version,
key='name_server',
value=pool_ns_record.hostname,
)
# Delete the pool_ns_records table from the DB
pool_ns_records_table.drop()

View File

@ -249,6 +249,21 @@ pool_attributes = Table('pool_attributes', metadata,
mysql_charset='utf8'
)
pool_ns_records = Table('pool_ns_records', metadata,
Column('id', UUID(), default=utils.generate_uuid, primary_key=True),
Column('created_at', DateTime, default=lambda: timeutils.utcnow()),
Column('updated_at', DateTime, onupdate=lambda: timeutils.utcnow()),
Column('version', Integer(), default=1, nullable=False),
Column('pool_id', UUID(), nullable=False),
Column('priority', Integer(), nullable=False),
Column('hostname', String(255), nullable=False),
ForeignKeyConstraint(['pool_id'], ['pools.id'], ondelete='CASCADE'),
mysql_engine='INNODB',
mysql_charset='utf8')
zone_transfer_requests = Table('zone_transfer_requests', metadata,
Column('id', UUID, default=utils.generate_uuid, primary_key=True),
Column('version', Integer(), default=1, nullable=False),

View File

@ -190,24 +190,34 @@ class TestCase(base.BaseTestCase):
'pattern': 'blacklisted.org.'
}]
pool_fixtures = [
{'name': 'Pool-One',
'description': 'Pool-One description',
'attributes': [{'key': 'scope', 'value': 'public'}],
'ns_records': [{'priority': 0, 'hostname': 'ns1.example.org.'},
{'priority': 1, 'hostname': 'ns2.example.org.'}]},
{'name': 'Pool-Two',
'description': 'Pool-Two description',
'attributes': [{'key': 'scope', 'value': 'public'}],
'ns_records': [{'priority': 0, 'hostname': 'ns1.example.org.'}]},
]
pool_attribute_fixtures = [
{'scope': 'public'},
{'scope': 'private'},
{'scope': 'unknown'}
]
pool_attributes_fixtures = [
{'pool_id': default_pool_id,
'key': 'name_server',
'value': 'ns1.example.com.'},
'key': 'continent',
'value': 'NA'},
{'pool_id': default_pool_id,
'key': 'scope',
'value': 'public'}
]
pool_attribute_nameserver_fixtures = [
{'pool_id': default_pool_id,
'key': 'name_server',
'value': 'ns1.example.org'},
{'pool_id': default_pool_id,
'key': 'name_server',
'value': 'ns2.example.org'},
]
pool_manager_status_fixtures = [{
'server_id': '1d7a26e6-e604-4aa0-bbc5-d01081bf1f45',
'status': 'SUCCESS',
@ -220,29 +230,6 @@ class TestCase(base.BaseTestCase):
'action': 'DELETE'
}]
pool_fixtures = [
{'name': 'Pool-One',
'description': 'Pool-One description',
'attributes': [{'key': 'scope', 'value': 'public'}],
'nameservers': [{'key': 'name_server', 'value': 'ns1.example.org.'}]},
{'name': 'Pool-Two',
'description': 'Pool-Two description',
'attributes': [{'key': 'scope', 'value': 'public'}],
'nameservers': [{'key': 'name_server', 'value': 'ns1.example.org.'}]},
]
pool_attribute_fixtures = [
{'scope': 'public'},
{'scope': 'private'},
{'scope': 'unknown'}
]
name_server_fixtures = [
['examplens1.com.', 'examplens2.com.'],
['examplens1.org.', 'examplens2.org.']
]
zone_transfers_request_fixtures = [{
"description": "Test Transfer",
}, {
@ -315,6 +302,9 @@ class TestCase(base.BaseTestCase):
storage_driver = cfg.CONF['service:central'].storage_driver
self.storage = storage.get_storage(storage_driver)
# Setup the Default Pool with some useful settings
self._setup_default_pool()
def _setup_pool_manager_cache(self):
self.config(
@ -340,6 +330,17 @@ class TestCase(base.BaseTestCase):
connection_debug=connection_debug,
group='pool_manager_cache:sqlalchemy')
def _setup_default_pool(self):
# Fetch the default pool
pool = self.storage.get_pool(self.admin_context, default_pool_id)
# Add a NS record to it
pool.ns_records.append(
objects.PoolNsRecord(priority=0, hostname='ns1.example.org.'))
# Save the default pool
self.storage.update_pool(self.admin_context, pool)
# Config Methods
def config(self, **kwargs):
group = kwargs.pop('group', None)
@ -461,27 +462,6 @@ class TestCase(base.BaseTestCase):
_values.update(values)
return _values
def get_pool_attributes_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_attributes_fixtures[fixture])
_values.update(values)
return _values
def get_pool_attribute_nameserver_fixtures(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_attribute_nameserver_fixtures[fixture])
_values.update(values)
return _values
def get_pool_manager_status_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_manager_status_fixtures[fixture])
_values.update(values)
return _values
def get_pool_fixture(self, fixture=0, values=None):
values = values or {}
@ -496,11 +476,20 @@ class TestCase(base.BaseTestCase):
_values.update(values)
return _values
def get_nameserver_fixture(self, fixture=0, values=None):
if values:
_values = copy.copy(values)
else:
_values = copy.copy(self.name_server_fixtures[fixture])
def get_pool_attributes_fixture(self, fixture=0, values=None):
# TODO(kiall): Remove this method, in favor of the
# get_pool_attribute_fixture method above.
values = values or {}
_values = copy.copy(self.pool_attributes_fixtures[fixture])
_values.update(values)
return _values
def get_pool_manager_status_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_manager_status_fixtures[fixture])
_values.update(values)
return _values
def get_zone_transfer_request_fixture(self, fixture=0, values=None):
@ -517,33 +506,6 @@ class TestCase(base.BaseTestCase):
_values.update(values)
return _values
def create_nameserver(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_pool_attribute_nameserver_fixtures(fixture=fixture,
values=kwargs)
nameserver = objects.PoolAttribute.from_dict(values)
# Get the default pool
pool = self.central_service.get_pool(
self.admin_context, default_pool_id)
# Add the new PoolAttribute to the pool as a nameserver
pool.nameservers.append(nameserver)
# Update the pool
self.central_service.update_pool(self.admin_context, pool)
# Get the new PoolAttribute from the db in order to return it
created_nameserver = self.storage.find_pool_attribute(
context=context,
criterion=values
)
return created_nameserver
def create_tld(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
@ -583,18 +545,6 @@ class TestCase(base.BaseTestCase):
fixture = kwargs.pop('fixture', 0)
domain_type = kwargs.pop('type', None)
try:
# We always need a server to create a server
nameservers = self.storage.find_pool_attributes(
context=self.admin_context,
criterion={'pool_id': default_pool_id,
'key': 'name_server'}
)
if len(nameservers) == 0:
self.create_nameserver()
except exceptions.DuplicatePoolAttribute:
pass
values = self.get_domain_fixture(domain_type=domain_type,
fixture=fixture, values=kwargs)
@ -647,6 +597,21 @@ class TestCase(base.BaseTestCase):
return self.central_service.create_pool(
context, objects.Pool.from_dict(values))
def create_pool_attribute(self, **kwargs):
# TODO(kiall): This method should require a "pool" be passed in,
# rather than hardcoding the default pool ID into the
# fixture.
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_pool_attributes_fixture(fixture=fixture,
values=kwargs)
# TODO(kiall): We shouldn't be assuming the default_pool_id here
return self.storage.create_pool_attribute(
context, default_pool_id,
objects.PoolAttribute.from_dict(values))
def create_zone_transfer_request(self, domain, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
@ -680,18 +645,6 @@ class TestCase(base.BaseTestCase):
return self.central_service.create_zone_transfer_accept(
context, objects.ZoneTransferAccept.from_dict(values))
def create_pool_attribute(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_pool_attributes_fixture(fixture=fixture,
values=kwargs)
# TODO(kiall): We shouldn't be assuming the default_pool_id here
return self.storage.create_pool_attribute(
context, default_pool_id,
objects.PoolAttribute.from_dict(values))
def _ensure_interface(self, interface, implementation):
for name in interface.__abstractmethods__:
in_arginfo = inspect.getargspec(getattr(interface, name))

View File

@ -31,9 +31,6 @@ LOG = logging.getLogger(__name__)
class ApiV1DomainsTest(ApiV1Test):
def test_create_domain(self):
# Create a server
self.create_nameserver()
# Create a domain
fixture = self.get_domain_fixture(0)
@ -47,9 +44,6 @@ class ApiV1DomainsTest(ApiV1Test):
self.assertEqual(response.json['name'], fixture['name'])
def test_create_domain_junk(self):
# Create a server
self.create_nameserver()
# Create a domain
fixture = self.get_domain_fixture(0)
@ -59,15 +53,6 @@ class ApiV1DomainsTest(ApiV1Test):
# Ensure it fails with a 400
self.post('domains', data=fixture, status_code=400)
def test_create_domain_no_servers(self):
# Create a domain
fixture = self.get_domain_fixture(0)
# V1 doesn't have these
del fixture['type']
self.post('domains', data=fixture, status_code=500)
@patch.object(central_service.Service, 'create_domain',
side_effect=messaging.MessagingTimeout())
def test_create_domain_timeout(self, _):
@ -109,9 +94,6 @@ class ApiV1DomainsTest(ApiV1Test):
self.post('domains', data=fixture, status_code=400)
def test_create_domain_utf_description(self):
# Create a nameserver
self.create_nameserver()
# Create a domain
fixture = self.get_domain_fixture(0)
@ -125,9 +107,6 @@ class ApiV1DomainsTest(ApiV1Test):
self.post('domains', data=fixture)
def test_create_domain_description_too_long(self):
# Create a server
self.create_nameserver()
# Create a domain
fixture = self.get_domain_fixture(0)
fixture['description'] = "x" * 161
@ -136,12 +115,11 @@ class ApiV1DomainsTest(ApiV1Test):
self.post('domains', data=fixture, status_code=400)
def test_create_domain_with_unwanted_attributes(self):
# Create a server
domain_id = "2d1d1d1d-1324-4a80-aa32-1f69a91bf2c8"
created_at = datetime.datetime(2014, 6, 22, 21, 50, 0)
updated_at = datetime.datetime(2014, 6, 22, 21, 50, 0)
serial = 1234567
self.create_nameserver()
# Create a domain
fixture = self.get_domain_fixture(0)

View File

@ -15,6 +15,7 @@
# under the License.
from mock import patch
from oslo import messaging
from oslo_config import cfg
from oslo_log import log as logging
from designate import exceptions
@ -22,6 +23,10 @@ from designate import objects
from designate.central import service as central_service
from designate.tests.test_api.test_v1 import ApiV1Test
cfg.CONF.import_opt('default_pool_id',
'designate.central',
group='service:central')
default_pool_id = cfg.CONF['service:central'].default_pool_id
LOG = logging.getLogger(__name__)
@ -82,26 +87,39 @@ class ApiV1ServersTest(ApiV1Test):
self.post('servers', data=fixture, status_code=409)
def test_get_servers(self):
# Fetch the default pool
pool = self.storage.get_pool(self.admin_context, default_pool_id)
# Fetch the list of servers
response = self.get('servers')
self.assertIn('servers', response.json)
self.assertEqual(0, len(response.json['servers']))
self.assertEqual(len(pool.ns_records), len(response.json['servers']))
# Create the nameserver
self.create_nameserver()
# Add a new NS record to the pool
pool.ns_records.append(
objects.PoolNsRecord(priority=0, hostname='new-ns1.example.org.'))
# Save the pool to add a new nameserver
self.storage.update_pool(self.admin_context, pool)
# Fetch the list of servers
response = self.get('servers')
self.assertIn('servers', response.json)
self.assertEqual(len(pool.ns_records), len(response.json['servers']))
# Add a new NS record to the pool
pool.ns_records.append(
objects.PoolNsRecord(priority=0, hostname='new-ns2.example.org.'))
# Save the pool to add a new nameserver
self.storage.update_pool(self.admin_context, pool)
response = self.get('servers')
self.assertIn('servers', response.json)
self.assertEqual(1, len(response.json['servers']))
# Create a second server
self.create_nameserver(fixture=1)
response = self.get('servers')
self.assertIn('servers', response.json)
self.assertEqual(2, len(response.json['servers']))
self.assertEqual(len(pool.ns_records), len(response.json['servers']))
@patch.object(central_service.Service, 'get_pool',
side_effect=messaging.MessagingTimeout())
@ -109,29 +127,22 @@ class ApiV1ServersTest(ApiV1Test):
self.get('servers', status_code=504)
def test_get_server(self):
# Create a server
nameserver = self.create_nameserver()
# Fetch the default pool
pool = self.storage.get_pool(self.admin_context, default_pool_id)
response = self.get('servers/%s' % nameserver['id'])
# Fetch the Server from the pool
response = self.get('servers/%s' % pool.ns_records[0].id)
self.assertIn('id', response.json)
self.assertEqual(response.json['id'], nameserver['id'])
self.assertEqual(response.json['id'], pool.ns_records[0]['id'])
@patch.object(central_service.Service, 'get_pool',
side_effect=messaging.MessagingTimeout())
def test_get_server_timeout(self, _):
# # Create a server
# nameserver = self.create_nameserver()
# Fetch the default pool
pool = self.storage.get_pool(self.admin_context, default_pool_id)
fixture = self.get_server_fixture(0)
values = {
'key': 'name_server',
'value': fixture['name'],
'id': '2fdadfb1-cf96-4259-ac6b-bb7b6d2ff980'
}
nameserver = objects.PoolAttribute.from_dict(values)
self.get('servers/%s' % nameserver['id'], status_code=504)
self.get('servers/%s' % pool.ns_records[0].id, status_code=504)
def test_get_server_with_invalid_id(self):
self.get('servers/2fdadfb1-cf96-4259-ac6b-bb7b6d2ff98GH',
@ -142,18 +153,19 @@ class ApiV1ServersTest(ApiV1Test):
status_code=404)
def test_update_server(self):
# Create a server
server = self.create_nameserver()
# Fetch the default pool
pool = self.storage.get_pool(self.admin_context, default_pool_id)
data = {'name': 'test.example.org.'}
data = {'name': 'new-ns1.example.org.'}
response = self.put('servers/%s' % server['id'], data=data)
response = self.put('servers/%s' % pool.ns_records[0].id,
data=data)
self.assertIn('id', response.json)
self.assertEqual(response.json['id'], server['id'])
self.assertEqual(response.json['id'], pool.ns_records[0].id)
self.assertIn('name', response.json)
self.assertEqual(response.json['name'], 'test.example.org.')
self.assertEqual(response.json['name'], 'new-ns1.example.org.')
def test_update_server_missing(self):
data = {'name': 'test.example.org.'}
@ -161,29 +173,35 @@ class ApiV1ServersTest(ApiV1Test):
status_code=404)
def test_update_server_junk(self):
# Create a server
server = self.create_nameserver()
# Fetch the default pool
pool = self.storage.get_pool(self.admin_context, default_pool_id)
data = {'name': 'test.example.org.', 'junk': 'Junk Field'}
self.put('servers/%s' % server['id'], data=data, status_code=400)
self.put('servers/%s' % pool.ns_records[0].id, data=data,
status_code=400)
def test_delete_server(self):
# Create a server
server = self.create_nameserver()
# Fetch the default pool
pool = self.storage.get_pool(self.admin_context, default_pool_id)
# Create a second server so that we can delete the first
# because the last remaining server is not allowed to be deleted
server2 = self.create_nameserver(fixture=1)
# Add a new NS record to the pool
pool.ns_records.append(
objects.PoolNsRecord(priority=0, hostname='new-ns2.example.org.'))
# Save the pool to add a new nameserver
self.storage.update_pool(self.admin_context, pool)
# Now delete the server
self.delete('servers/%s' % server['id'])
self.delete('servers/%s' % pool.ns_records[1].id)
# Ensure we can no longer fetch the deleted server
self.get('servers/%s' % server['id'], status_code=404)
self.get('servers/%s' % pool.ns_records[1].id, status_code=404)
# Also, verify we cannot delete last remaining server
self.delete('servers/%s' % server2['id'], status_code=400)
self.delete('servers/%s' % pool.ns_records[0].id, status_code=400)
def test_delete_server_with_invalid_id(self):
self.delete('servers/9fdadfb1-cf96-4259-ac6b-bb7b6d2ff98GH',

View File

@ -44,8 +44,6 @@ class ApiV2ReverseFloatingIPTest(ApiV2TestCase):
self.assertEqual(None, fip_record['ptrdname'])
def test_get_floatingip_with_record(self):
self.create_nameserver()
fixture = self.get_ptr_fixture()
context = self.get_context(tenant='a')
@ -110,8 +108,6 @@ class ApiV2ReverseFloatingIPTest(ApiV2TestCase):
self.assertEqual(None, fip_record['description'])
def test_list_floatingip_with_record(self):
self.create_nameserver()
fixture = self.get_ptr_fixture()
context = self.get_context(tenant='a')
@ -138,7 +134,6 @@ class ApiV2ReverseFloatingIPTest(ApiV2TestCase):
self.assertEqual(fixture['ptrdname'], fip_record['ptrdname'])
def test_set_floatingip(self):
self.create_nameserver()
fixture = self.get_ptr_fixture()
fip = self.network_api.fake.allocate_floatingip('tenant')
@ -184,8 +179,6 @@ class ApiV2ReverseFloatingIPTest(ApiV2TestCase):
url, {})
def test_unset_floatingip(self):
self.create_nameserver()
fixture = self.get_ptr_fixture()
context = self.get_context(tenant='a')
elevated_context = context.elevated()
@ -229,8 +222,6 @@ class ApiV2ReverseFloatingIPTest(ApiV2TestCase):
self.assertEqual(None, fip['ptrdname'])
def test_unset_floatingip_not_allocated(self):
self.create_nameserver()
fixture = self.get_ptr_fixture()
context = self.get_context(tenant='a')

View File

@ -1,73 +0,0 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mock import patch
from oslo import messaging
from oslo_log import log as logging
from designate.central import service as central_service
from designate.tests.test_api.test_v2 import ApiV2TestCase
LOG = logging.getLogger(__name__)
class ApiV2NameServersTest(ApiV2TestCase):
def setUp(self):
super(ApiV2NameServersTest, self).setUp()
# Create a domain
self.domain = self.create_domain()
def test_get_nameservers(self):
url = '/zones/%s/nameservers' % self.domain['id']
response = self.client.get(url)
# Check the headers are what we expect
self.assertEqual(200, response.status_int)
self.assertEqual('application/json', response.content_type)
# Check the body structure is what we expect
self.assertIn('nameservers', response.json)
self.assertIn('links', response.json)
self.assertIn('self', response.json['links'])
# We should start with 0 nameservers
self.assertEqual(1, len(response.json['nameservers']))
servers = self.central_service.get_domain_servers(
self.admin_context, self.domain['id'])
self.assertEqual(servers[0]['id'],
response.json['nameservers'][0]['id'])
self.assertEqual(servers[0]['value'],
response.json['nameservers'][0]['name'])
self.create_nameserver(value='nsx.mydomain.com.')
response = self.client.get(url)
self.assertEqual(2, len(response.json['nameservers']))
def test_get_nameservers_invalid_id(self):
self._assert_invalid_uuid(self.client.get, '/zones/%s/nameservers')
@patch.object(central_service.Service, 'get_domain_servers',
side_effect=messaging.MessagingTimeout())
def test_get_nameservers_timeout(self, _):
url = '/zones/ba751950-6193-11e3-949a-0800200c9a66/nameservers'
self._assert_exception('timeout', 504, self.client.get, url)

View File

@ -20,6 +20,14 @@ from designate.tests.test_api.test_v2 import ApiV2TestCase
LOG = logging.getLogger(__name__)
def _attributes_to_api(attributes):
result = {}
for attribute in attributes:
result[attribute['key']] = attribute['value']
return result
class ApiV2PoolsTest(ApiV2TestCase):
def setUp(self):
super(ApiV2PoolsTest, self).setUp()
@ -31,8 +39,8 @@ class ApiV2PoolsTest(ApiV2TestCase):
def test_create_pool(self):
# Prepare a Pool fixture
fixture = self.get_pool_fixture(fixture=0)
fixture['attributes'] = self.get_pool_attribute_fixture(fixture=0)
fixture['nameservers'] = self.get_nameserver_fixture(fixture=0)
fixture['attributes'] = _attributes_to_api(fixture['attributes'])
response = self.client.post_json(
'/pools', {'pool': fixture})
@ -55,16 +63,18 @@ class ApiV2PoolsTest(ApiV2TestCase):
self.assertEqual(
response.json['pool']['attributes'], fixture['attributes'])
self.assertEqual(
response.json['pool']['nameservers'], fixture['nameservers'])
response.json['pool']['ns_records'], fixture['ns_records'])
def test_create_pool_validation(self):
# NOTE: The schemas should be tested separatly to the API. So we
# don't need to test every variation via the API itself.
# Fetch a fixture
fixture = self.get_pool_fixture(fixture=0)
# Set an invalid scope
fixture['attributes'] = self.get_pool_attribute_fixture(fixture=2)
fixture['nameservers'] = self.get_nameserver_fixture(fixture=0)
fixture['attributes'] = {
'scope': 'INVALID'
}
body = {'pool': fixture}
# Ensure it fails with a 400
@ -88,8 +98,8 @@ class ApiV2PoolsTest(ApiV2TestCase):
def test_create_pool_duplicate(self):
# Prepare a Pool fixture
fixture = self.get_pool_fixture(fixture=0)
fixture['attributes'] = self.get_pool_attribute_fixture(fixture=0)
fixture['nameservers'] = self.get_nameserver_fixture(fixture=0)
fixture['attributes'] = _attributes_to_api(fixture['attributes'])
body = {'pool': fixture}
response = self.client.post_json('/pools', body)
@ -158,10 +168,11 @@ class ApiV2PoolsTest(ApiV2TestCase):
attribute['value'],
response.json['pool']['attributes'][attribute['key']])
self.assertEqual(len(pool['nameservers']),
len(response.json['pool']['nameservers']))
self.assertEqual([r.value for r in pool['nameservers']],
response.json['pool']['nameservers'])
self.assertEqual(len(pool['ns_records']),
len(response.json['pool']['ns_records']))
self.assertEqual(
[n.hostname for n in pool['ns_records']],
[n['hostname'] for n in response.json['pool']['ns_records']])
def test_update_pool(self):
# Create a pool
@ -196,17 +207,21 @@ class ApiV2PoolsTest(ApiV2TestCase):
attribute['value'],
response.json['pool']['attributes'][attribute['key']])
self.assertEqual(len(pool['nameservers']),
len(response.json['pool']['nameservers']))
self.assertEqual([r.value for r in pool['nameservers']],
response.json['pool']['nameservers'])
self.assertEqual(len(pool['ns_records']),
len(response.json['pool']['ns_records']))
self.assertEqual(
[n.hostname for n in pool['ns_records']],
[n['hostname'] for n in response.json['pool']['ns_records']])
def test_update_pool_nameservers(self):
def test_update_pool_ns_records(self):
# Create a pool
pool = self.create_pool()
# Prepare an update body
body = {'pool': {'nameservers': ['newns1.com.', 'newns2.com.']}}
body = {'pool': {'ns_records': [
{'priority': 1, 'hostname': 'new-ns1.example.org.'},
{'priority': 2, 'hostname': 'new-ns2.example.org.'},
]}}
url = '/pools/%s' % pool['id']
response = self.client.patch_json(url, body, status=200)
@ -217,26 +232,14 @@ class ApiV2PoolsTest(ApiV2TestCase):
# Check the body structure is what we expect
self.assertIn('pool', response.json)
self.assertIn('id', response.json['pool'])
self.assertIn('links', response.json['pool'])
self.assertIn('self', response.json['pool']['links'])
self.assertEqual(2, len(response.json['pool']['nameservers']))
self.assertEqual(['newns1.com.', 'newns2.com.'],
response.json['pool']['nameservers'])
# Check the values returned are what we expect
self.assertIn('id', response.json['pool'])
self.assertIsNotNone(response.json['pool']['updated_at'])
# Check the rest of the values are unchanged
self.assertEqual(pool['name'], response.json['pool']['name'])
self.assertEqual(
pool['description'], response.json['pool']['description'])
self.assertEqual(len(pool['attributes']),
len(response.json['pool']['attributes']))
for attribute in pool['attributes']:
self.assertEqual(
attribute['value'],
response.json['pool']['attributes'][attribute['key']])
self.assertEqual(2, len(response.json['pool']['ns_records']))
self.assertEqual(['new-ns1.example.org.', 'new-ns2.example.org.'],
[n['hostname'] for n in
response.json['pool']['ns_records']])
def test_update_pool_attributes(self):
# Create a pool
@ -252,27 +255,11 @@ class ApiV2PoolsTest(ApiV2TestCase):
self.assertEqual(200, response.status_int)
self.assertEqual('application/json', response.content_type)
# Check the body structure is what we expect
self.assertIn('pool', response.json)
self.assertIn('links', response.json['pool'])
self.assertIn('self', response.json['pool']['links'])
# Check the values returned are what we expect
self.assertIn('id', response.json['pool'])
self.assertIsNotNone(response.json['pool']['updated_at'])
self.assertEqual(1, len(response.json['pool']['attributes']))
self.assertEqual('private',
response.json['pool']['attributes']['scope'])
# Check the rest of the values are unchanged
self.assertEqual(pool['name'], response.json['pool']['name'])
self.assertEqual(
pool['description'], response.json['pool']['description'])
self.assertEqual(len(pool['nameservers']),
len(response.json['pool']['nameservers']))
self.assertEqual([r.value for r in pool['nameservers']],
response.json['pool']['nameservers'])
def test_delete_pool(self):
pool = self.create_pool()
url = '/pools/%s' % pool['id']

View File

@ -31,9 +31,6 @@ class ApiV2ZonesTest(ApiV2TestCase):
def setUp(self):
super(ApiV2ZonesTest, self).setUp()
# Create a server
self.create_nameserver()
# Create the default TLDs
self.create_default_tlds()

View File

@ -408,9 +408,6 @@ class CentralServiceTest(CentralTestCase):
# Domain Tests
@mock.patch.object(notifier.Notifier, "info")
def _test_create_domain(self, values, mock_notifier):
# Create a server
self.create_nameserver()
# Reset the mock to avoid the calls from the create_nameserver() call
mock_notifier.reset_mock()
@ -426,6 +423,20 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(mock_notifier.call_count, 1)
# Ensure the correct NS Records are in place
pool = self.central_service.get_pool(
self.admin_context, domain.pool_id)
ns_recordset = self.central_service.find_recordset(
self.admin_context,
criterion={'domain_id': domain.id, 'type': "NS"})
self.assertIsNotNone(ns_recordset.id)
self.assertEqual(ns_recordset.type, 'NS')
self.assertIsNotNone(ns_recordset.records)
self.assertEqual(set([n.hostname for n in pool.ns_records]),
set([n.data for n in ns_recordset.records]))
mock_notifier.assert_called_once_with(
self.admin_context, 'dns.domain.create', domain)
@ -567,9 +578,6 @@ class CentralServiceTest(CentralTestCase):
email='info@blacklisted.com'
)
# Create a server
self.create_nameserver()
# Create a zone that is blacklisted
domain = self.central_service.create_domain(
self.admin_context, objects.Domain.from_dict(values))
@ -603,9 +611,6 @@ class CentralServiceTest(CentralTestCase):
self.admin_context, objects.Domain.from_dict(values))
def test_create_domain_invalid_tld_fail(self):
# Create a server
self.create_nameserver()
# add a tld for com
self.create_tld(fixture=0)
@ -638,9 +643,6 @@ class CentralServiceTest(CentralTestCase):
values = self.get_domain_fixture(fixture=1)
values['ttl'] = 0
# Create a server
self.create_nameserver()
with testtools.ExpectedException(exceptions.InvalidTTL):
self.central_service.create_domain(
context, objects.Domain.from_dict(values))
@ -652,9 +654,6 @@ class CentralServiceTest(CentralTestCase):
values = self.get_domain_fixture(fixture=1)
values['ttl'] = -100
# Create a server
self.create_nameserver()
# Create domain with random TTL
domain = self.central_service.create_domain(
self.admin_context, objects.Domain.from_dict(values))
@ -1886,8 +1885,6 @@ class CentralServiceTest(CentralTestCase):
self.central_service.count_records(self.get_context())
def test_get_floatingip_no_record(self):
self.create_nameserver()
context = self.get_context(tenant='a')
fip = self.network_api.fake.allocate_floatingip(context.tenant)
@ -1901,8 +1898,6 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(None, fip_ptr['ptrdname'])
def test_get_floatingip_with_record(self):
self.create_nameserver()
context = self.get_context(tenant='a')
fixture = self.get_ptr_fixture()
@ -1929,8 +1924,6 @@ class CentralServiceTest(CentralTestCase):
context, fip['region'], fip['id'])
def test_get_floatingip_deallocated_and_invalidate(self):
self.create_nameserver()
context_a = self.get_context(tenant='a')
elevated_a = context_a.elevated()
elevated_a.all_tenants = True
@ -2008,8 +2001,6 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(None, fips[0]['description'])
def test_list_floatingips_with_record(self):
self.create_nameserver()
context = self.get_context(tenant='a')
fixture = self.get_ptr_fixture()
@ -2029,8 +2020,6 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(fip_ptr['description'], fips[0]['description'])
def test_list_floatingips_deallocated_and_invalidate(self):
self.create_nameserver()
context_a = self.get_context(tenant='a')
elevated_a = context_a.elevated()
elevated_a.all_tenants = True
@ -2086,8 +2075,6 @@ class CentralServiceTest(CentralTestCase):
self.central_service.find_record(elevated_a, criterion)
def test_set_floatingip(self):
self.create_nameserver()
context = self.get_context(tenant='a')
fixture = self.get_ptr_fixture()
@ -2103,8 +2090,6 @@ class CentralServiceTest(CentralTestCase):
self.assertIsNotNone(fip_ptr['ttl'])
def test_set_floatingip_removes_old_record(self):
self.create_nameserver()
context_a = self.get_context(tenant='a')
elevated_a = context_a.elevated()
elevated_a.all_tenants = True
@ -2176,8 +2161,6 @@ class CentralServiceTest(CentralTestCase):
context, fip['region'], fip['id'], fixture)
def test_unset_floatingip(self):
self.create_nameserver()
context = self.get_context(tenant='a')
fixture = self.get_ptr_fixture()
@ -2349,68 +2332,6 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(int(soa_record_values[2]), updated_zone['serial'])
# NS Recordset tests
def test_create_ns(self):
# Anytime a zone is created, an NS recordset should be
# automatically be created, with a record for each server
# Create a nameserver
nameserver = self.create_nameserver(value='ns1.example.org.')
# Create a zone
zone = self.create_domain(name='example3.org.')
# Make sure an NS recordset was created
ns = self.central_service.find_recordset(self.admin_context,
criterion={'domain_id': zone['id'],
'type': "NS"})
# Ensure all values have been set correctly
self.assertIsNotNone(ns.id)
self.assertEqual('NS', ns.type)
self.assertIsNotNone(ns.records)
self.assertEqual(ns.records[0].data, nameserver.value)
def test_add_ns(self):
# Anytime a server is created, the NS recordset for each zone
# should be automatically updated to contain the new server
# Create a server
nameserver1 = self.create_nameserver(value='ns1.example.net.')
# Create a zone
zone = self.create_domain(name='example3.net.')
original_serial = zone.serial
# Make sure an NS recordset was created
ns_rs = self.central_service.find_recordset(
self.admin_context,
criterion={'domain_id': zone['id'], 'type': "NS"})
# Ensure all values have been set correctly
self.assertIsNotNone(ns_rs.id)
self.assertEqual('NS', ns_rs.type)
self.assertIsNotNone(ns_rs.records)
self.assertEqual(ns_rs.records[0].data, nameserver1.value)
# Create another server
nameserver2 = self.create_nameserver(value='ns2.example.net.')
# Get the NS recordset again
ns_rs = self.central_service.find_recordset(
self.admin_context,
criterion={'domain_id': zone['id'], 'type': "NS"})
# Get zone again to check serial number
updated_zone = self.central_service.get_domain(self.admin_context,
zone.id)
new_serial = updated_zone.serial
# Ensure another record was added to the recordset
self.assertEqual(ns_rs.records[0].data, nameserver1.value)
self.assertEqual(ns_rs.records[1].data, nameserver2.value)
self.assertThat(new_serial, GreaterThan(original_serial))
# Pool Tests
def test_create_pool(self):
# Get the values
@ -2426,21 +2347,21 @@ class CentralServiceTest(CentralTestCase):
self.assertIsNotNone(pool['tenant_id'])
self.assertIsNone(pool['updated_at'])
self.assertIsNotNone(pool['attributes'])
self.assertIsNotNone(pool['nameservers'])
self.assertIsNotNone(pool['ns_records'])
self.assertEqual(pool['name'], values['name'])
# Compare the actual values of attributes and nameservers
# Compare the actual values of attributes and ns_records
for k in range(0, len(values['attributes'])):
self.assertDictContainsSubset(
values['attributes'][k],
pool['attributes'][k].to_primitive()['designate_object.data']
)
for k in range(0, len(values['nameservers'])):
for k in range(0, len(values['ns_records'])):
self.assertDictContainsSubset(
values['nameservers'][k],
pool['nameservers'][k].to_primitive()['designate_object.data'])
values['ns_records'][k],
pool['ns_records'][k].to_primitive()['designate_object.data'])
def test_get_pool(self):
# Create a server pool
@ -2456,17 +2377,17 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(pool['tenant_id'], expected['tenant_id'])
self.assertEqual(pool['name'], expected['name'])
# Compare the actual values of attributes and nameservers
# Compare the actual values of attributes and ns_records
for k in range(0, len(expected['attributes'])):
self.assertEqual(
pool['attributes'][k].to_primitive()['designate_object.data'],
expected['attributes'][k].to_primitive()
['designate_object.data'])
for k in range(0, len(expected['nameservers'])):
for k in range(0, len(expected['ns_records'])):
self.assertEqual(
pool['nameservers'][k].to_primitive()['designate_object.data'],
expected['nameservers'][k].to_primitive()
pool['ns_records'][k].to_primitive()['designate_object.data'],
expected['ns_records'][k].to_primitive()
['designate_object.data'])
def test_find_pools(self):
@ -2485,18 +2406,18 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(len(pools), 2)
self.assertEqual(pools[1]['name'], values['name'])
# Compare the actual values of attributes and nameservers
# Compare the actual values of attributes and ns_records
expected_attributes = values['attributes'][0]
actual_attributes = \
pools[1]['attributes'][0].to_primitive()['designate_object.data']
for k in expected_attributes:
self.assertEqual(actual_attributes[k], expected_attributes[k])
expected_nameservers = values['nameservers'][0]
actual_nameservers = \
pools[1]['nameservers'][0].to_primitive()['designate_object.data']
for k in expected_nameservers:
self.assertEqual(actual_nameservers[k], expected_nameservers[k])
expected_ns_records = values['ns_records'][0]
actual_ns_records = \
pools[1]['ns_records'][0].to_primitive()['designate_object.data']
for k in expected_ns_records:
self.assertEqual(actual_ns_records[k], expected_ns_records[k])
def test_find_pool(self):
# Create a server pool
@ -2508,60 +2429,91 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(pool['name'], expected['name'])
# Compare the actual values of attributes and nameservers
# Compare the actual values of attributes and ns_records
for k in range(0, len(expected['attributes'])):
self.assertEqual(
pool['attributes'][k].to_primitive()['designate_object.data'],
expected['attributes'][k].to_primitive()
['designate_object.data'])
for k in range(0, len(expected['nameservers'])):
for k in range(0, len(expected['ns_records'])):
self.assertEqual(
pool['nameservers'][k].to_primitive()['designate_object.data'],
expected['nameservers'][k].to_primitive()
pool['ns_records'][k].to_primitive()['designate_object.data'],
expected['ns_records'][k].to_primitive()
['designate_object.data'])
def test_update_pool(self):
# Create a server pool
pool = self.create_pool(fixture=0)
# Update the pool
# Update and save the pool
pool.description = 'New Comment'
attribute_values = self.get_pool_attribute_fixture(fixture=1)
pool_attributes = pool.attributes = objects.PoolAttributeList(
objects=[objects.PoolAttribute(key=r, value=attribute_values[r])
for r in attribute_values])
self.central_service.update_pool(self.admin_context, pool)
nameserver_values = self.get_nameserver_fixture(fixture=1)
pool_nameservers = pool.nameservers = objects.NameServerList(
objects=[objects.NameServer(key='name_server', value=r)
for r in nameserver_values])
# Update pool
pool = self.central_service.update_pool(self.admin_context, pool)
# GET the pool
# Fetch the pool
pool = self.central_service.get_pool(self.admin_context, pool.id)
# Verify that the pool was updated correctly
self.assertEqual("New Comment", pool.description)
# Compare the actual values of attributes and nameservers
for k in range(0, len(pool_attributes)):
actual_attributes = \
pool['attributes'][0].to_primitive()['designate_object.data']
expected_attributes = \
pool_attributes[0].to_primitive()['designate_object.data']
self.assertDictContainsSubset(
expected_attributes, actual_attributes)
def test_update_pool_add_ns_record(self):
# Create a server pool and domain
pool = self.create_pool(fixture=0)
domain = self.create_domain(pool_id=pool.id)
for k in range(0, len(pool_nameservers)):
actual_nameservers = \
pool['nameservers'][k].to_primitive()['designate_object.data']
expected_nameservers = \
pool_nameservers[k].to_primitive()['designate_object.data']
self.assertDictContainsSubset(
expected_nameservers, actual_nameservers)
ns_record_count = len(pool.ns_records)
new_ns_record = objects.PoolNsRecord(
priority=10,
hostname='ns-new.example.org.')
# Update and save the pool
pool.ns_records.append(new_ns_record)
self.central_service.update_pool(self.admin_context, pool)
# Fetch the pool
pool = self.central_service.get_pool(self.admin_context, pool.id)
# Verify that the pool was updated correctly
self.assertEqual(ns_record_count + 1, len(pool.ns_records))
self.assertIn(new_ns_record.hostname,
[n.hostname for n in pool.ns_records])
# Fetch the domains NS recordset
ns_recordset = self.central_service.find_recordset(
self.admin_context,
criterion={'domain_id': domain.id, 'type': "NS"})
# Verify that the doamins NS records ware updated correctly
self.assertEqual(set([n.hostname for n in pool.ns_records]),
set([n.data for n in ns_recordset.records]))
def test_update_pool_remove_ns_record(self):
# Create a server pool and domain
pool = self.create_pool(fixture=0)
domain = self.create_domain(pool_id=pool.id)
ns_record_count = len(pool.ns_records)
# Update and save the pool
removed_ns_record = pool.ns_records.pop(-1)
self.central_service.update_pool(self.admin_context, pool)
# Fetch the pool
pool = self.central_service.get_pool(self.admin_context, pool.id)
# Verify that the pool was updated correctly
self.assertEqual(ns_record_count - 1, len(pool.ns_records))
self.assertNotIn(removed_ns_record.hostname,
[n.hostname for n in pool.ns_records])
# Fetch the domains NS recordset
ns_recordset = self.central_service.find_recordset(
self.admin_context,
criterion={'domain_id': domain.id, 'type': "NS"})
# Verify that the doamins NS records ware updated correctly
self.assertEqual(set([n.hostname for n in pool.ns_records]),
set([n.data for n in ns_recordset.records]))
def test_delete_pool(self):
# Create a server pool

View File

@ -2179,8 +2179,8 @@ class StorageTestCase(object):
def test_create_pool_attribute(self):
values = {
'pool_id': "d5d10661-0312-4ae1-8664-31188a4310b7",
'key': "name_server",
'value': 'ns1.example.org.'
'key': "test-attribute",
'value': 'test-value'
}
result = self.storage.create_pool_attribute(