Add NAT rules to the floating IP workflow

The floating IP workflow is now able to manage NAT
rules (portforwarding) if the floating IP is not
associated with any NICs (ports).

This patch is the one of a series of patches
to implement floating ip port forwarding with
port ranges.

The specification is defined in:
https://github.com/openstack/neutron-specs/blob/master/specs/wallaby/port-forwarding-port-ranges.rst

Implements: blueprint https://blueprints.launchpad.net/neutron/+spec/floatingips-portforwarding-ranges
Change-Id: Id715da6591124de45f41cc367bf39a6bfe190c9a
This commit is contained in:
Pedro Martins 2019-08-13 21:34:26 -03:00 committed by Pedro Henrique
parent d4b73ed1c4
commit 1db1764749
17 changed files with 1312 additions and 14 deletions

View File

@ -510,15 +510,38 @@ class SecurityGroupManager(object):
class FloatingIp(base.APIDictWrapper):
_attrs = ['id', 'ip', 'fixed_ip', 'port_id', 'instance_id',
'instance_type', 'pool', 'dns_domain', 'dns_name']
'instance_type', 'pool', 'dns_domain', 'dns_name',
'port_forwardings']
def __init__(self, fip):
fip['ip'] = fip['floating_ip_address']
fip['fixed_ip'] = fip['fixed_ip_address']
fip['pool'] = fip['floating_network_id']
fip['port_forwardings'] = fip.get('portforwardings', {})
super().__init__(fip)
class PortForwarding(base.APIDictWrapper):
_attrs = ['id', 'floating_ip_id', 'protocol', 'internal_port_range',
'external_port_range', 'internal_ip_address',
'description', 'internal_port_id', 'external_ip_address']
def __init__(self, pfw, fip):
pfw['floating_ip_id'] = fip
port_forwarding = pfw
if 'port_forwarding' in pfw:
port_forwarding = pfw['port_forwarding']
port_forwarding['internal_port_range'] = ':'.join(
map(str, sorted(
map(int, set(port_forwarding.get(
'internal_port_range', '').split(':'))))))
port_forwarding['external_port_range'] = ':'.join(
map(str, sorted(
map(int, set(port_forwarding.get(
'external_port_range', '').split(':'))))))
super().__init__(pfw)
class FloatingIpPool(base.APIDictWrapper):
pass
@ -544,6 +567,81 @@ class FloatingIpTarget(base.APIDictWrapper):
super().__init__(target)
class PortForwardingManager(object):
def __init__(self, request):
self.request = request
self.client = neutronclient(request)
@profiler.trace
def list(self, floating_ip_id, **search_opts):
port_forwarding_rules = self.client.list_port_forwardings(
floating_ip_id, **search_opts)
port_forwarding_rules = port_forwarding_rules.get('port_forwardings')
LOG.debug("Portforwarding rules listed=%s", port_forwarding_rules)
return [PortForwarding(port_forwarding_rule, floating_ip_id)
for port_forwarding_rule in port_forwarding_rules]
@profiler.trace
def update(self, floating_ip_id, **params):
portforwarding_dict = self.create_port_forwarding_dict(**params)
portforwarding_id = params['portforwarding_id']
LOG.debug("Updating Portforwarding rule with id %s", portforwarding_id)
pfw = self.client.update_port_forwarding(
floating_ip_id,
portforwarding_id,
{'port_forwarding': portforwarding_dict}).get('port_forwarding')
return PortForwarding(pfw, floating_ip_id)
@profiler.trace
def create(self, floating_ip_id, **params):
portforwarding_dict = self.create_port_forwarding_dict(**params)
portforwarding_rule = self.client.create_port_forwarding(
floating_ip_id,
{'port_forwarding': portforwarding_dict}).get('port_forwarding')
LOG.debug("Created a Portforwarding rule to floating IP %s with id %s",
floating_ip_id,
portforwarding_rule['id'])
return PortForwarding(portforwarding_rule, floating_ip_id)
def create_port_forwarding_dict(self, **params):
portforwarding_dict = {}
if 'protocol' in params:
portforwarding_dict['protocol'] = str(params['protocol']).lower()
if 'internal_port' in params:
internal_port = str(params['internal_port'])
if ':' not in internal_port:
portforwarding_dict['internal_port'] = int(internal_port)
else:
portforwarding_dict['internal_port_range'] = internal_port
if 'external_port' in params:
external_port = str(params['external_port'])
if ':' not in external_port:
portforwarding_dict['external_port'] = int(external_port)
else:
portforwarding_dict['external_port_range'] = external_port
if 'internal_ip_address' in params:
portforwarding_dict['internal_ip_address'] = params[
'internal_ip_address']
if 'description' in params:
portforwarding_dict['description'] = params['description']
if 'internal_port_id' in params:
portforwarding_dict['internal_port_id'] = params['internal_port_id']
return portforwarding_dict
def delete(self, floating_ip_id, portforwarding_id):
self.client.delete_port_forwarding(floating_ip_id, portforwarding_id)
LOG.debug(
"The Portforwarding rule of floating IP %s with id %s was deleted",
floating_ip_id, portforwarding_id)
def get(self, floating_ip_id, portforwarding_id):
pfw = self.client.show_port_forwarding(floating_ip_id,
portforwarding_id)
return PortForwarding(pfw, portforwarding_id)
class FloatingIpManager(object):
"""Manager class to implement Floating IP methods
@ -1956,6 +2054,26 @@ def tenant_floating_ip_list(request, all_tenants=False, **search_opts):
**search_opts)
def floating_ip_port_forwarding_list(request, fip):
return PortForwardingManager(request).list(fip)
def floating_ip_port_forwarding_create(request, fip, **params):
return PortForwardingManager(request).create(fip, **params)
def floating_ip_port_forwarding_update(request, fip, **params):
return PortForwardingManager(request).update(fip, **params)
def floating_ip_port_forwarding_get(request, fip, pfw):
return PortForwardingManager(request).get(fip, pfw)
def floating_ip_port_forwarding_delete(request, fip, pfw):
return PortForwardingManager(request).delete(fip, pfw)
def tenant_floating_ip_get(request, floating_ip_id):
return FloatingIpManager(request).get(floating_ip_id)
@ -2179,6 +2297,18 @@ def is_extension_supported(request, extension_alias):
return False
@profiler.trace
def is_extension_floating_ip_port_forwarding_supported(request):
try:
return is_extension_supported(
request, extension_alias='floating-ip-port-forwarding')
except Exception as e:
LOG.error("It was not possible to check if the "
"floating-ip-port-forwarding extension is enabled in "
"neutron. Port forwardings will not be enabled.: %s", e)
return False
# TODO(amotoki): Clean up 'default' parameter because the default
# values are pre-defined now, so 'default' argument is meaningless
# in most cases.

View File

@ -0,0 +1,37 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from django.utils.translation import gettext_lazy as _
import horizon
from openstack_dashboard.api import neutron
LOG = logging.getLogger(__name__)
class FloatingIpPortforwardingRules(horizon.Panel):
name = _("Floating IP port forwarding rules")
slug = 'floating_ip_portforwardings'
permissions = ('openstack.services.network',)
nav = False
def allowed(self, context):
request = context['request']
return (
super().allowed(context) and
request.user.has_perms(self.permissions) and
neutron.is_extension_floating_ip_port_forwarding_supported(
request)
)

View File

@ -0,0 +1,194 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from django import shortcuts
from django.urls import reverse
from django.utils.http import urlencode
from django.utils.translation import gettext_lazy as _
from django.utils.translation import ngettext_lazy
from horizon import tables
from openstack_dashboard import api
from openstack_dashboard import policy
PROTOCOL_CHOICES = (
("Select a protocol", "Select a protocol"),
("UDP", "UDP"),
("TCP", "TCP"),
)
class CreateFloatingIpPortForwardingRule(tables.LinkAction):
name = "create"
verbose_name = _("Add floating IP port forwarding rule")
classes = ("ajax-modal",)
icon = "plus"
url = "horizon:project:floating_ip_portforwardings:create"
floating_ip_id = None
def allowed(self, request, fip=None):
policy_rules = (("network", "create_floatingip_port_forwarding"),)
return policy.check(policy_rules, request)
def single(self, data_table, request, *args):
return shortcuts.redirect(
'horizon:project:floating_ip_portforwardings:show')
def get_url_params(self, datum=None):
return urlencode({"floating_ip_id": self.floating_ip_id})
def get_link_url(self, datum=None):
base_url = reverse(self.url)
join = "?".join([base_url, self.get_url_params(datum)])
return join
class EditFloatingIpPortForwardingRule(CreateFloatingIpPortForwardingRule):
name = "edit"
verbose_name = _("Edit floating IP port forwarding rule")
classes = ("ajax-modal", "btn-edit")
url = "horizon:project:floating_ip_portforwardings:edit"
def allowed(self, request, fip=None):
policy_rules = (("network", "update_floatingip_port_forwarding"),)
return policy.check(policy_rules, request)
def get_url_params(self, datum=None):
portforwading_id = self.table.get_object_id(datum)
return urlencode({"floating_ip_id": self.floating_ip_id,
"pfwd_id": portforwading_id})
class EditFloatingIpPortForwardingRuleFromAllPanel(
EditFloatingIpPortForwardingRule):
name = "edit-from-all"
url = "horizon:project:floating_ip_portforwardings:editToAll"
def single(self, data_table, request, *args):
return shortcuts.redirect(
'horizon:project:floating_ip_portforwardings:index')
def get_url_params(self, datum=None):
portforwading_id = self.table.get_object_id(datum)
return urlencode({"floating_ip_id": datum.floating_ip_id,
"pfwd_id": portforwading_id})
class DeleteRule(tables.DeleteAction):
name = "delete"
help_text = _(
"This action will delete the "
"selected floating IP port forwarding rule(s); "
"this process cannot be undone.")
floating_ip_id = None
@staticmethod
def action_present(count):
return ngettext_lazy(
u"Delete Rule",
u"Delete Rules",
count
)
@staticmethod
def action_past(count):
return ngettext_lazy(
u"Deleted Rule",
u"Deleted Rules",
count
)
def allowed(self, request, fip=None):
policy_rules = (("network", "delete_floatingip_port_forwarding"),)
return policy.check(policy_rules, request)
def action(self, request, obj_id):
api.neutron.floating_ip_port_forwarding_delete(request,
self.floating_ip_id,
obj_id)
class DeleteRuleFromAllPanel(DeleteRule):
name = "delete-from-all"
def action(self, request, obj_id):
datum = self.table.get_object_by_id(obj_id)
api.neutron.floating_ip_port_forwarding_delete(request,
datum.floating_ip_id,
obj_id)
class FloatingIpPortForwardingRulesTable(tables.DataTable):
protocol = tables.Column("protocol", verbose_name=_("Protocol"))
external_port_range = tables.Column("external_port_range",
verbose_name=_("External port"))
internal_port_range = tables.Column("internal_port_range",
verbose_name=_("Internal port"))
internal_ip_address = tables.Column("internal_ip_address",
verbose_name=_("Internal IP address"))
description = tables.Column("description", verbose_name=_("Description"))
def __init__(self, request, data=None, needs_form_wrapper=None, **kwargs):
super().__init__(
request, data=data, needs_form_wrapper=needs_form_wrapper,
**kwargs)
floating_ip_id = request.GET.get('floating_ip_id')
for action in self.get_table_actions():
action.floating_ip_id = floating_ip_id
for action in self._meta.row_actions:
action.floating_ip_id = floating_ip_id
def get_object_display(self, datum):
return str(datum.internal_ip_address) + ':' + str(
datum.internal_port_range)
class Meta(object):
name = "floating_ip_portforwardings"
verbose_name = _("Floating IP port forwarding rules")
table_actions = (CreateFloatingIpPortForwardingRule, DeleteRule)
row_actions = (EditFloatingIpPortForwardingRule, DeleteRule)
class AllFloatingIpPortForwardingRulesTable(tables.DataTable):
floating_ip_id = tables.Column("floating_ip_id",
verbose_name=_("floating_ip_id"),
hidden=True)
protocol = tables.Column("protocol", verbose_name=_("Protocol"))
external_port_range = tables.Column("external_port_range",
verbose_name=_("External port"))
internal_port_range = tables.Column("internal_port_range",
verbose_name=_("Internal port"))
external_ip_address = tables.Column("external_ip_address",
verbose_name=_("External IP address"))
internal_ip_address = tables.Column("internal_ip_address",
verbose_name=_("Internal IP address"))
description = tables.Column("description", verbose_name=_("Description"))
def __init__(self, request, data=None, needs_form_wrapper=None, **kwargs):
super().__init__(
request, data=data, needs_form_wrapper=needs_form_wrapper, **kwargs)
def get_object_display(self, datum):
return str(datum.internal_ip_address) + ':' + str(
datum.internal_port_range)
class Meta(object):
name = "floating_ip_portforwardings"
verbose_name = _("Floating IP port forwarding rules")
table_actions = (DeleteRuleFromAllPanel,)
row_actions = (
EditFloatingIpPortForwardingRuleFromAllPanel,
DeleteRuleFromAllPanel)

View File

@ -0,0 +1,262 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import uuid
from django.urls import reverse
from django.utils.http import urlencode
from openstack_dashboard import api
from openstack_dashboard.test import helpers as test
from horizon.tables import views as table_views
from horizon.workflows import views
INDEX_URL = reverse('horizon:project:floating_ip_portforwardings:index')
NAMESPACE = "horizon:project:floating_ip_portforwardings"
class FloatingIpPortforwardingViewTests(test.TestCase):
def setUp(self):
super().setUp()
api_mock = mock.patch.object(
api.neutron,
'is_extension_floating_ip_port_forwarding_supported').start()
api_mock.return_value = True
@test.create_mocks({api.neutron: ('tenant_floating_ip_get',
'floating_ip_port_forwarding_list')})
def test_floating_ip_portforwarding(self):
fip = self._get_fip_targets()[0]
self.mock_tenant_floating_ip_get.return_value = fip
fip_id = fip.id
self.mock_floating_ip_port_forwarding_list.return_value = (
fip.port_forwardings)
params = urlencode({'floating_ip_id': fip_id})
url = '?'.join([reverse('%s:show' % NAMESPACE), params])
res = self.client.get(url)
self.assertTemplateUsed(res, table_views.DataTableView.template_name)
table_data = res.context_data['table'].data
self.assertEqual(len(table_data), 1)
self.assertEqual(fip.port_forwardings[0].id, table_data[0].id)
@test.create_mocks({api.neutron: ('floating_ip_port_forwarding_list',
'tenant_floating_ip_list')})
def test_floating_ip_portforwarding_all(self):
fips = self._get_fip_targets()
self.mock_tenant_floating_ip_list.return_value = fips
fips_dict = {}
for f in fips:
fips_dict[f.id] = f.port_forwardings
def pfw_list(request, fip_id):
return fips_dict[fip_id]
self.mock_floating_ip_port_forwarding_list.side_effect = pfw_list
url = reverse('%s:index' % NAMESPACE)
res = self.client.get(url)
self.assertTemplateUsed(res, table_views.DataTableView.template_name)
table_data = res.context_data['table'].data
self.assertEqual(len(table_data), len(fips))
for pfw in table_data:
self.assertIn(pfw.id, list(map(lambda x: x.id,
fips_dict[pfw.floating_ip_id])))
def _get_compute_ports(self):
return [p for p in self.ports.list()
if not p.device_owner.startswith('network:')]
def _get_fip_targets(self):
server_dict = dict((s.id, s.name) for s in self.servers.list())
targets = []
port = 10
for p in self._get_compute_ports():
for ip in p.fixed_ips:
targets.append(api.neutron.FloatingIpTarget(
p, ip['ip_address'], server_dict.get(p.device_id)))
targets[-1].ip = ip['ip_address']
targets[-1].port_id = None
targets[-1].port_forwardings = [api.neutron.PortForwarding({
'id': str(uuid.uuid4()),
'floating_ip_id': targets[-1].id,
'protocol': 'TCP',
'internal_port_range': str(port),
'external_port_range': str(port + 10),
'internal_ip_address': ip['ip_address'],
'description': '',
'internal_port_id': '',
'external_ip_address': ''}, targets[-1].id)]
port += 1
return targets
@test.create_mocks({api.neutron: ('tenant_floating_ip_get',
'floating_ip_port_forwarding_list',
'floating_ip_target_list')})
def test_create_floating_ip_portforwarding(self):
fip = self._get_fip_targets()[0]
self.mock_tenant_floating_ip_get.return_value = fip
fip_id = fip.id
self.mock_floating_ip_port_forwarding_list.return_value = (
fip.port_forwardings)
self.mock_floating_ip_target_list.return_value = [fip]
params = urlencode({'floating_ip_id': fip_id})
url = '?'.join([reverse('%s:create' % NAMESPACE), params])
res = self.client.get(url)
self.assertTemplateUsed(res, views.WorkflowView.template_name)
workflow = res.context['workflow']
choices = dict(
workflow.steps[0].action.fields[
'internal_ip_address'].choices)
choices.pop('Select an IP-Address')
self.assertEqual({fip.id}, set(choices.keys()))
@test.create_mocks({api.neutron: ('tenant_floating_ip_get',
'floating_ip_port_forwarding_list',
'floating_ip_port_forwarding_create',
'floating_ip_target_list')})
def test_create_floating_ip_portforwarding_post(self):
fip = self._get_fip_targets()[0]
self.mock_tenant_floating_ip_get.return_value = fip
fip_id = fip.id
self.mock_floating_ip_port_forwarding_list.return_value = (
fip.port_forwardings)
self.mock_floating_ip_target_list.return_value = [fip]
create_mock = self.mock_floating_ip_port_forwarding_create
params = urlencode({'floating_ip_id': fip_id})
url = '?'.join([reverse('%s:create' % NAMESPACE), params])
port = self.ports.get(id=fip.id.split('_')[0])
internal_ip = '%s_%s' % (port.id, port.fixed_ips[0]['ip_address'])
post_params = {
'floating_ip_id': fip_id,
'description': 'test',
'internal_port': '10',
'protocol': 'TCP',
'internal_ip_address': internal_ip,
'external_port': '123',
}
expected_params = {
'description': 'test',
'internal_port': '10',
'protocol': 'TCP',
'internal_port_id': internal_ip.split('_')[0],
'internal_ip_address': internal_ip.split('_')[1],
'external_port': '123',
}
self.client.post(url, post_params)
create_mock.assert_called_once_with(mock.ANY, fip_id,
**expected_params)
@test.create_mocks({api.neutron: ('tenant_floating_ip_get',
'floating_ip_port_forwarding_list',
'floating_ip_target_list',
'floating_ip_port_forwarding_get')})
def test_update_floating_ip_portforwarding(self):
fip = self._get_fip_targets()[0]
self.mock_tenant_floating_ip_get.return_value = fip
fip_id = fip.id
self.mock_floating_ip_port_forwarding_list.return_value = (
fip.port_forwardings)
self.mock_floating_ip_target_list.return_value = [fip]
self.mock_floating_ip_port_forwarding_get.return_value = {
'port_forwarding': fip.port_forwardings[0].to_dict()
}
params = urlencode({'floating_ip_id': fip_id,
'pfwd_id': fip.port_forwardings[0]['id']})
url = '?'.join([reverse('%s:edit' % NAMESPACE), params])
res = self.client.get(url)
self.assertTemplateUsed(res, views.WorkflowView.template_name)
workflow = res.context['workflow']
self.assertEqual(workflow.steps[0].action.initial['floating_ip_id'],
fip.port_forwardings[0]['floating_ip_id'])
self.assertEqual(workflow.steps[0].action.initial['portforwading_id'],
fip.port_forwardings[0]['id'])
self.assertEqual(workflow.steps[0].action.initial['protocol'],
fip.port_forwardings[0]['protocol'])
self.assertEqual(workflow.steps[0].action.initial['internal_port'],
fip.port_forwardings[0]['internal_port_range'])
self.assertEqual(workflow.steps[0].action.initial['external_port'],
fip.port_forwardings[0]['external_port_range'])
self.assertEqual(workflow.steps[0].action.initial['description'],
fip.port_forwardings[0]['description'])
@test.create_mocks({api.neutron: ('tenant_floating_ip_get',
'floating_ip_port_forwarding_list',
'floating_ip_target_list',
'floating_ip_port_forwarding_update',
'floating_ip_port_forwarding_get')})
def test_update_floating_ip_portforwarding_post(self):
fip = self._get_fip_targets()[0]
self.mock_tenant_floating_ip_get.return_value = fip
fip_id = fip.id
self.mock_floating_ip_port_forwarding_list.return_value = (
fip.port_forwardings)
self.mock_floating_ip_target_list.return_value = [fip]
self.mock_floating_ip_port_forwarding_get.return_value = {
'port_forwarding': fip.port_forwardings[0].to_dict()
}
update_mock = self.mock_floating_ip_port_forwarding_update
pfw_id = fip.port_forwardings[0]['id']
params = urlencode({'floating_ip_id': fip_id,
'pfwd_id': pfw_id})
url = '?'.join([reverse('%s:edit' % NAMESPACE), params])
port = self.ports.get(id=fip.id.split('_')[0])
internal_ip = '%s_%s' % (port.id, port.fixed_ips[0]['ip_address'])
post_params = {
'portforwading_id': pfw_id,
'floating_ip_id': fip_id,
'description': 'test',
'internal_port': '10',
'protocol': 'TCP',
'internal_ip_address': internal_ip,
'external_port': '123',
}
expected_params = {
'portforwarding_id': pfw_id,
'description': 'test',
'internal_port': '10',
'protocol': 'TCP',
'internal_port_id': internal_ip.split('_')[0],
'internal_ip_address': internal_ip.split('_')[1],
'external_port': '123',
}
self.client.post(url, post_params)
update_mock.assert_called_once_with(mock.ANY, fip_id,
**expected_params)
@test.create_mocks({api.neutron: ('tenant_floating_ip_get',
'floating_ip_port_forwarding_list',
'floating_ip_port_forwarding_delete')})
def test_delete_floating_ip_portforwarding(self):
fip = self._get_fip_targets()[0]
self.mock_tenant_floating_ip_get.return_value = fip
fip_id = fip.id
self.mock_floating_ip_port_forwarding_list.return_value = (
fip.port_forwardings)
deletion_mock = self.mock_floating_ip_port_forwarding_delete
pf_id = fip.port_forwardings[0].id
params = urlencode({'floating_ip_id': fip_id})
url = '?'.join([reverse('%s:show' % NAMESPACE), params])
self.client.post(url, {
'action': 'floating_ip_portforwardings__delete__%s' % pf_id})
deletion_mock.assert_called_once_with(mock.ANY, fip_id, pf_id)

View File

@ -0,0 +1,24 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from django.urls import re_path
from openstack_dashboard.dashboards.project.floating_ip_portforwardings import (
views)
urlpatterns = [
re_path(r'^$', views.AllRulesView.as_view(), name='index'),
re_path(r'^show$', views.IndexView.as_view(), name='show'),
re_path(r'^create/$', views.CreateView.as_view(), name='create'),
re_path(r'^edit/$', views.EditView.as_view(), name='edit'),
re_path(r'^editToAll/$', views.EditToAllView.as_view(), name='editToAll')
]

View File

@ -0,0 +1,110 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Views for managing floating IPs port forwardings
"""
import logging
from django.utils.translation import gettext_lazy as _
from neutronclient.common import exceptions as neutron_exc
from horizon import exceptions
from horizon import tables
from horizon import workflows
from openstack_dashboard import api
from openstack_dashboard.dashboards.project.floating_ip_portforwardings import (
tables as project_tables)
from openstack_dashboard.dashboards.project.floating_ip_portforwardings import (
workflows as project_workflows)
LOG = logging.getLogger(__name__)
class CreateView(workflows.WorkflowView):
workflow_class = (
project_workflows.FloatingIpPortForwardingRuleCreationWorkflow)
class EditView(workflows.WorkflowView):
workflow_class = project_workflows.FloatingIpPortForwardingRuleEditWorkflow
class EditToAllView(workflows.WorkflowView):
workflow_class = (
project_workflows.FloatingIpPortForwardingRuleEditWorkflowToAll)
class IndexView(tables.DataTableView):
table_class = project_tables.FloatingIpPortForwardingRulesTable
page_title = _("Manage floating IP port forwarding rules")
def get_data(self):
try:
floating_ip_id = self.request.GET.get('floating_ip_id')
floating_ip = api.neutron.tenant_floating_ip_get(self.request,
floating_ip_id)
self.page_title = _(
"Manage floating IP port forwarding rules : " + str(
floating_ip.ip))
return self.get_floating_ip_rules(floating_ip)
except neutron_exc.ConnectionFailed:
exceptions.handle(self.request)
except Exception:
exceptions.handle(
self.request,
_('Unable to retrieve floating IP port forwarding rules.'))
return []
def get_floating_ip_rules(self, floating_ip):
if floating_ip.port_id:
return []
floating_ip_portforwarding_rules = []
external_ip_address = floating_ip.ip
floating_ip_id = floating_ip.id
port_forwarding_rules = api.neutron.floating_ip_port_forwarding_list(
self.request, floating_ip_id)
for port_forwarding_rule in port_forwarding_rules:
setattr(port_forwarding_rule, 'external_ip_address',
external_ip_address)
floating_ip_portforwarding_rules.extend(port_forwarding_rules)
return floating_ip_portforwarding_rules
class AllRulesView(IndexView):
table_class = project_tables.AllFloatingIpPortForwardingRulesTable
def get_data(self):
try:
return self.get_all_floating_ip_rules()
except neutron_exc.ConnectionFailed:
exceptions.handle(self.request)
except Exception:
exceptions.handle(
self.request,
_('Unable to retrieve floating IP port forwarding rules.'))
return []
def get_all_floating_ip_rules(self):
floating_ip_portforwarding_rules = []
floating_ips = api.neutron.tenant_floating_ip_list(self.request)
for floating_ip in floating_ips:
floating_ip_portforwarding_rules.extend(
self.get_floating_ip_rules(floating_ip))
return floating_ip_portforwarding_rules

View File

@ -0,0 +1,270 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from django.core.exceptions import ValidationError
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from neutronclient.common import exceptions as neutron_exc
from horizon import exceptions
from horizon import forms
from horizon import workflows
from openstack_dashboard import api
from openstack_dashboard.dashboards.project.floating_ip_portforwardings import (
tables as project_tables)
LOG = logging.getLogger(__name__)
class CommonMetaData(object):
name = _("Description")
help_text = _(
"Description:"
""
"IP floating rules define external specific traffic that is bound "
"from a public IP to an internal address of a specific port.\n"
"Protocol: The protocol configured for the IP forwarding rule. "
"You can choose between TCP and UDP.\n"
"External port: The external port of the floating IP that"
" will be "
"bound to the internal port in the internal address. This field"
" allow values "
"between 1 and 65535 and also support ranges using the following"
" format:\n"
"InitialPort:FinalPort where InitialPort <= FinalPort.\n"
"Internal port: The internal port of the given internal IP "
"address that will be bound to the port that is exposed to the "
"internet via the public floating IP. This field allow values "
"between 1 and 65535 and also support ranges using the following"
" format:\n"
"InitialPort:FinalPort where InitialPort <= FinalPort.\n"
"Internal IP address: The internal IP address where the "
"internal ports will be running.\n"
"Description: Describes the reason why this rule is being "
"created.")
class CreateFloatingIpPortForwardingRuleAction(workflows.Action):
protocol = forms.ThemableChoiceField(
required=True,
choices=project_tables.PROTOCOL_CHOICES,
label=_("Protocol"))
external_port = forms.CharField(max_length=11, label=_("External port"))
internal_port = forms.CharField(max_length=11, label=_("Internal port"))
internal_ip_address = forms.ThemableChoiceField(required=True, label=_(
"Internal IP address"))
description = forms.CharField(required=False, widget=forms.Textarea,
max_length=255, label=_("Description"))
floating_ip_id = forms.CharField(max_length=255,
widget=forms.HiddenInput())
class Meta(CommonMetaData):
pass
def ignore_validation(self, portforward=None):
return False
def validate_input_selects(self):
err_msg = "You must select a%s"
internal_ip_address = self.cleaned_data.get('internal_ip_address')
protocol = self.cleaned_data.get('protocol')
if protocol == "Select a protocol":
raise ValidationError(message=err_msg % " Protocol.")
if internal_ip_address in ('Select an IP-Address',
'No ports available'):
raise ValidationError(message=err_msg % "n Ip-Address.")
def clean(self):
request = self.request
if request.method == "GET":
return self.cleaned_data
self.validate_input_selects()
return self.cleaned_data
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
floating_ip_id = self.request.GET.get('floating_ip_id')
self.initial['floating_ip_id'] = floating_ip_id
def populate_internal_ip_address_choices(self, request, context):
targets = api.neutron.floating_ip_target_list(self.request)
instances = sorted([(target.id, target.name) for target in targets],
key=lambda x: x[1])
if instances:
instances.insert(0, ("Select an IP-Address", _(
"Select an IP-Address")))
else:
instances = (("No ports available", _(
"No ports available")),)
return instances
class EditFloatingIpPortForwardingRuleAction(
CreateFloatingIpPortForwardingRuleAction):
portforwading_id = forms.CharField(max_length=255,
widget=forms.HiddenInput())
instance_id = None
class Meta(CommonMetaData):
pass
def ignore_validation(self, portforward=None):
return (super().ignore_validation(portforward) or
portforward.id == self.cleaned_data.get(
'portforwading_id'))
def __init__(self, *args, **kwargs):
request = args[0]
if request.method == 'POST':
super().__init__(
*args, **kwargs)
else:
floating_ip_id = request.GET.get('floating_ip_id')
port_forwarding_id = request.GET.get('pfwd_id')
port_forwarding = api.neutron.floating_ip_port_forwarding_get(
request, floating_ip_id, port_forwarding_id)
port_forwarding_rule = port_forwarding['port_forwarding']
self.instance_id = "%s_%s" % (
port_forwarding_rule['internal_port_id'],
port_forwarding_rule['internal_ip_address'])
super().__init__(
*args, **kwargs)
self.initial['portforwading_id'] = port_forwarding_id
self.initial['protocol'] = str(
port_forwarding_rule['protocol']).upper()
self.initial['internal_port'] = port_forwarding_rule[
'internal_port_range']
self.initial['external_port'] = port_forwarding_rule[
'external_port_range']
if 'description' in port_forwarding_rule.keys():
self.initial['description'] = port_forwarding_rule[
'description']
def populate_internal_ip_address_choices(self, request, context):
targets = api.neutron.floating_ip_target_list(self.request)
instances = sorted([(target.id, target.name) for target in targets],
key=lambda x: '0'
if x[0] == self.instance_id else x[1])
return instances
class CreateFloatingIpPortForwardingRule(workflows.Step):
action_class = CreateFloatingIpPortForwardingRuleAction
contributes = ("internal_port", "protocol", "external_port",
"internal_ip_address", "description", "floating_ip_id",
"portforwading_id")
def contribute(self, data, context):
context = super().contribute(data, context)
return context
class EditFloatingIpPortForwardingRule(
CreateFloatingIpPortForwardingRule):
action_class = EditFloatingIpPortForwardingRuleAction
def contribute(self, data, context):
context = super().contribute(data, context)
return context
class FloatingIpPortForwardingRuleCreationWorkflow(workflows.Workflow):
slug = "floating_ip_port_forwarding_rule_creation"
name = _("Add floating IP port forwarding rule")
finalize_button_name = _("Add")
success_message = _('Floating IP port forwarding rule %s created. '
'It might take a few minutes to apply all rules.')
failure_message = _('Unable to create floating IP port forwarding rule'
' %s.')
success_url = "horizon:project:floating_ip_portforwardings:show"
default_steps = (CreateFloatingIpPortForwardingRule,)
def format_status_message(self, message):
if "%s" in message:
return message % self.context.get('ip_address',
_('unknown IP address'))
return message
def handle_using_api_method(self, request, data, api_method,
**api_params):
try:
floating_ip_id = data['floating_ip_id']
self.success_url = reverse(
self.success_url) + "?floating_ip_id=" + str(
floating_ip_id)
port_id, internal_ip = data['internal_ip_address'].split('_')
self.context['ip_address'] = internal_ip
param = {}
if data['description']:
param['description'] = data['description']
if data['internal_port']:
param['internal_port'] = data['internal_port']
if data['external_port']:
param['external_port'] = data['external_port']
if internal_ip:
param['internal_ip_address'] = internal_ip
if data['protocol']:
param['protocol'] = data['protocol']
if port_id:
param['internal_port_id'] = port_id
param.update(**api_params)
api_method(request, floating_ip_id, **param)
except neutron_exc.Conflict as ex:
msg = _('The requested instance port is already'
' associated with another floating IP.')
LOG.exception(msg, ex)
exceptions.handle(request, msg)
self.failure_message = msg
return False
except Exception:
exceptions.handle(request)
return False
return True
def handle(self, request, data):
return self.handle_using_api_method(
request, data, api.neutron.floating_ip_port_forwarding_create)
class FloatingIpPortForwardingRuleEditWorkflow(
FloatingIpPortForwardingRuleCreationWorkflow):
slug = "floating_ip_port_forwarding_rule_edit"
name = _("Edit floating IP port forwarding rule")
finalize_button_name = _("Update")
success_message = _('Floating IP port forwarding rule %s updated. '
'It might take a few minutes to apply all rules.')
failure_message = _('Unable to updated floating IP port forwarding'
' rule %s.')
success_url = "horizon:project:floating_ip_portforwardings:show"
default_steps = (EditFloatingIpPortForwardingRule,)
def handle(self, request, data):
return self.handle_using_api_method(
request, data, api.neutron.floating_ip_port_forwarding_update,
portforwarding_id=data['portforwading_id'])
class FloatingIpPortForwardingRuleEditWorkflowToAll(
FloatingIpPortForwardingRuleEditWorkflow):
slug = "floating_ip_port_forwarding_rule_edit_all"
success_url = "horizon:project:floating_ip_portforwardings:index"

View File

@ -90,12 +90,60 @@ class ReleaseIPs(tables.BatchAction):
def allowed(self, request, fip=None):
policy_rules = (("network", "delete_floatingip"),)
return policy.check(policy_rules, request)
port_forwarding_occurrence = 0
if fip:
pwds = fip.port_forwardings
port_forwarding_occurrence = len(pwds)
return port_forwarding_occurrence == 0 and policy.check(policy_rules,
request)
def action(self, request, obj_id):
api.neutron.tenant_floating_ip_release(request, obj_id)
class ReleaseIPsPortForwarding(ReleaseIPs):
name = "release_floating_ip_portforwarding_rule"
help_text = _(
"This floating IP has port forwarding rules configured to it."
" Therefore,"
" you will need to remove all of these rules before being able"
" to release it.")
def __init__(self, **kwargs):
attributes = {"title": "Release Floating IP with port forwarding rules",
"confirm-button-text": "Edit floating IP port"
" forwarding rules"}
super().__init__(attrs=attributes, **kwargs)
@staticmethod
def action_past(count):
return ngettext_lazy(
u"Successfully redirected",
u"Successfully redirected",
count
)
def allowed(self, request, fip=None):
policy_rules = (("network", "delete_floatingip_port_forwarding"),)
pwds = fip.port_forwardings
return (
len(pwds) > 0 and
policy.check(policy_rules, request) and
api.neutron.is_extension_floating_ip_port_forwarding_supported(
request)
)
def action(self, request, obj_id):
self.success_url = reverse(
'horizon:project:floating_ip_portforwardings:show') \
+ '?floating_ip_id=' \
+ str(obj_id)
class AssociateIP(tables.LinkAction):
name = "associate"
verbose_name = _("Associate")
@ -105,7 +153,9 @@ class AssociateIP(tables.LinkAction):
def allowed(self, request, fip):
policy_rules = (("network", "update_floatingip"),)
return not fip.port_id and policy.check(policy_rules, request)
pwds = fip.port_forwardings
return len(pwds) == 0 and not fip.port_id and policy.check(policy_rules,
request)
def get_link_url(self, datum):
base_url = reverse(self.url)
@ -113,6 +163,59 @@ class AssociateIP(tables.LinkAction):
return "?".join([base_url, params])
class ListAllFloatingIpPortForwardingRules(tables.LinkAction):
name = "List floating_ip_portforwardings_rules"
verbose_name = _("List all floating IP port forwarding rules")
url = "horizon:project:floating_ip_portforwardings:index"
classes = ("btn-edit",)
icon = "link"
def exists_floating_ip_with_port_forwarding_rules_configurable(self,
request):
floating_ips = api.neutron.tenant_floating_ip_list(request)
for floating_ip in floating_ips:
if not floating_ip.port_id:
return True
return False
def allowed(self, request, fip):
policy_rules = (("network", "get_floatingip_port_forwarding"),)
return (self.exists_floating_ip_with_port_forwarding_rules_configurable(
request) and policy.check(policy_rules, request) and
api.neutron.is_extension_floating_ip_port_forwarding_supported(
request))
class ConfigureFloatingIpPortForwarding(tables.Action):
name = "configure_floating_ip_portforwarding_rules"
verbose_name = _("Configure floating IP port forwarding rules")
classes = ("btn-edit",)
icon = "link"
def allowed(self, request, fip):
policy_rules = (("network", "get_floatingip_port_forwarding"),)
return (
not fip.port_id and
policy.check(policy_rules, request) and
api.neutron.is_extension_floating_ip_port_forwarding_supported(
request)
)
def single(self, table, request, obj_id):
fip = {}
try:
fip = table.get_object_by_id(filters.get_int_or_uuid(obj_id))
except Exception as ex:
err_msg = 'Unable to find a floating IP.'
LOG.debug(err_msg, ex)
exceptions.handle(request,
_('Unable to find a floating IP.'))
return shortcuts.redirect(
reverse('horizon:project:floating_ip_portforwardings:show') +
'?floating_ip_id=' + str(fip.id))
class DisassociateIP(tables.Action):
name = "disassociate"
verbose_name = _("Disassociate")
@ -163,7 +266,6 @@ STATUS_DISPLAY_CHOICES = (
("error", pgettext_lazy("Current status of a Floating IP", "Error")),
)
FLOATING_IPS_FILTER_CHOICES = (
('floating_ip_address', _('Floating IP Address ='), True),
('network_id', _('Network ID ='), True),
@ -224,5 +326,9 @@ class FloatingIPsTable(tables.DataTable):
class Meta(object):
name = "floating_ips"
verbose_name = _("Floating IPs")
table_actions = (AllocateIP, ReleaseIPs, FloatingIPsFilterAction)
row_actions = (AssociateIP, DisassociateIP, ReleaseIPs)
table_actions = (
ListAllFloatingIpPortForwardingRules, AllocateIP, ReleaseIPs,
FloatingIPsFilterAction)
row_actions = (AssociateIP, DisassociateIP, ReleaseIPs,
ReleaseIPsPortForwarding,
ConfigureFloatingIpPortForwarding)

View File

@ -35,6 +35,13 @@ NAMESPACE = "horizon:project:floating_ips"
class FloatingIpViewTests(test.TestCase):
def setUp(self):
super().setUp()
api_mock = mock.patch.object(
api.neutron,
'is_extension_floating_ip_port_forwarding_supported').start()
api_mock.return_value = True
@test.create_mocks({api.neutron: ('floating_ip_target_list',
'tenant_floating_ip_list')})
def test_associate(self):
@ -42,7 +49,6 @@ class FloatingIpViewTests(test.TestCase):
self._get_fip_targets()
self.mock_tenant_floating_ip_list.return_value = \
self.floating_ips.list()
url = reverse('%s:associate' % NAMESPACE)
res = self.client.get(url)
self.assertTemplateUsed(res, views.WorkflowView.template_name)
@ -91,6 +97,7 @@ class FloatingIpViewTests(test.TestCase):
for ip in p.fixed_ips:
targets.append(api.neutron.FloatingIpTarget(
p, ip['ip_address'], server_dict.get(p.device_id)))
targets[-1].port_forwardings = []
return targets
@staticmethod
@ -213,12 +220,14 @@ class FloatingIpViewTests(test.TestCase):
@test.create_mocks({api.nova: ('server_list',),
api.neutron: ('floating_ip_disassociate',
'floating_ip_pools_list',
'floating_ip_port_forwarding_list',
'is_extension_supported',
'tenant_floating_ip_list')})
def test_disassociate_post(self):
floating_ip = self.floating_ips.first()
self.mock_is_extension_supported.return_value = False
self.mock_floating_ip_port_forwarding_list.return_value = []
self.mock_server_list.return_value = [self.servers.list(), False]
self.mock_tenant_floating_ip_list.return_value = \
self.floating_ips.list()
@ -243,6 +252,7 @@ class FloatingIpViewTests(test.TestCase):
@test.create_mocks({api.nova: ('server_list',),
api.neutron: ('floating_ip_disassociate',
'floating_ip_port_forwarding_list',
'floating_ip_pools_list',
'is_extension_supported',
'tenant_floating_ip_list')})
@ -250,6 +260,7 @@ class FloatingIpViewTests(test.TestCase):
floating_ip = self.floating_ips.first()
self.mock_is_extension_supported.return_value = False
self.mock_floating_ip_port_forwarding_list.return_value = []
self.mock_server_list.return_value = [self.servers.list(), False]
self.mock_tenant_floating_ip_list.return_value = \
self.floating_ips.list()
@ -273,6 +284,7 @@ class FloatingIpViewTests(test.TestCase):
@test.create_mocks({api.neutron: ('tenant_floating_ip_list',
'is_extension_supported',
'floating_ip_port_forwarding_list',
'floating_ip_pools_list'),
api.nova: ('server_list',),
quotas: ('tenant_quota_usages',)})
@ -283,6 +295,7 @@ class FloatingIpViewTests(test.TestCase):
self.mock_is_extension_supported.return_value = False
self.mock_tenant_floating_ip_list.return_value = floating_ips
self.mock_floating_ip_port_forwarding_list.return_value = []
self.mock_floating_ip_pools_list.return_value = floating_pools
self.mock_server_list.return_value = [self.servers.list(), False]
self.mock_tenant_quota_usages.return_value = quota_data
@ -298,9 +311,9 @@ class FloatingIpViewTests(test.TestCase):
url = 'horizon:project:floating_ips:allocate'
self.assertEqual(url, allocate_action.url)
self.mock_tenant_floating_ip_list.assert_called_once_with(
self.mock_tenant_floating_ip_list.assert_called_with(
test.IsHttpRequest())
self.mock_floating_ip_pools_list.assert_called_once_with(
self.mock_floating_ip_pools_list.assert_called_with(
test.IsHttpRequest())
self.mock_server_list.assert_called_once_with(test.IsHttpRequest(),
detailed=False)
@ -313,6 +326,7 @@ class FloatingIpViewTests(test.TestCase):
@test.create_mocks({api.neutron: ('tenant_floating_ip_list',
'is_extension_supported',
'floating_ip_port_forwarding_list',
'floating_ip_pools_list'),
api.nova: ('server_list',),
quotas: ('tenant_quota_usages',)})
@ -324,6 +338,7 @@ class FloatingIpViewTests(test.TestCase):
self.mock_is_extension_supported.return_value = False
self.mock_tenant_floating_ip_list.return_value = floating_ips
self.mock_floating_ip_port_forwarding_list.return_value = []
self.mock_floating_ip_pools_list.return_value = floating_pools
self.mock_server_list.return_value = [self.servers.list(), False]
self.mock_tenant_quota_usages.return_value = quota_data
@ -337,9 +352,9 @@ class FloatingIpViewTests(test.TestCase):
self.assertEqual('Allocate IP To Project (Quota exceeded)',
allocate_action.verbose_name)
self.mock_tenant_floating_ip_list.assert_called_once_with(
self.mock_tenant_floating_ip_list.assert_called_with(
test.IsHttpRequest())
self.mock_floating_ip_pools_list.assert_called_once_with(
self.mock_floating_ip_pools_list.assert_called_with(
test.IsHttpRequest())
self.mock_server_list.assert_called_once_with(test.IsHttpRequest(),
detailed=False)

View File

@ -20,6 +20,7 @@
"""
Views for managing floating IPs.
"""
import logging
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _
@ -41,6 +42,8 @@ from openstack_dashboard.dashboards.project.floating_ips \
from openstack_dashboard.dashboards.project.floating_ips \
import workflows as project_workflows
LOG = logging.getLogger(__name__)
class AssociateView(workflows.WorkflowView):
workflow_class = project_workflows.IPAssociationWorkflow
@ -129,8 +132,20 @@ class IndexView(tables.DataTableView):
instances_dict = dict((obj.id, obj.name) for obj in instances)
fip_pfw_enabled = (
api.neutron.is_extension_floating_ip_port_forwarding_supported(
self.request))
for ip in floating_ips:
ip.instance_name = instances_dict.get(ip.instance_id)
ip.pool_name = pool_dict.get(ip.pool, ip.pool)
if fip_pfw_enabled:
try:
pfws = api.neutron.floating_ip_port_forwarding_list(
self.request, ip.id)
ip.port_forwardings = pfws
except Exception as e:
LOG.info("Error fetching port forwardings for floating IP"
" %s: %s", ip.id, e)
return floating_ips

View File

@ -94,7 +94,8 @@ class AssociateIPAction(workflows.Action):
exceptions.handle(self.request,
_('Unable to retrieve floating IP addresses.'),
redirect=redirect)
options = sorted([(ip.id, ip.ip) for ip in ips if not ip.port_id])
options = sorted([(ip.id, ip.ip) for ip in ips if
not ip.port_id and len(ip.port_forwardings) == 0])
if options:
options.insert(0, ("", _("Select an IP address")))
else:

View File

@ -0,0 +1,7 @@
PANEL_DASHBOARD = 'project'
PANEL_GROUP = 'network'
PANEL = 'floating_ip_portforwardings'
ADD_PANEL = \
'openstack_dashboard.dashboards.project.floating_ip_portforwardings.panel' \
'.FloatingIpPortforwardingRules'

View File

@ -34,6 +34,7 @@ def data(TEST):
TEST.routers_with_rules = utils.TestDataContainer()
TEST.routers_with_routes = utils.TestDataContainer()
TEST.floating_ips = utils.TestDataContainer()
TEST.port_forwardings = utils.TestDataContainer()
TEST.security_groups = utils.TestDataContainer()
TEST.security_group_rules = utils.TestDataContainer()
TEST.providers = utils.TestDataContainer()
@ -63,6 +64,7 @@ def data(TEST):
TEST.api_routers = utils.TestDataContainer()
TEST.api_routers_with_routes = utils.TestDataContainer()
TEST.api_floating_ips = utils.TestDataContainer()
TEST.api_port_forwardings = utils.TestDataContainer()
TEST.api_security_groups = utils.TestDataContainer()
TEST.api_security_group_rules = utils.TestDataContainer()
TEST.api_pools = utils.TestDataContainer()
@ -647,6 +649,7 @@ def data(TEST):
'id': '9012cd70-cfae-4e46-b71e-6a409e9e0063',
'fixed_ip_address': None,
'port_id': None,
'port_forwardings': [],
'router_id': None}
TEST.api_floating_ips.add(fip_dict)
fip_with_instance = copy.deepcopy(fip_dict)
@ -659,6 +662,7 @@ def data(TEST):
'floating_ip_address': '172.16.88.228',
'floating_network_id': ext_net['id'],
'id': 'a97af8f2-3149-4b97-abbd-e49ad19510f7',
'port_forwardings': [],
'fixed_ip_address': assoc_port['fixed_ips'][0]['ip_address'],
'port_id': assoc_port['id'],
'router_id': router_dict['id']}
@ -668,6 +672,46 @@ def data(TEST):
'instance_type': 'compute'})
TEST.floating_ips.add(neutron.FloatingIp(fip_with_instance))
# port forwardings
TEST.api_port_forwardings.add({
"protocol": "tcp",
"internal_ip_address": "10.0.0.11",
"internal_port": 25,
"internal_port_id": "1238be08-a2a8-4b8d-addf-fb5e2250e480",
"external_port": 2230,
"internal_port_range": "25:25",
"external_port_range": "2230:2230",
"description": "",
"id": "e0a0274e-4d19-4eab-9e12-9e77a8caf3ea"
})
TEST.api_port_forwardings.add({
"protocol": "tcp",
"internal_port": 80,
"external_port": 8080,
"internal_ip_address": "10.0.0.12",
"internal_port_range": "80:90",
"internal_port_id": "2057ec54-8be2-11eb-8dcd-0242ac130003",
"external_port_range": "8080:8090",
"description": "using port ranges",
"id": "0f23a90a-8be2-11eb-8dcd-0242ac130003"
})
TEST.api_port_forwardings.add({
"protocol": "tcp",
"internal_ip_address": "10.0.0.24",
"internal_port": 25,
"internal_port_id": "070ef0b2-0175-4299-be5c-01fea8cca522",
"external_port": 2229,
"internal_port_range": "25:25",
"external_port_range": "2229:2229",
"description": "Some description",
"id": "1798dc82-c0ed-4b79-b12d-4c3c18f90eb2"
})
TEST.port_forwardings.add(neutron.PortForwarding(
TEST.api_port_forwardings.first(), fip_dict['id']
))
# Security group.
sec_group_1 = {'tenant_id': '1',

View File

@ -2321,6 +2321,81 @@ class NeutronApiSecurityGroupTests(test.APIMockTestCase):
self.qclient.update_port.assert_has_calls(expected_calls)
class NeutronApiFloatingIpPortForwardingTest(test.APIMockTestCase):
def setUp(self):
super().setUp()
neutronclient = mock.patch.object(api.neutron, 'neutronclient').start()
self.client_mock = neutronclient.return_value
def test_port_forwarding_list(self):
pfws = {'port_forwardings': self.api_port_forwardings.list()}
self.client_mock.list_port_forwardings.return_value = pfws
response = api.neutron.floating_ip_port_forwarding_list(
self.request, 'fip')
for i in range(len(response)):
resp_val = response[i]
expected_val = pfws['port_forwardings'][i]
for attr in resp_val.to_dict():
self.assertEqual(getattr(resp_val, attr), expected_val[attr])
self.client_mock.list_port_forwardings.assert_called_once_with('fip')
def test_port_forwarding_get(self):
pfw = self.api_port_forwardings.first()
pfw_id = pfw['id']
self.client_mock.show_port_forwarding.return_value = pfw
response = api.neutron.floating_ip_port_forwarding_get(
self.request, 'fip', pfw_id)
for attr in response.to_dict():
self.assertEqual(getattr(response, attr), pfw[attr])
self.client_mock.show_port_forwarding.assert_called_once_with(
'fip', pfw_id)
def test_port_forwarding_create(self):
pfw_resp_mock = {'port_forwarding': self.api_port_forwardings.first()}
pfw_expected = self.port_forwardings.get().to_dict()
pfw = {
"protocol": "tcp",
"internal_ip_address": "10.0.0.24",
"internal_port": 25,
"internal_port_id": "070ef0b2-0175-4299-be5c-01fea8cca522",
"external_port": 2229,
"description": "Some description",
}
self.client_mock.create_port_forwarding.return_value = pfw_resp_mock
response = api.neutron.floating_ip_port_forwarding_create(
self.request, 'fip', **pfw)
for attr in response.to_dict():
self.assertEqual(getattr(response, attr), pfw_expected[attr])
self.client_mock.create_port_forwarding.assert_called_once_with(
'fip', {'port_forwarding': pfw})
def test_port_forwarding_update(self):
pfw_resp_mock = {'port_forwarding': self.api_port_forwardings.first()}
pfw_expected = self.port_forwardings.get().to_dict()
pfw_id = pfw_resp_mock['port_forwarding']['id']
pfw = {
"protocol": "tcp",
"internal_port": 25,
"description": "Some description",
}
self.client_mock.update_port_forwarding.return_value = pfw_resp_mock
response = api.neutron.floating_ip_port_forwarding_update(
self.request, 'fip', portforwarding_id=pfw_id, **pfw)
for attr in response.to_dict():
self.assertEqual(getattr(response, attr), pfw_expected[attr])
self.client_mock.update_port_forwarding.assert_called_once_with(
'fip', pfw_id, {'port_forwarding': pfw})
def test_port_forwarding_delete(self):
pfw_id = self.api_port_forwardings.first()['id']
self.client_mock.delete_port_forwarding.return_value = None
api.neutron.floating_ip_port_forwarding_delete(
self.request, 'fip', pfw_id)
self.client_mock.delete_port_forwarding.assert_called_once_with(
'fip', pfw_id)
class NeutronApiFloatingIpTests(test.APIMockTestCase):
def setUp(self):

View File

@ -0,0 +1,8 @@
---
features:
- |
Add support to portforwardings in the Network Floating IPs dashboard.
Requires python-neutronclient >= 8.1.0
This feature is disabled by default.

View File

@ -37,7 +37,7 @@ pyScss>=1.4.0 # MIT License
python-cinderclient>=8.0.0 # Apache-2.0
python-glanceclient>=2.8.0 # Apache-2.0
python-keystoneclient>=3.22.0 # Apache-2.0
python-neutronclient>=6.7.0 # Apache-2.0
python-neutronclient>=8.1.0 # Apache-2.0
python-novaclient>=9.1.0 # Apache-2.0
python-swiftclient>=3.2.0 # Apache-2.0
pytz>=2013.6 # MIT