use dns api def from neutron-lib

neutron-lib contains the dns API definition and associated exceptions,
constants, etc. This patch moves all references over to use the API
def from neutron-lib.

NeutronLibImpact

Change-Id: If180cf92d8ae31a0857080239e8233095cd6c768
This commit is contained in:
Boden R 2017-07-26 15:47:18 -06:00
parent c0fe9e34a1
commit 72b6db9379
15 changed files with 156 additions and 393 deletions

View File

@ -75,9 +75,6 @@ LOG = logging.getLogger(__name__)
# IP allocations being cleaned up by cascade.
AUTO_DELETE_PORT_OWNERS = [constants.DEVICE_OWNER_DHCP]
DNS_DOMAIN_DEFAULT = 'openstacklocal.'
FQDN_MAX_LEN = 255
def _check_subnet_not_used(context, subnet_id):
try:

View File

@ -13,15 +13,16 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import dns as dns_apidef
from neutron_lib.api import validators
from neutron_lib import exceptions as n_exc
from neutron_lib.exceptions import dns as dns_exc
from oslo_config import cfg
from oslo_log import log as logging
from neutron._i18n import _
from neutron.common import utils
from neutron.db import _resource_extend as resource_extend
from neutron.extensions import dns
from neutron.extensions import l3
from neutron.objects import floatingip as fip_obj
from neutron.objects import network
@ -61,7 +62,7 @@ class DNSDbMixin(object):
except ImportError:
LOG.exception("ImportError exception occurred while loading "
"the external DNS service driver")
raise dns.ExternalDNSDriverNotFound(
raise dns_exc.ExternalDNSDriverNotFound(
driver=cfg.CONF.external_dns_driver)
@staticmethod
@ -77,13 +78,13 @@ class DNSDbMixin(object):
def _process_dns_floatingip_create_precommit(self, context,
floatingip_data, req_data):
# expects to be called within a plugin's session
dns_domain = req_data.get(dns.DNSDOMAIN)
dns_domain = req_data.get(dns_apidef.DNSDOMAIN)
if not validators.is_attr_set(dns_domain):
return
if not self.dns_driver:
return
dns_name = req_data[dns.DNSNAME]
dns_name = req_data[dns_apidef.DNSNAME]
self._validate_floatingip_dns(dns_name, dns_domain)
current_dns_name, current_dns_domain = (
@ -93,8 +94,8 @@ class DNSDbMixin(object):
if current_dns_name and current_dns_domain:
fip_obj.FloatingIPDNS(context,
floatingip_id=floatingip_data['id'],
dns_name=req_data[dns.DNSNAME],
dns_domain=req_data[dns.DNSDOMAIN],
dns_name=req_data[dns_apidef.DNSNAME],
dns_domain=req_data[dns_apidef.DNSDOMAIN],
published_dns_name=current_dns_name,
published_dns_domain=current_dns_domain).create()
dns_actions_data = DNSActionsData(
@ -118,7 +119,7 @@ class DNSDbMixin(object):
floatingip_data):
# expects to be called within a plugin's session
if not utils.is_extension_supported(self._core_plugin,
dns.Dns.get_alias()):
dns_apidef.ALIAS):
return
if not self.dns_driver:
return
@ -175,7 +176,7 @@ class DNSDbMixin(object):
def _process_dns_floatingip_delete(self, context, floatingip_data):
if not utils.is_extension_supported(self._core_plugin,
dns.Dns.get_alias()):
dns_apidef.ALIAS):
return
dns_data_db = fip_obj.FloatingIPDNS.get_object(context,
floatingip_id=floatingip_data['id'])
@ -209,7 +210,7 @@ class DNSDbMixin(object):
try:
self.dns_driver.delete_record_set(context, dns_domain, dns_name,
records)
except (dns.DNSDomainNotFound, dns.DuplicateRecordSet) as e:
except (dns_exc.DNSDomainNotFound, dns_exc.DuplicateRecordSet) as e:
LOG.exception("Error deleting Floating IP data from external "
"DNS service. Name: '%(name)s'. Domain: "
"'%(domain)s'. IP addresses '%(ips)s'. DNS "
@ -222,9 +223,9 @@ class DNSDbMixin(object):
def _get_requested_state_for_external_dns_service_create(self, context,
floatingip_data,
req_data):
fip_dns_name = req_data[dns.DNSNAME]
fip_dns_name = req_data[dns_apidef.DNSNAME]
if fip_dns_name:
return fip_dns_name, req_data[dns.DNSDOMAIN]
return fip_dns_name, req_data[dns_apidef.DNSDOMAIN]
if floatingip_data['port_id']:
return self._get_internal_port_dns_data(context, floatingip_data)
return None, None
@ -240,7 +241,7 @@ class DNSDbMixin(object):
try:
self.dns_driver.create_record_set(context, dns_domain, dns_name,
records)
except (dns.DNSDomainNotFound, dns.DuplicateRecordSet) as e:
except (dns_exc.DNSDomainNotFound, dns_exc.DuplicateRecordSet) as e:
LOG.exception("Error publishing floating IP data in external "
"DNS service. Name: '%(name)s'. Domain: "
"'%(domain)s'. DNS service driver message "

View File

@ -22,11 +22,10 @@ Create Date: 2015-08-23 00:22:47.618593
"""
from alembic import op
from neutron_lib.db import constants
import sqlalchemy as sa
from neutron.db import migration
from neutron.extensions import dns
# revision identifiers, used by Alembic.
revision = '34af2b5c5a59'
@ -39,5 +38,5 @@ neutron_milestone = [migration.LIBERTY]
def upgrade():
op.add_column('ports',
sa.Column('dns_name',
sa.String(length=dns.FQDN_MAX_LEN),
sa.String(length=constants.FQDN_FIELD_SIZE),
nullable=True))

View File

@ -26,10 +26,9 @@ revision = '659bf3d90664'
down_revision = 'c3a73f615e4'
from alembic import op
from neutron_lib.db import constants
import sqlalchemy as sa
from neutron.extensions import dns
def upgrade():
op.create_table('networkdnsdomains',
@ -37,7 +36,8 @@ def upgrade():
sa.String(length=36),
nullable=False,
index=True),
sa.Column('dns_domain', sa.String(length=dns.FQDN_MAX_LEN),
sa.Column('dns_domain', sa.String(
length=constants.FQDN_FIELD_SIZE),
nullable=False),
sa.ForeignKeyConstraint(['network_id'],
['networks.id'],
@ -49,15 +49,17 @@ def upgrade():
sa.String(length=36),
nullable=False,
index=True),
sa.Column('dns_name', sa.String(length=dns.FQDN_MAX_LEN),
sa.Column('dns_name', sa.String(
length=constants.FQDN_FIELD_SIZE),
nullable=False),
sa.Column('dns_domain', sa.String(length=dns.FQDN_MAX_LEN),
sa.Column('dns_domain', sa.String(
length=constants.FQDN_FIELD_SIZE),
nullable=False),
sa.Column('published_dns_name',
sa.String(length=dns.FQDN_MAX_LEN),
sa.String(length=constants.FQDN_FIELD_SIZE),
nullable=False),
sa.Column('published_dns_domain',
sa.String(length=dns.FQDN_MAX_LEN),
sa.String(length=constants.FQDN_FIELD_SIZE),
nullable=False),
sa.ForeignKeyConstraint(['floatingip_id'],
['floatingips.id'],
@ -70,16 +72,16 @@ def upgrade():
nullable=False,
index=True),
sa.Column('current_dns_name',
sa.String(length=dns.FQDN_MAX_LEN),
sa.String(length=constants.FQDN_FIELD_SIZE),
nullable=False),
sa.Column('current_dns_domain',
sa.String(length=dns.FQDN_MAX_LEN),
sa.String(length=constants.FQDN_FIELD_SIZE),
nullable=False),
sa.Column('previous_dns_name',
sa.String(length=dns.FQDN_MAX_LEN),
sa.String(length=constants.FQDN_FIELD_SIZE),
nullable=False),
sa.Column('previous_dns_domain',
sa.String(length=dns.FQDN_MAX_LEN),
sa.String(length=constants.FQDN_FIELD_SIZE),
nullable=False),
sa.ForeignKeyConstraint(['port_id'],
['ports.id'],

View File

@ -21,14 +21,15 @@ down_revision = 'b67e765a3524'
depends_on = ('a963b38d82f4',)
from alembic import op
from neutron.extensions import dns
from neutron_lib.db import constants
import sqlalchemy as sa
ports = sa.Table(
'ports', sa.MetaData(),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('dns_name', sa.String(length=dns.FQDN_MAX_LEN), nullable=True))
sa.Column('dns_name', sa.String(length=constants.FQDN_FIELD_SIZE),
nullable=True))
portdnses = sa.Table('portdnses', sa.MetaData(),

View File

@ -26,14 +26,13 @@ revision = '349b6fd605a6'
down_revision = 'c8c222d42aa9'
from alembic import op
from neutron_lib.db import constants
import sqlalchemy as sa
from neutron.extensions import dns
def upgrade():
op.add_column('portdnses',
sa.Column('dns_domain',
sa.String(length=dns.FQDN_MAX_LEN),
sa.String(length=constants.FQDN_FIELD_SIZE),
nullable=False,
server_default=''))

View File

@ -10,13 +10,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.db import constants
from neutron_lib.db import model_base
import sqlalchemy as sa
from sqlalchemy import orm
from neutron.db.models import l3 as l3_models
from neutron.db import models_v2
from neutron.extensions import dns
class NetworkDNSDomain(model_base.BASEV2):
@ -85,7 +85,7 @@ class PortDNS(model_base.BASEV2):
previous_dns_domain = sa.Column(sa.String(255),
nullable=False)
dns_name = sa.Column(sa.String(255), nullable=False)
dns_domain = sa.Column(sa.String(dns.FQDN_MAX_LEN),
dns_domain = sa.Column(sa.String(constants.FQDN_FIELD_SIZE),
nullable=False,
server_default='')
# Add a relationship to the Port model in order to instruct

View File

@ -13,253 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import re
from neutron_lib.api.definitions import network as net_def
from neutron_lib.api.definitions import dns as dns_apidef
from neutron_lib.api import extensions
from neutron_lib.api import validators
from neutron_lib import exceptions as n_exc
from oslo_config import cfg
import six
from neutron._i18n import _
from neutron.extensions import l3
DNS_LABEL_MAX_LEN = 63
DNS_LABEL_REGEX = "[a-z0-9-]{1,%d}$" % DNS_LABEL_MAX_LEN
FQDN_MAX_LEN = 255
DNS_DOMAIN_DEFAULT = 'openstacklocal.'
class DNSDomainNotFound(n_exc.NotFound):
message = _("Domain %(dns_domain)s not found in the external DNS service")
class DuplicateRecordSet(n_exc.Conflict):
message = _("Name %(dns_name)s is duplicated in the external DNS service")
class ExternalDNSDriverNotFound(n_exc.NotFound):
message = _("External DNS driver %(driver)s could not be found.")
class InvalidPTRZoneConfiguration(n_exc.Conflict):
message = _("Value of %(parameter)s has to be multiple of %(number)s, "
"with maximum value of %(maximum)s and minimum value of "
"%(minimum)s")
def _validate_dns_name(data, max_len=FQDN_MAX_LEN):
msg = _validate_dns_format(data, max_len)
if msg:
return msg
request_dns_name = _get_request_dns_name(data)
if request_dns_name:
msg = _validate_dns_name_with_dns_domain(request_dns_name)
if msg:
return msg
def _validate_fip_dns_name(data, max_len=FQDN_MAX_LEN):
msg = validators.validate_string(data)
if msg:
return msg
if not data:
return
if data.endswith('.'):
msg = _("'%s' is a FQDN. It should be a relative domain name") % data
return msg
msg = _validate_dns_format(data, max_len)
if msg:
return msg
length = len(data)
if length > max_len - 3:
msg = _("'%(data)s' contains '%(length)s' characters. Adding a "
"domain name will cause it to exceed the maximum length "
"of a FQDN of '%(max_len)s'") % {"data": data,
"length": length,
"max_len": max_len}
return msg
def _validate_dns_domain(data, max_len=FQDN_MAX_LEN):
msg = validators.validate_string(data)
if msg:
return msg
if not data:
return
if not data.endswith('.'):
msg = _("'%s' is not a FQDN") % data
return msg
msg = _validate_dns_format(data, max_len)
if msg:
return msg
length = len(data)
if length > max_len - 2:
msg = _("'%(data)s' contains '%(length)s' characters. Adding a "
"sub-domain will cause it to exceed the maximum length of a "
"FQDN of '%(max_len)s'") % {"data": data,
"length": length,
"max_len": max_len}
return msg
def _validate_dns_format(data, max_len=FQDN_MAX_LEN):
# NOTE: An individual name regex instead of an entire FQDN was used
# because its easier to make correct. The logic should validate that the
# dns_name matches RFC 1123 (section 2.1) and RFC 952.
if not data:
return
try:
# Trailing periods are allowed to indicate that a name is fully
# qualified per RFC 1034 (page 7).
trimmed = data if not data.endswith('.') else data[:-1]
if len(trimmed) > max_len:
raise TypeError(
_("'%(trimmed)s' exceeds the %(maxlen)s character FQDN "
"limit") % {'trimmed': trimmed, 'maxlen': max_len})
names = trimmed.split('.')
for name in names:
if not name:
raise TypeError(_("Encountered an empty component."))
if name.endswith('-') or name[0] == '-':
raise TypeError(
_("Name '%s' must not start or end with a hyphen.") % name)
if not re.match(DNS_LABEL_REGEX, name):
raise TypeError(
_("Name '%s' must be 1-63 characters long, each of "
"which can only be alphanumeric or a hyphen.") % name)
# RFC 1123 hints that a TLD can't be all numeric. last is a TLD if
# it's an FQDN.
if len(names) > 1 and re.match("^[0-9]+$", names[-1]):
raise TypeError(_("TLD '%s' must not be all numeric") % names[-1])
except TypeError as e:
msg = _("'%(data)s' not a valid PQDN or FQDN. Reason: %(reason)s") % {
'data': data, 'reason': str(e)}
return msg
def _validate_dns_name_with_dns_domain(request_dns_name):
# If a PQDN was passed, make sure the FQDN that will be generated is of
# legal size
dns_domain = _get_dns_domain()
higher_labels = dns_domain
if dns_domain:
higher_labels = '.%s' % dns_domain
higher_labels_len = len(higher_labels)
dns_name_len = len(request_dns_name)
if not request_dns_name.endswith('.'):
if dns_name_len + higher_labels_len > FQDN_MAX_LEN:
msg = _("The dns_name passed is a PQDN and its size is "
"'%(dns_name_len)s'. The dns_domain option in "
"neutron.conf is set to %(dns_domain)s, with a "
"length of '%(higher_labels_len)s'. When the two are "
"concatenated to form a FQDN (with a '.' at the end), "
"the resulting length exceeds the maximum size "
"of '%(fqdn_max_len)s'"
) % {'dns_name_len': dns_name_len,
'dns_domain': cfg.CONF.dns_domain,
'higher_labels_len': higher_labels_len,
'fqdn_max_len': FQDN_MAX_LEN}
return msg
return
# A FQDN was passed
if (dns_name_len <= higher_labels_len or not
request_dns_name.endswith(higher_labels)):
msg = _("The dns_name passed is a FQDN. Its higher level labels "
"must be equal to the dns_domain option in neutron.conf, "
"that has been set to '%(dns_domain)s'. It must also "
"include one or more valid DNS labels to the left "
"of '%(dns_domain)s'") % {'dns_domain':
cfg.CONF.dns_domain}
return msg
def _get_dns_domain():
if not cfg.CONF.dns_domain:
return ''
if cfg.CONF.dns_domain.endswith('.'):
return cfg.CONF.dns_domain
return '%s.' % cfg.CONF.dns_domain
def _get_request_dns_name(data):
dns_domain = _get_dns_domain()
if ((dns_domain and dns_domain != DNS_DOMAIN_DEFAULT)):
return data
return ''
def convert_to_lowercase(data):
if isinstance(data, six.string_types):
return data.lower()
msg = _("'%s' cannot be converted to lowercase string") % data
raise n_exc.InvalidInput(error_message=msg)
validators.add_validator('dns_name', _validate_dns_name)
validators.add_validator('fip_dns_name', _validate_fip_dns_name)
validators.add_validator('dns_domain', _validate_dns_domain)
DNSNAME = 'dns_name'
DNSDOMAIN = 'dns_domain'
DNSASSIGNMENT = 'dns_assignment'
EXTENDED_ATTRIBUTES_2_0 = {
'ports': {
DNSNAME: {'allow_post': True, 'allow_put': True,
'default': '',
'convert_to': convert_to_lowercase,
'validate': {'type:dns_name': FQDN_MAX_LEN},
'is_visible': True},
DNSASSIGNMENT: {'allow_post': False, 'allow_put': False,
'is_visible': True},
},
l3.FLOATINGIPS: {
DNSNAME: {'allow_post': True, 'allow_put': False,
'default': '',
'convert_to': convert_to_lowercase,
'validate': {'type:fip_dns_name': FQDN_MAX_LEN},
'is_visible': True},
DNSDOMAIN: {'allow_post': True, 'allow_put': False,
'default': '',
'convert_to': convert_to_lowercase,
'validate': {'type:dns_domain': FQDN_MAX_LEN},
'is_visible': True},
},
net_def.COLLECTION_NAME: {
DNSDOMAIN: {'allow_post': True, 'allow_put': True,
'default': '',
'convert_to': convert_to_lowercase,
'validate': {'type:dns_domain': FQDN_MAX_LEN},
'is_visible': True},
},
}
class Dns(extensions.ExtensionDescriptor):
class Dns(extensions.APIExtensionDescriptor):
"""Extension class supporting DNS Integration."""
@classmethod
def get_name(cls):
return "DNS Integration"
@classmethod
def get_alias(cls):
return "dns-integration"
@classmethod
def get_description(cls):
return "Provides integration with DNS."
@classmethod
def get_updated(cls):
return "2015-08-15T18:00:00-00:00"
def get_required_extensions(self):
return ["router"]
def get_extended_resources(self, version):
if version == "2.0":
return EXTENDED_ATTRIBUTES_2_0
else:
return {}
api_definition = dns_apidef

View File

@ -14,19 +14,20 @@
# under the License.
from neutron_lib.api import converters
from neutron_lib.api.definitions import dns as dns_apidef
from neutron_lib.api import extensions
from neutron.extensions import dns
from neutron_lib.db import constants
EXTENDED_ATTRIBUTES_2_0 = {
'ports': {
dns.DNSDOMAIN: {'allow_post': True, 'allow_put': True,
'default': '',
'convert_to':
converters.convert_string_to_case_insensitive,
'validate': {'type:dns_domain': dns.FQDN_MAX_LEN},
'is_visible': True},
dns_apidef.DNSDOMAIN: {
'allow_post': True, 'allow_put': True,
'default': '',
'convert_to': converters.convert_string_to_case_insensitive,
'validate': {
'type:dns_domain_name': constants.FQDN_FIELD_SIZE},
'is_visible': True},
},
}
@ -51,7 +52,7 @@ class Dns_domain_ports(extensions.ExtensionDescriptor):
return "2017-06-25T18:00:00-00:00"
def get_required_extensions(self):
return ["dns-integration"]
return [dns_apidef.ALIAS]
def get_extended_resources(self, version):
if version == "2.0":

View File

@ -13,16 +13,18 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import dns as dns_apidef
from neutron_lib.api import validators
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as lib_const
from neutron_lib.exceptions import dns as dns_exc
from neutron_lib.plugins import directory
from oslo_config import cfg
from oslo_log import log as logging
from neutron.db import segments_db
from neutron.extensions import dns
from neutron.objects import network as net_obj
from neutron.objects import ports as port_obj
from neutron.plugins.common import utils as plugin_utils
@ -30,18 +32,17 @@ from neutron.plugins.ml2 import driver_api as api
from neutron.services.externaldns import driver
LOG = logging.getLogger(__name__)
DNS_DOMAIN_DEFAULT = 'openstacklocal.'
class DNSExtensionDriver(api.ExtensionDriver):
_supported_extension_alias = 'dns-integration'
_supported_extension_alias = dns_apidef.ALIAS
@property
def extension_alias(self):
return self._supported_extension_alias
def process_create_network(self, plugin_context, request_data, db_data):
dns_domain = request_data.get(dns.DNSDOMAIN)
dns_domain = request_data.get(dns_apidef.DNSDOMAIN)
if not validators.is_attr_set(dns_domain):
return
@ -49,14 +50,14 @@ class DNSExtensionDriver(api.ExtensionDriver):
net_obj.NetworkDNSDomain(plugin_context,
network_id=db_data['id'],
dns_domain=dns_domain).create()
db_data[dns.DNSDOMAIN] = dns_domain
db_data[dns_apidef.DNSDOMAIN] = dns_domain
def process_update_network(self, plugin_context, request_data, db_data):
new_value = request_data.get(dns.DNSDOMAIN)
new_value = request_data.get(dns_apidef.DNSDOMAIN)
if not validators.is_attr_set(new_value):
return
current_dns_domain = db_data.get(dns.DNSDOMAIN)
current_dns_domain = db_data.get(dns_apidef.DNSDOMAIN)
if current_dns_domain == new_value:
return
@ -67,20 +68,20 @@ class DNSExtensionDriver(api.ExtensionDriver):
network_id=net_id)
if new_value:
net_dns_domain['dns_domain'] = new_value
db_data[dns.DNSDOMAIN] = new_value
db_data[dns_apidef.DNSDOMAIN] = new_value
net_dns_domain.update()
else:
net_dns_domain.delete()
db_data[dns.DNSDOMAIN] = ''
db_data[dns_apidef.DNSDOMAIN] = ''
elif new_value:
net_obj.NetworkDNSDomain(plugin_context,
network_id=net_id,
dns_domain=new_value).create()
db_data[dns.DNSDOMAIN] = new_value
db_data[dns_apidef.DNSDOMAIN] = new_value
def process_create_port(self, plugin_context, request_data, db_data):
if not (request_data.get(dns.DNSNAME) or
request_data.get(dns.DNSDOMAIN)):
if not (request_data.get(dns_apidef.DNSNAME) or
request_data.get(dns_apidef.DNSDOMAIN)):
return
dns_name, is_dns_domain_default = self._get_request_dns_name(
request_data)
@ -92,8 +93,8 @@ class DNSExtensionDriver(api.ExtensionDriver):
def _create_port_dns_record(self, plugin_context, request_data, db_data,
network, dns_name):
external_dns_domain = (request_data.get(dns.DNSDOMAIN) or
network.get(dns.DNSDOMAIN))
external_dns_domain = (request_data.get(dns_apidef.DNSDOMAIN) or
network.get(dns_apidef.DNSDOMAIN))
current_dns_name, current_dns_domain = (
self._calculate_current_dns_name_and_domain(
dns_name, external_dns_domain,
@ -107,7 +108,7 @@ class DNSExtensionDriver(api.ExtensionDriver):
previous_dns_name='',
previous_dns_domain='',
dns_name=dns_name,
dns_domain=request_data.get(dns.DNSDOMAIN, ''))
dns_domain=request_data.get(dns_apidef.DNSDOMAIN, ''))
dns_data_obj.create()
return dns_data_obj
@ -130,17 +131,17 @@ class DNSExtensionDriver(api.ExtensionDriver):
return dns_name, external_dns_domain
def _update_dns_db(self, plugin_context, request_data, db_data, network):
dns_name = request_data.get(dns.DNSNAME)
dns_domain = request_data.get(dns.DNSDOMAIN)
dns_name = request_data.get(dns_apidef.DNSNAME)
dns_domain = request_data.get(dns_apidef.DNSDOMAIN)
has_fixed_ips = 'fixed_ips' in request_data
dns_data_db = port_obj.PortDNS.get_object(
plugin_context,
port_id=db_data['id'])
if dns_data_db:
is_dns_name_changed = (dns_name is not None and
dns_data_db[dns.DNSNAME] != dns_name)
dns_data_db[dns_apidef.DNSNAME] != dns_name)
is_dns_domain_changed = (dns_domain is not None and
dns_data_db[dns.DNSDOMAIN] != dns_domain)
dns_data_db[dns_apidef.DNSDOMAIN] != dns_domain)
if (is_dns_name_changed or is_dns_domain_changed or
(has_fixed_ips and dns_data_db['current_dns_name'])):
dns_data_db = self._populate_previous_external_dns_data(
@ -174,15 +175,15 @@ class DNSExtensionDriver(api.ExtensionDriver):
is_dns_domain_changed):
if is_dns_name_changed or is_dns_domain_changed:
if is_dns_name_changed:
dns_data_db[dns.DNSNAME] = dns_name
external_dns_domain = (dns_data_db[dns.DNSDOMAIN] or
network.get(dns.DNSDOMAIN))
dns_data_db[dns_apidef.DNSNAME] = dns_name
external_dns_domain = (dns_data_db[dns_apidef.DNSDOMAIN] or
network.get(dns_apidef.DNSDOMAIN))
if is_dns_domain_changed:
dns_data_db[dns.DNSDOMAIN] = dns_domain
external_dns_domain = request_data[dns.DNSDOMAIN]
dns_data_db[dns_apidef.DNSDOMAIN] = dns_domain
external_dns_domain = request_data[dns_apidef.DNSDOMAIN]
if not external_dns_domain:
external_dns_domain = network.get(dns.DNSDOMAIN)
dns_data_db['current_dns_name'] = dns_data_db[dns.DNSNAME]
external_dns_domain = network.get(dns_apidef.DNSDOMAIN)
dns_data_db['current_dns_name'] = dns_data_db[dns_apidef.DNSNAME]
dns_data_db['current_dns_domain'] = external_dns_domain
if not (dns_data_db['current_dns_name'] and
dns_data_db['current_dns_domain']):
@ -191,9 +192,9 @@ class DNSExtensionDriver(api.ExtensionDriver):
return dns_data_db
def process_update_port(self, plugin_context, request_data, db_data):
has_dns_name = dns.DNSNAME in request_data
has_dns_name = dns_apidef.DNSNAME in request_data
has_fixed_ips = 'fixed_ips' in request_data
has_dns_domain = dns.DNSDOMAIN in request_data
has_dns_domain = dns_apidef.DNSDOMAIN in request_data
if not any((has_dns_name, has_fixed_ips, has_dns_domain)):
return
is_dns_domain_default = self._get_request_dns_name(
@ -218,17 +219,18 @@ class DNSExtensionDriver(api.ExtensionDriver):
def _process_only_port_update(self, plugin_context, request_data,
db_data):
dns_name = request_data.get(dns.DNSNAME)
dns_domain = request_data.get(dns.DNSDOMAIN)
dns_name = request_data.get(dns_apidef.DNSNAME)
dns_domain = request_data.get(dns_apidef.DNSDOMAIN)
dns_data_db = port_obj.PortDNS.get_object(
plugin_context,
port_id=db_data['id'])
if dns_data_db:
if dns_name is not None and dns_data_db[dns.DNSNAME] != dns_name:
dns_data_db[dns.DNSNAME] = dns_name
if dns_name is not None and dns_data_db[
dns_apidef.DNSNAME] != dns_name:
dns_data_db[dns_apidef.DNSNAME] = dns_name
if (dns_domain is not None and
dns_data_db[dns.DNSDOMAIN] != dns_domain):
dns_data_db[dns.DNSDOMAIN] = dns_domain
dns_data_db[dns_apidef.DNSDOMAIN] != dns_domain):
dns_data_db[dns_apidef.DNSDOMAIN] = dns_domain
dns_data_db.update()
return dns_data_db
dns_data_db = port_obj.PortDNS(plugin_context,
@ -252,9 +254,10 @@ class DNSExtensionDriver(api.ExtensionDriver):
pass
def extend_network_dict(self, session, db_data, response_data):
response_data[dns.DNSDOMAIN] = ''
response_data[dns_apidef.DNSDOMAIN] = ''
if db_data.dns_domain:
response_data[dns.DNSDOMAIN] = db_data.dns_domain[dns.DNSDOMAIN]
response_data[dns_apidef.DNSDOMAIN] = db_data.dns_domain[
dns_apidef.DNSDOMAIN]
return response_data
def _get_dns_domain(self):
@ -266,14 +269,14 @@ class DNSExtensionDriver(api.ExtensionDriver):
def _get_request_dns_name(self, port):
dns_domain = self._get_dns_domain()
if ((dns_domain and dns_domain != DNS_DOMAIN_DEFAULT)):
return (port.get(dns.DNSNAME, ''), False)
return ('', True)
if dns_domain and dns_domain != lib_const.DNS_DOMAIN_DEFAULT:
return port.get(dns_apidef.DNSNAME, ''), False
return '', True
def _get_request_dns_name_and_domain_name(self, dns_data_db):
dns_domain = self._get_dns_domain()
dns_name = ''
if ((dns_domain and dns_domain != DNS_DOMAIN_DEFAULT)):
if dns_domain and dns_domain != lib_const.DNS_DOMAIN_DEFAULT:
if dns_data_db:
dns_name = dns_data_db.dns_name
return dns_name, dns_domain
@ -306,9 +309,9 @@ class DNSExtensionDriver(api.ExtensionDriver):
def _extend_port_dict(self, session, db_data, response_data, dns_data_db):
if not dns_data_db:
response_data[dns.DNSNAME] = ''
response_data[dns_apidef.DNSNAME] = ''
else:
response_data[dns.DNSNAME] = dns_data_db[dns.DNSNAME]
response_data[dns_apidef.DNSNAME] = dns_data_db[dns_apidef.DNSNAME]
response_data['dns_assignment'] = self._get_dns_name_for_port_get(
db_data, dns_data_db)
return response_data
@ -377,7 +380,7 @@ class DNSExtensionDriverML2(DNSExtensionDriver):
class DNSDomainPortsExtensionDriver(DNSExtensionDriverML2):
_supported_extension_aliases = ['dns-integration', 'dns-domain-ports']
_supported_extension_aliases = [dns_apidef.ALIAS, 'dns-domain-ports']
@property
def extension_aliases(self):
@ -391,9 +394,10 @@ class DNSDomainPortsExtensionDriver(DNSExtensionDriverML2):
super(DNSDomainPortsExtensionDriver, self).extend_port_dict(
session, db_data, response_data))
dns_data_db = db_data.dns
response_data[dns.DNSDOMAIN] = ''
response_data[dns_apidef.DNSDOMAIN] = ''
if dns_data_db:
response_data[dns.DNSDOMAIN] = dns_data_db[dns.DNSDOMAIN]
response_data[dns_apidef.DNSDOMAIN] = dns_data_db[
dns_apidef.DNSDOMAIN]
DNS_DRIVER = None
@ -413,7 +417,7 @@ def _get_dns_driver():
except ImportError:
LOG.exception("ImportError exception occurred while loading "
"the external DNS service driver")
raise dns.ExternalDNSDriverNotFound(
raise dns_exc.ExternalDNSDriverNotFound(
driver=cfg.CONF.external_dns_driver)
@ -438,7 +442,7 @@ def _send_data_to_external_dns_service(context, dns_driver, dns_domain,
dns_name, records):
try:
dns_driver.create_record_set(context, dns_domain, dns_name, records)
except (dns.DNSDomainNotFound, dns.DuplicateRecordSet) as e:
except (dns_exc.DNSDomainNotFound, dns_exc.DuplicateRecordSet) as e:
LOG.exception("Error publishing port data in external DNS "
"service. Name: '%(name)s'. Domain: '%(domain)s'. "
"DNS service driver message '%(message)s'",
@ -451,7 +455,7 @@ def _remove_data_from_external_dns_service(context, dns_driver, dns_domain,
dns_name, records):
try:
dns_driver.delete_record_set(context, dns_domain, dns_name, records)
except (dns.DNSDomainNotFound, dns.DuplicateRecordSet) as e:
except (dns_exc.DNSDomainNotFound, dns_exc.DuplicateRecordSet) as e:
LOG.exception("Error deleting port data from external DNS "
"service. Name: '%(name)s'. Domain: '%(domain)s'. "
"IP addresses '%(ips)s'. DNS service driver message "
@ -473,11 +477,11 @@ def _update_port_in_external_dns_service(resource, event, trigger, **kwargs):
return
original_ips = [ip['ip_address'] for ip in original_port['fixed_ips']]
updated_ips = [ip['ip_address'] for ip in updated_port['fixed_ips']]
is_dns_name_changed = (updated_port[dns.DNSNAME] !=
original_port[dns.DNSNAME])
is_dns_domain_changed = (dns.DNSDOMAIN in updated_port and
updated_port[dns.DNSDOMAIN] !=
original_port[dns.DNSDOMAIN])
is_dns_name_changed = (updated_port[dns_apidef.DNSNAME] !=
original_port[dns_apidef.DNSNAME])
is_dns_domain_changed = (dns_apidef.DNSDOMAIN in updated_port and
updated_port[dns_apidef.DNSDOMAIN] !=
original_port[dns_apidef.DNSDOMAIN])
ips_changed = set(original_ips) != set(updated_ips)
if not any((is_dns_name_changed, is_dns_domain_changed, ips_changed)):
return

View File

@ -21,10 +21,10 @@ from keystoneauth1.identity.generic import password
from keystoneauth1 import loading
from keystoneauth1 import token_endpoint
from neutron_lib import constants
from neutron_lib.exceptions import dns as dns_exc
from oslo_config import cfg
from neutron.conf.services import extdns_designate_driver
from neutron.extensions import dns
from neutron.services.externaldns import driver
IPV4_PTR_ZONE_PREFIX_MIN_SIZE = 8
@ -71,7 +71,7 @@ class Designate(driver.ExternalDNSService):
if (ipv4_ptr_zone_size < IPV4_PTR_ZONE_PREFIX_MIN_SIZE or
ipv4_ptr_zone_size > IPV4_PTR_ZONE_PREFIX_MAX_SIZE or
(ipv4_ptr_zone_size % 8) != 0):
raise dns.InvalidPTRZoneConfiguration(
raise dns_exc.InvalidPTRZoneConfiguration(
parameter='ipv4_ptr_zone_size', number='8',
maximum=str(IPV4_PTR_ZONE_PREFIX_MAX_SIZE),
minimum=str(IPV4_PTR_ZONE_PREFIX_MIN_SIZE))
@ -79,7 +79,7 @@ class Designate(driver.ExternalDNSService):
if (ipv6_ptr_zone_size < IPV6_PTR_ZONE_PREFIX_MIN_SIZE or
ipv6_ptr_zone_size > IPV6_PTR_ZONE_PREFIX_MAX_SIZE or
(ipv6_ptr_zone_size % 4) != 0):
raise dns.InvalidPTRZoneConfiguration(
raise dns_exc.InvalidPTRZoneConfiguration(
parameter='ipv6_ptr_zone_size', number='4',
maximum=str(IPV6_PTR_ZONE_PREFIX_MAX_SIZE),
minimum=str(IPV6_PTR_ZONE_PREFIX_MIN_SIZE))
@ -93,9 +93,9 @@ class Designate(driver.ExternalDNSService):
if v6:
designate.recordsets.create(dns_domain, dns_name, 'AAAA', v6)
except d_exc.NotFound:
raise dns.DNSDomainNotFound(dns_domain=dns_domain)
raise dns_exc.DNSDomainNotFound(dns_domain=dns_domain)
except d_exc.Conflict:
raise dns.DuplicateRecordSet(dns_name=dns_name)
raise dns_exc.DuplicateRecordSet(dns_name=dns_name)
if not CONF.designate.allow_reverse_dns_lookup:
return
@ -165,9 +165,9 @@ class Designate(driver.ExternalDNSService):
recordsets = designate_client.recordsets.list(
dns_domain, criterion={"name": "%s" % name})
except d_exc.NotFound:
raise dns.DNSDomainNotFound(dns_domain=dns_domain)
raise dns_exc.DNSDomainNotFound(dns_domain=dns_domain)
ids = [rec['id'] for rec in recordsets]
ips = [str(ip) for rec in recordsets for ip in rec['records']]
if set(ips) != set(records):
raise dns.DuplicateRecordSet(dns_name=name)
raise dns_exc.DuplicateRecordSet(dns_name=name)
return ids

View File

@ -17,6 +17,7 @@ import math
import netaddr
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.db import constants as db_const
from neutron_lib.plugins import directory
from oslo_config import cfg
@ -362,10 +363,10 @@ class DnsExtensionTestCase(test_plugin.Ml2PluginV2TestCase):
self):
cfg.CONF.set_override('dns_domain', 'example.com')
num_labels = int(
math.floor(dns.FQDN_MAX_LEN / dns.DNS_LABEL_MAX_LEN))
math.floor(db_const.FQDN_FIELD_SIZE / constants.DNS_LABEL_MAX_LEN))
filler_len = int(
math.floor(dns.FQDN_MAX_LEN % dns.DNS_LABEL_MAX_LEN))
dns_name = (('a' * (dns.DNS_LABEL_MAX_LEN - 1) + '.') *
math.floor(db_const.FQDN_FIELD_SIZE % constants.DNS_LABEL_MAX_LEN))
dns_name = (('a' * (constants.DNS_LABEL_MAX_LEN - 1) + '.') *
num_labels + 'a' * filler_len)
res = self._test_create_port_with_multiple_ipv4_and_ipv6_subnets(
dns_name=dns_name)
@ -436,15 +437,15 @@ class DnsExtensionTestCase(test_plugin.Ml2PluginV2TestCase):
def test_api_extension_validation_with_bad_dns_names(self):
num_labels = int(
math.floor(dns.FQDN_MAX_LEN / dns.DNS_LABEL_MAX_LEN))
math.floor(db_const.FQDN_FIELD_SIZE / constants.DNS_LABEL_MAX_LEN))
filler_len = int(
math.floor(dns.FQDN_MAX_LEN % dns.DNS_LABEL_MAX_LEN))
math.floor(db_const.FQDN_FIELD_SIZE % constants.DNS_LABEL_MAX_LEN))
dns_names = [555, '\f\n\r', '.', '-vm01', '_vm01', 'vm01-',
'-vm01.test1', 'vm01.-test1', 'vm01._test1',
'vm01.test1-', 'vm01.te$t1', 'vm0#1.test1.',
'vm01.123.', '-' + 'a' * dns.DNS_LABEL_MAX_LEN,
'a' * (dns.DNS_LABEL_MAX_LEN + 1),
('a' * (dns.DNS_LABEL_MAX_LEN - 1) + '.') *
'vm01.123.', '-' + 'a' * constants.DNS_LABEL_MAX_LEN,
'a' * (constants.DNS_LABEL_MAX_LEN + 1),
('a' * (constants.DNS_LABEL_MAX_LEN - 1) + '.') *
num_labels + 'a' * (filler_len + 1)]
res = self._create_network(fmt=self.fmt, name='net',
admin_state_up=True)
@ -463,25 +464,26 @@ class DnsExtensionTestCase(test_plugin.Ml2PluginV2TestCase):
error_message = res.json['NeutronError']['message']
is_expected_message = (
'cannot be converted to lowercase string' in error_message or
'not a valid PQDN or FQDN. Reason:' in error_message)
'not a valid PQDN or FQDN. Reason:' in error_message or
'must be string type' in error_message)
self.assertTrue(is_expected_message)
def test_api_extension_validation_with_good_dns_names(self):
cfg.CONF.set_override('dns_domain', 'example.com')
higher_labels_len = len('example.com.')
num_labels = int(
math.floor((dns.FQDN_MAX_LEN - higher_labels_len) /
dns.DNS_LABEL_MAX_LEN))
math.floor((db_const.FQDN_FIELD_SIZE - higher_labels_len) /
constants.DNS_LABEL_MAX_LEN))
filler_len = int(
math.floor((dns.FQDN_MAX_LEN - higher_labels_len) %
dns.DNS_LABEL_MAX_LEN))
math.floor((db_const.FQDN_FIELD_SIZE - higher_labels_len) %
constants.DNS_LABEL_MAX_LEN))
dns_names = ['', 'www.1000.com', 'vM01', 'vm01.example.com.',
'8vm01', 'vm-01.example.com.', 'vm01.test',
'vm01.test.example.com.', 'vm01.test-100',
'vm01.test-100.example.com.',
'a' * dns.DNS_LABEL_MAX_LEN,
('a' * dns.DNS_LABEL_MAX_LEN) + '.example.com.',
('a' * (dns.DNS_LABEL_MAX_LEN - 1) + '.') *
'a' * constants.DNS_LABEL_MAX_LEN,
('a' * constants.DNS_LABEL_MAX_LEN) + '.example.com.',
('a' * (constants.DNS_LABEL_MAX_LEN - 1) + '.') *
num_labels + 'a' * (filler_len - 1)]
res = self._create_network(fmt=self.fmt, name='net',
admin_state_up=True)

View File

@ -19,6 +19,7 @@ import copy
import mock
import netaddr
from neutron_lib.api.definitions import dns as dns_apidef
from neutron_lib.api.definitions import portbindings
from neutron_lib.callbacks import events
from neutron_lib.callbacks import exceptions
@ -52,7 +53,6 @@ from neutron.db import l3_dvrscheduler_db
from neutron.db import l3_hamode_db
from neutron.db.models import l3 as l3_models
from neutron.db import models_v2
from neutron.extensions import dns
from neutron.extensions import external_net
from neutron.extensions import l3
from neutron.services.revisions import revision_plugin
@ -3981,7 +3981,7 @@ class L3TestExtensionManagerWithDNS(L3TestExtensionManager):
attributes.RESOURCE_ATTRIBUTE_MAP.update(
l3.RESOURCE_ATTRIBUTE_MAP)
attributes.RESOURCE_ATTRIBUTE_MAP[l3.FLOATINGIPS].update(
dns.EXTENDED_ATTRIBUTES_2_0[l3.FLOATINGIPS])
dns_apidef.RESOURCE_ATTRIBUTE_MAP[l3.FLOATINGIPS])
return l3.L3.get_resources()

View File

@ -16,11 +16,11 @@ import itertools
import random
from neutron_lib import constants as const
from neutron_lib.db import constants as db_const
from neutron_lib.utils import net
from oslo_serialization import jsonutils
from neutron.common import constants
from neutron.extensions import dns as dns_ext
from neutron.objects import common_types
from neutron.tests import base as test_base
from neutron.tests import tools
@ -194,7 +194,7 @@ class DomainNameFieldTest(test_base.BaseTestCase, TestField):
(val, val)
for val in ('www.google.com', 'hostname', '1abc.com')
]
self.coerce_bad_values = ['x' * (dns_ext.FQDN_MAX_LEN + 1), 10, []]
self.coerce_bad_values = ['x' * (db_const.FQDN_FIELD_SIZE + 1), 10, []]
self.to_primitive_values = self.coerce_good_values
self.from_primitive_values = self.coerce_good_values

View File

@ -17,6 +17,7 @@ from keystoneauth1 import loading
from keystoneauth1 import session
import mock
import netaddr
from neutron_lib.api.definitions import dns as dns_apidef
from neutron_lib.api.definitions import provider_net as pnet
from neutron_lib import constants
from neutron_lib import context
@ -25,7 +26,6 @@ from oslo_config import cfg
from oslo_utils import uuidutils
import testtools
from neutron.extensions import dns
from neutron.objects import ports as port_obj
from neutron.plugins.ml2.extensions import dns_integration
from neutron.services.externaldns.drivers.designate import driver
@ -75,9 +75,9 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
pnet.SEGMENTATION_ID: '2016',
}
if dns_domain:
net_kwargs[dns.DNSDOMAIN] = DNSDOMAIN
net_kwargs[dns_apidef.DNSDOMAIN] = DNSDOMAIN
net_kwargs['arg_list'] = \
net_kwargs.get('arg_list', ()) + (dns.DNSDOMAIN,)
net_kwargs.get('arg_list', ()) + (dns_apidef.DNSDOMAIN,)
res = self._create_network(self.fmt, 'test_network', True,
**net_kwargs)
network = self.deserialize(self.fmt, res)
@ -92,13 +92,13 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
port_kwargs = {}
if dns_name:
port_kwargs = {
'arg_list': (dns.DNSNAME,),
dns.DNSNAME: DNSNAME
'arg_list': (dns_apidef.DNSNAME,),
dns_apidef.DNSNAME: DNSNAME
}
if dns_domain_port:
port_kwargs[dns.DNSDOMAIN] = PORTDNSDOMAIN
port_kwargs[dns_apidef.DNSDOMAIN] = PORTDNSDOMAIN
port_kwargs['arg_list'] = (port_kwargs.get('arg_list', ()) +
(dns.DNSDOMAIN,))
(dns_apidef.DNSDOMAIN,))
res = self._create_port('json', network['network']['id'],
**port_kwargs)
self.assertEqual(201, res.status_int)
@ -138,7 +138,7 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
if new_dns_name is not None:
body['dns_name'] = new_dns_name
if new_dns_domain is not None:
body[dns.DNSDOMAIN] = new_dns_domain
body[dns_apidef.DNSDOMAIN] = new_dns_domain
body.update(kwargs)
data = {'port': body}
req = self.new_update_request('ports', data, port['id'])
@ -156,9 +156,9 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
dns_domain_port=False, current_dns_domain=DNSDOMAIN,
previous_dns_domain=DNSDOMAIN):
if dns_name:
self.assertEqual(current_dns_name, port[dns.DNSNAME])
self.assertEqual(current_dns_name, port[dns_apidef.DNSNAME])
if dns_domain_port:
self.assertTrue(port[dns.DNSDOMAIN])
self.assertTrue(port[dns_apidef.DNSDOMAIN])
is_there_dns_domain = dns_domain or dns_domain_port
if dns_name and is_there_dns_domain and provider_net and dns_driver:
self.assertEqual(current_dns_name, dns_data_db['current_dns_name'])
@ -239,7 +239,7 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
len(expected_delete))
else:
if not dns_name:
self.assertEqual('', port[dns.DNSNAME])
self.assertEqual('', port[dns_apidef.DNSNAME])
if not (dns_name or dns_domain_port):
self.assertIsNone(dns_data_db)
self.assertFalse(mock_client.recordsets.create.call_args_list)
@ -494,7 +494,7 @@ class DNSIntegrationTestCaseDefaultDomain(DNSIntegrationTestCase):
dns_domain=True, ptr_zones=True, delete_records=False,
provider_net=True, dns_driver=True, original_ips=None,
current_dns_name=DNSNAME, previous_dns_name=''):
self.assertEqual('', port[dns.DNSNAME])
self.assertEqual('', port[dns_apidef.DNSNAME])
fqdn_set = self._generate_dns_assignment(port)
port_fqdn_set = set([each['fqdn'] for each in port['dns_assignment']])
self.assertEqual(fqdn_set, port_fqdn_set)
@ -568,8 +568,8 @@ class DNSDomainPortsTestCase(DNSIntegrationTestCase):
dns_name=False)
self._verify_port_dns(port, dns_data_db, dns_name=False,
dns_domain=False, dns_domain_port=True)
self.assertEqual(PORTDNSDOMAIN, dns_data_db[dns.DNSDOMAIN])
self.assertEqual(PORTDNSDOMAIN, port[dns.DNSDOMAIN])
self.assertEqual(PORTDNSDOMAIN, dns_data_db[dns_apidef.DNSDOMAIN])
self.assertEqual(PORTDNSDOMAIN, port[dns_apidef.DNSDOMAIN])
def test_update_port_replace_port_dns_domain(self, *mocks):
port, dns_data_db = self._create_port_for_test(
@ -639,10 +639,10 @@ class DNSDomainPortsTestCase(DNSIntegrationTestCase):
self.assertFalse(dns_data_db['current_dns_domain'])
self.assertEqual(DNSNAME, dns_data_db['previous_dns_name'])
self.assertEqual(PORTDNSDOMAIN, dns_data_db['previous_dns_domain'])
self.assertEqual(DNSNAME, dns_data_db[dns.DNSNAME])
self.assertFalse(dns_data_db[dns.DNSDOMAIN])
self.assertEqual(DNSNAME, port[dns.DNSNAME])
self.assertFalse(port[dns.DNSDOMAIN])
self.assertEqual(DNSNAME, dns_data_db[dns_apidef.DNSNAME])
self.assertFalse(dns_data_db[dns_apidef.DNSDOMAIN])
self.assertEqual(DNSNAME, port[dns_apidef.DNSNAME])
self.assertFalse(port[dns_apidef.DNSDOMAIN])
self.assertFalse(mock_client.recordsets.create.call_args_list)
self.assertFalse(mock_admin_client.recordsets.create.call_args_list)
self.assertEqual(2, mock_client.recordsets.delete.call_count)
@ -667,10 +667,10 @@ class DNSDomainPortsTestCase(DNSIntegrationTestCase):
self.assertFalse(dns_data_db['current_dns_domain'])
self.assertFalse(dns_data_db['previous_dns_name'])
self.assertFalse(dns_data_db['previous_dns_domain'])
self.assertEqual(dns_name, dns_data_db[dns.DNSNAME])
self.assertEqual(dns_domain, dns_data_db[dns.DNSDOMAIN])
self.assertEqual(dns_name, port[dns.DNSNAME])
self.assertEqual(dns_domain, port[dns.DNSDOMAIN])
self.assertEqual(dns_name, dns_data_db[dns_apidef.DNSNAME])
self.assertEqual(dns_domain, dns_data_db[dns_apidef.DNSDOMAIN])
self.assertEqual(dns_name, port[dns_apidef.DNSNAME])
self.assertEqual(dns_domain, port[dns_apidef.DNSDOMAIN])
self.assertFalse(mock_client.recordsets.create.call_args_list)
self.assertFalse(
mock_admin_client.recordsets.create.call_args_list)