Move IP-address-related functions out to new module
Partial-Bug: #2015274 Change-Id: I7ffa3a8e95d4ec456860b0484caf1dd08ff0849a
This commit is contained in:
parent
ef64b63fdf
commit
e29e2c3ae5
@ -65,7 +65,6 @@ from eventlet.event import Event
|
||||
from eventlet.green import socket, threading
|
||||
import eventlet.hubs
|
||||
import eventlet.queue
|
||||
import netifaces
|
||||
import codecs
|
||||
utf8_decoder = codecs.getdecoder('utf-8')
|
||||
utf8_encoder = codecs.getencoder('utf-8')
|
||||
@ -118,7 +117,14 @@ from swift.common.utils.timestamp import ( # noqa
|
||||
last_modified_date_to_timestamp,
|
||||
normalize_delete_at_timestamp,
|
||||
)
|
||||
|
||||
from swift.common.utils.ipaddrs import ( # noqa
|
||||
is_valid_ip,
|
||||
is_valid_ipv4,
|
||||
is_valid_ipv6,
|
||||
expand_ipv6,
|
||||
parse_socket_string,
|
||||
whataremyips,
|
||||
)
|
||||
from logging.handlers import SysLogHandler
|
||||
import logging
|
||||
|
||||
@ -134,9 +140,6 @@ SWIFT_CONF_FILE = '/etc/swift/swift.conf'
|
||||
|
||||
O_TMPFILE = getattr(os, 'O_TMPFILE', 0o20000000 | os.O_DIRECTORY)
|
||||
|
||||
# Used by the parse_socket_string() function to validate IPv6 addresses
|
||||
IPV6_RE = re.compile(r"^\[(?P<address>.*)\](:(?P<port>[0-9]+))?$")
|
||||
|
||||
MD5_OF_EMPTY_STRING = 'd41d8cd98f00b204e9800998ecf8427e'
|
||||
RESERVED_BYTE = b'\x00'
|
||||
RESERVED_STR = u'\x00'
|
||||
@ -2051,125 +2054,6 @@ def parse_options(parser=None, once=False, test_args=None):
|
||||
return config, options
|
||||
|
||||
|
||||
def is_valid_ip(ip):
|
||||
"""
|
||||
Return True if the provided ip is a valid IP-address
|
||||
"""
|
||||
return is_valid_ipv4(ip) or is_valid_ipv6(ip)
|
||||
|
||||
|
||||
def is_valid_ipv4(ip):
|
||||
"""
|
||||
Return True if the provided ip is a valid IPv4-address
|
||||
"""
|
||||
try:
|
||||
socket.inet_pton(socket.AF_INET, ip)
|
||||
except socket.error: # not a valid IPv4 address
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def is_valid_ipv6(ip):
|
||||
"""
|
||||
Returns True if the provided ip is a valid IPv6-address
|
||||
"""
|
||||
try:
|
||||
socket.inet_pton(socket.AF_INET6, ip)
|
||||
except socket.error: # not a valid IPv6 address
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def expand_ipv6(address):
|
||||
"""
|
||||
Expand ipv6 address.
|
||||
:param address: a string indicating valid ipv6 address
|
||||
:returns: a string indicating fully expanded ipv6 address
|
||||
|
||||
"""
|
||||
packed_ip = socket.inet_pton(socket.AF_INET6, address)
|
||||
return socket.inet_ntop(socket.AF_INET6, packed_ip)
|
||||
|
||||
|
||||
def whataremyips(ring_ip=None):
|
||||
"""
|
||||
Get "our" IP addresses ("us" being the set of services configured by
|
||||
one `*.conf` file). If our REST listens on a specific address, return it.
|
||||
Otherwise, if listen on '0.0.0.0' or '::' return all addresses, including
|
||||
the loopback.
|
||||
|
||||
:param str ring_ip: Optional ring_ip/bind_ip from a config file; may be
|
||||
IP address or hostname.
|
||||
:returns: list of Strings of ip addresses
|
||||
"""
|
||||
if ring_ip:
|
||||
# See if bind_ip is '0.0.0.0'/'::'
|
||||
try:
|
||||
_, _, _, _, sockaddr = socket.getaddrinfo(
|
||||
ring_ip, None, 0, socket.SOCK_STREAM, 0,
|
||||
socket.AI_NUMERICHOST)[0]
|
||||
if sockaddr[0] not in ('0.0.0.0', '::'):
|
||||
return [ring_ip]
|
||||
except socket.gaierror:
|
||||
pass
|
||||
|
||||
addresses = []
|
||||
for interface in netifaces.interfaces():
|
||||
try:
|
||||
iface_data = netifaces.ifaddresses(interface)
|
||||
for family in iface_data:
|
||||
if family not in (netifaces.AF_INET, netifaces.AF_INET6):
|
||||
continue
|
||||
for address in iface_data[family]:
|
||||
addr = address['addr']
|
||||
|
||||
# If we have an ipv6 address remove the
|
||||
# %ether_interface at the end
|
||||
if family == netifaces.AF_INET6:
|
||||
addr = expand_ipv6(addr.split('%')[0])
|
||||
addresses.append(addr)
|
||||
except ValueError:
|
||||
pass
|
||||
return addresses
|
||||
|
||||
|
||||
def parse_socket_string(socket_string, default_port):
|
||||
"""
|
||||
Given a string representing a socket, returns a tuple of (host, port).
|
||||
Valid strings are DNS names, IPv4 addresses, or IPv6 addresses, with an
|
||||
optional port. If an IPv6 address is specified it **must** be enclosed in
|
||||
[], like *[::1]* or *[::1]:11211*. This follows the accepted prescription
|
||||
for `IPv6 host literals`_.
|
||||
|
||||
Examples::
|
||||
|
||||
server.org
|
||||
server.org:1337
|
||||
127.0.0.1:1337
|
||||
[::1]:1337
|
||||
[::1]
|
||||
|
||||
.. _IPv6 host literals: https://tools.ietf.org/html/rfc3986#section-3.2.2
|
||||
"""
|
||||
port = default_port
|
||||
# IPv6 addresses must be between '[]'
|
||||
if socket_string.startswith('['):
|
||||
match = IPV6_RE.match(socket_string)
|
||||
if not match:
|
||||
raise ValueError("Invalid IPv6 address: %s" % socket_string)
|
||||
host = match.group('address')
|
||||
port = match.group('port') or port
|
||||
else:
|
||||
if ':' in socket_string:
|
||||
tokens = socket_string.split(':')
|
||||
if len(tokens) > 2:
|
||||
raise ValueError("IPv6 addresses must be between '[]'")
|
||||
host, port = tokens
|
||||
else:
|
||||
host = socket_string
|
||||
return (host, port)
|
||||
|
||||
|
||||
def select_ip_port(node_dict, use_replication=False):
|
||||
"""
|
||||
Get the ip address and port that should be used for the given
|
||||
|
141
swift/common/utils/ipaddrs.py
Normal file
141
swift/common/utils/ipaddrs.py
Normal file
@ -0,0 +1,141 @@
|
||||
# Copyright (c) 2010-2012 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import netifaces
|
||||
import re
|
||||
import socket
|
||||
|
||||
|
||||
# Used by the parse_socket_string() function to validate IPv6 addresses
|
||||
IPV6_RE = re.compile(r"^\[(?P<address>.*)\](:(?P<port>[0-9]+))?$")
|
||||
|
||||
|
||||
def is_valid_ip(ip):
|
||||
"""
|
||||
Return True if the provided ip is a valid IP-address
|
||||
"""
|
||||
return is_valid_ipv4(ip) or is_valid_ipv6(ip)
|
||||
|
||||
|
||||
def is_valid_ipv4(ip):
|
||||
"""
|
||||
Return True if the provided ip is a valid IPv4-address
|
||||
"""
|
||||
try:
|
||||
socket.inet_pton(socket.AF_INET, ip)
|
||||
except socket.error: # not a valid IPv4 address
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def is_valid_ipv6(ip):
|
||||
"""
|
||||
Returns True if the provided ip is a valid IPv6-address
|
||||
"""
|
||||
try:
|
||||
socket.inet_pton(socket.AF_INET6, ip)
|
||||
except socket.error: # not a valid IPv6 address
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def expand_ipv6(address):
|
||||
"""
|
||||
Expand ipv6 address.
|
||||
:param address: a string indicating valid ipv6 address
|
||||
:returns: a string indicating fully expanded ipv6 address
|
||||
|
||||
"""
|
||||
packed_ip = socket.inet_pton(socket.AF_INET6, address)
|
||||
return socket.inet_ntop(socket.AF_INET6, packed_ip)
|
||||
|
||||
|
||||
def whataremyips(ring_ip=None):
|
||||
"""
|
||||
Get "our" IP addresses ("us" being the set of services configured by
|
||||
one `*.conf` file). If our REST listens on a specific address, return it.
|
||||
Otherwise, if listen on '0.0.0.0' or '::' return all addresses, including
|
||||
the loopback.
|
||||
|
||||
:param str ring_ip: Optional ring_ip/bind_ip from a config file; may be
|
||||
IP address or hostname.
|
||||
:returns: list of Strings of ip addresses
|
||||
"""
|
||||
if ring_ip:
|
||||
# See if bind_ip is '0.0.0.0'/'::'
|
||||
try:
|
||||
_, _, _, _, sockaddr = socket.getaddrinfo(
|
||||
ring_ip, None, 0, socket.SOCK_STREAM, 0,
|
||||
socket.AI_NUMERICHOST)[0]
|
||||
if sockaddr[0] not in ('0.0.0.0', '::'):
|
||||
return [ring_ip]
|
||||
except socket.gaierror:
|
||||
pass
|
||||
|
||||
addresses = []
|
||||
for interface in netifaces.interfaces():
|
||||
try:
|
||||
iface_data = netifaces.ifaddresses(interface)
|
||||
for family in iface_data:
|
||||
if family not in (netifaces.AF_INET, netifaces.AF_INET6):
|
||||
continue
|
||||
for address in iface_data[family]:
|
||||
addr = address['addr']
|
||||
|
||||
# If we have an ipv6 address remove the
|
||||
# %ether_interface at the end
|
||||
if family == netifaces.AF_INET6:
|
||||
addr = expand_ipv6(addr.split('%')[0])
|
||||
addresses.append(addr)
|
||||
except ValueError:
|
||||
pass
|
||||
return addresses
|
||||
|
||||
|
||||
def parse_socket_string(socket_string, default_port):
|
||||
"""
|
||||
Given a string representing a socket, returns a tuple of (host, port).
|
||||
Valid strings are DNS names, IPv4 addresses, or IPv6 addresses, with an
|
||||
optional port. If an IPv6 address is specified it **must** be enclosed in
|
||||
[], like *[::1]* or *[::1]:11211*. This follows the accepted prescription
|
||||
for `IPv6 host literals`_.
|
||||
|
||||
Examples::
|
||||
|
||||
server.org
|
||||
server.org:1337
|
||||
127.0.0.1:1337
|
||||
[::1]:1337
|
||||
[::1]
|
||||
|
||||
.. _IPv6 host literals: https://tools.ietf.org/html/rfc3986#section-3.2.2
|
||||
"""
|
||||
port = default_port
|
||||
# IPv6 addresses must be between '[]'
|
||||
if socket_string.startswith('['):
|
||||
match = IPV6_RE.match(socket_string)
|
||||
if not match:
|
||||
raise ValueError("Invalid IPv6 address: %s" % socket_string)
|
||||
host = match.group('address')
|
||||
port = match.group('port') or port
|
||||
else:
|
||||
if ':' in socket_string:
|
||||
tokens = socket_string.split(':')
|
||||
if len(tokens) > 2:
|
||||
raise ValueError("IPv6 addresses must be between '[]'")
|
||||
host, port = tokens
|
||||
else:
|
||||
host = socket_string
|
||||
return (host, port)
|
@ -65,7 +65,6 @@ from io import BytesIO
|
||||
from shutil import rmtree
|
||||
from functools import partial
|
||||
from tempfile import TemporaryFile, NamedTemporaryFile, mkdtemp
|
||||
from netifaces import AF_INET6
|
||||
from mock import MagicMock, patch
|
||||
from six.moves.configparser import NoSectionError, NoOptionError
|
||||
from uuid import uuid4
|
||||
@ -74,8 +73,7 @@ from swift.common.exceptions import Timeout, MessageTimeout, \
|
||||
ConnectionTimeout, LockTimeout, ReplicationLockTimeout, \
|
||||
MimeInvalid
|
||||
from swift.common import utils
|
||||
from swift.common.utils import is_valid_ip, is_valid_ipv4, is_valid_ipv6, \
|
||||
set_swift_dir, md5, ShardRangeList
|
||||
from swift.common.utils import set_swift_dir, md5, ShardRangeList
|
||||
from swift.common.container_sync_realms import ContainerSyncRealms
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
from swift.common.storage_policy import POLICIES, reload_storage_policies
|
||||
@ -1511,132 +1509,6 @@ class TestUtils(unittest.TestCase):
|
||||
self.assertEqual(utils.node_to_string(dev, replication=True),
|
||||
'[fe80::0204:61ff:ff9d:1234]:6400/sdb')
|
||||
|
||||
def test_is_valid_ip(self):
|
||||
self.assertTrue(is_valid_ip("127.0.0.1"))
|
||||
self.assertTrue(is_valid_ip("10.0.0.1"))
|
||||
ipv6 = "fe80:0000:0000:0000:0204:61ff:fe9d:f156"
|
||||
self.assertTrue(is_valid_ip(ipv6))
|
||||
ipv6 = "fe80:0:0:0:204:61ff:fe9d:f156"
|
||||
self.assertTrue(is_valid_ip(ipv6))
|
||||
ipv6 = "fe80::204:61ff:fe9d:f156"
|
||||
self.assertTrue(is_valid_ip(ipv6))
|
||||
ipv6 = "fe80:0000:0000:0000:0204:61ff:254.157.241.86"
|
||||
self.assertTrue(is_valid_ip(ipv6))
|
||||
ipv6 = "fe80:0:0:0:0204:61ff:254.157.241.86"
|
||||
self.assertTrue(is_valid_ip(ipv6))
|
||||
ipv6 = "fe80::204:61ff:254.157.241.86"
|
||||
self.assertTrue(is_valid_ip(ipv6))
|
||||
ipv6 = "fe80::"
|
||||
self.assertTrue(is_valid_ip(ipv6))
|
||||
ipv6 = "::1"
|
||||
self.assertTrue(is_valid_ip(ipv6))
|
||||
not_ipv6 = "3ffe:0b00:0000:0001:0000:0000:000a"
|
||||
self.assertFalse(is_valid_ip(not_ipv6))
|
||||
not_ipv6 = "1:2:3:4:5:6::7:8"
|
||||
self.assertFalse(is_valid_ip(not_ipv6))
|
||||
|
||||
def test_is_valid_ipv4(self):
|
||||
self.assertTrue(is_valid_ipv4("127.0.0.1"))
|
||||
self.assertTrue(is_valid_ipv4("10.0.0.1"))
|
||||
ipv6 = "fe80:0000:0000:0000:0204:61ff:fe9d:f156"
|
||||
self.assertFalse(is_valid_ipv4(ipv6))
|
||||
ipv6 = "fe80:0:0:0:204:61ff:fe9d:f156"
|
||||
self.assertFalse(is_valid_ipv4(ipv6))
|
||||
ipv6 = "fe80::204:61ff:fe9d:f156"
|
||||
self.assertFalse(is_valid_ipv4(ipv6))
|
||||
ipv6 = "fe80:0000:0000:0000:0204:61ff:254.157.241.86"
|
||||
self.assertFalse(is_valid_ipv4(ipv6))
|
||||
ipv6 = "fe80:0:0:0:0204:61ff:254.157.241.86"
|
||||
self.assertFalse(is_valid_ipv4(ipv6))
|
||||
ipv6 = "fe80::204:61ff:254.157.241.86"
|
||||
self.assertFalse(is_valid_ipv4(ipv6))
|
||||
ipv6 = "fe80::"
|
||||
self.assertFalse(is_valid_ipv4(ipv6))
|
||||
ipv6 = "::1"
|
||||
self.assertFalse(is_valid_ipv4(ipv6))
|
||||
not_ipv6 = "3ffe:0b00:0000:0001:0000:0000:000a"
|
||||
self.assertFalse(is_valid_ipv4(not_ipv6))
|
||||
not_ipv6 = "1:2:3:4:5:6::7:8"
|
||||
self.assertFalse(is_valid_ipv4(not_ipv6))
|
||||
|
||||
def test_is_valid_ipv6(self):
|
||||
self.assertFalse(is_valid_ipv6("127.0.0.1"))
|
||||
self.assertFalse(is_valid_ipv6("10.0.0.1"))
|
||||
ipv6 = "fe80:0000:0000:0000:0204:61ff:fe9d:f156"
|
||||
self.assertTrue(is_valid_ipv6(ipv6))
|
||||
ipv6 = "fe80:0:0:0:204:61ff:fe9d:f156"
|
||||
self.assertTrue(is_valid_ipv6(ipv6))
|
||||
ipv6 = "fe80::204:61ff:fe9d:f156"
|
||||
self.assertTrue(is_valid_ipv6(ipv6))
|
||||
ipv6 = "fe80:0000:0000:0000:0204:61ff:254.157.241.86"
|
||||
self.assertTrue(is_valid_ipv6(ipv6))
|
||||
ipv6 = "fe80:0:0:0:0204:61ff:254.157.241.86"
|
||||
self.assertTrue(is_valid_ipv6(ipv6))
|
||||
ipv6 = "fe80::204:61ff:254.157.241.86"
|
||||
self.assertTrue(is_valid_ipv6(ipv6))
|
||||
ipv6 = "fe80::"
|
||||
self.assertTrue(is_valid_ipv6(ipv6))
|
||||
ipv6 = "::1"
|
||||
self.assertTrue(is_valid_ipv6(ipv6))
|
||||
not_ipv6 = "3ffe:0b00:0000:0001:0000:0000:000a"
|
||||
self.assertFalse(is_valid_ipv6(not_ipv6))
|
||||
not_ipv6 = "1:2:3:4:5:6::7:8"
|
||||
self.assertFalse(is_valid_ipv6(not_ipv6))
|
||||
|
||||
def test_expand_ipv6(self):
|
||||
expanded_ipv6 = "fe80::204:61ff:fe9d:f156"
|
||||
upper_ipv6 = "fe80:0000:0000:0000:0204:61ff:fe9d:f156"
|
||||
self.assertEqual(expanded_ipv6, utils.expand_ipv6(upper_ipv6))
|
||||
omit_ipv6 = "fe80:0000:0000::0204:61ff:fe9d:f156"
|
||||
self.assertEqual(expanded_ipv6, utils.expand_ipv6(omit_ipv6))
|
||||
less_num_ipv6 = "fe80:0:00:000:0204:61ff:fe9d:f156"
|
||||
self.assertEqual(expanded_ipv6, utils.expand_ipv6(less_num_ipv6))
|
||||
|
||||
def test_whataremyips(self):
|
||||
myips = utils.whataremyips()
|
||||
self.assertTrue(len(myips) > 1)
|
||||
self.assertTrue('127.0.0.1' in myips)
|
||||
|
||||
def test_whataremyips_bind_to_all(self):
|
||||
for any_addr in ('0.0.0.0', '0000:0000:0000:0000:0000:0000:0000:0000',
|
||||
'::0', '::0000', '::',
|
||||
# Wacky parse-error input produces all IPs
|
||||
'I am a bear'):
|
||||
myips = utils.whataremyips(any_addr)
|
||||
self.assertTrue(len(myips) > 1)
|
||||
self.assertTrue('127.0.0.1' in myips)
|
||||
|
||||
def test_whataremyips_bind_ip_specific(self):
|
||||
self.assertEqual(['1.2.3.4'], utils.whataremyips('1.2.3.4'))
|
||||
|
||||
def test_whataremyips_error(self):
|
||||
def my_interfaces():
|
||||
return ['eth0']
|
||||
|
||||
def my_ifaddress_error(interface):
|
||||
raise ValueError
|
||||
|
||||
with patch('netifaces.interfaces', my_interfaces), \
|
||||
patch('netifaces.ifaddresses', my_ifaddress_error):
|
||||
self.assertEqual(utils.whataremyips(), [])
|
||||
|
||||
def test_whataremyips_ipv6(self):
|
||||
test_ipv6_address = '2001:6b0:dead:beef:2::32'
|
||||
test_interface = 'eth0'
|
||||
|
||||
def my_ipv6_interfaces():
|
||||
return ['eth0']
|
||||
|
||||
def my_ipv6_ifaddresses(interface):
|
||||
return {AF_INET6:
|
||||
[{'netmask': 'ffff:ffff:ffff:ffff::',
|
||||
'addr': '%s%%%s' % (test_ipv6_address, test_interface)}]}
|
||||
with patch('netifaces.interfaces', my_ipv6_interfaces), \
|
||||
patch('netifaces.ifaddresses', my_ipv6_ifaddresses):
|
||||
myips = utils.whataremyips()
|
||||
self.assertEqual(len(myips), 1)
|
||||
self.assertEqual(myips[0], test_ipv6_address)
|
||||
|
||||
def test_hash_path(self):
|
||||
# Yes, these tests are deliberately very fragile. We want to make sure
|
||||
# that if someones changes the results hash_path produces, they know it
|
||||
|
162
test/unit/common/utils/test_ipaddrs.py
Normal file
162
test/unit/common/utils/test_ipaddrs.py
Normal file
@ -0,0 +1,162 @@
|
||||
# Copyright (c) 2010-2012 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from mock import patch
|
||||
import socket
|
||||
import unittest
|
||||
|
||||
# Continue importing from utils, as 3rd parties may depend on those imports
|
||||
from swift.common import utils
|
||||
from swift.common.utils import ipaddrs as utils_ipaddrs
|
||||
|
||||
|
||||
class TestIsValidIP(unittest.TestCase):
|
||||
def test_is_valid_ip(self):
|
||||
self.assertTrue(utils.is_valid_ip("127.0.0.1"))
|
||||
self.assertTrue(utils.is_valid_ip("10.0.0.1"))
|
||||
ipv6 = "fe80:0000:0000:0000:0204:61ff:fe9d:f156"
|
||||
self.assertTrue(utils.is_valid_ip(ipv6))
|
||||
ipv6 = "fe80:0:0:0:204:61ff:fe9d:f156"
|
||||
self.assertTrue(utils.is_valid_ip(ipv6))
|
||||
ipv6 = "fe80::204:61ff:fe9d:f156"
|
||||
self.assertTrue(utils.is_valid_ip(ipv6))
|
||||
ipv6 = "fe80:0000:0000:0000:0204:61ff:254.157.241.86"
|
||||
self.assertTrue(utils.is_valid_ip(ipv6))
|
||||
ipv6 = "fe80:0:0:0:0204:61ff:254.157.241.86"
|
||||
self.assertTrue(utils.is_valid_ip(ipv6))
|
||||
ipv6 = "fe80::204:61ff:254.157.241.86"
|
||||
self.assertTrue(utils.is_valid_ip(ipv6))
|
||||
ipv6 = "fe80::"
|
||||
self.assertTrue(utils.is_valid_ip(ipv6))
|
||||
ipv6 = "::1"
|
||||
self.assertTrue(utils.is_valid_ip(ipv6))
|
||||
not_ipv6 = "3ffe:0b00:0000:0001:0000:0000:000a"
|
||||
self.assertFalse(utils.is_valid_ip(not_ipv6))
|
||||
not_ipv6 = "1:2:3:4:5:6::7:8"
|
||||
self.assertFalse(utils.is_valid_ip(not_ipv6))
|
||||
|
||||
def test_is_valid_ipv4(self):
|
||||
self.assertTrue(utils.is_valid_ipv4("127.0.0.1"))
|
||||
self.assertTrue(utils.is_valid_ipv4("10.0.0.1"))
|
||||
ipv6 = "fe80:0000:0000:0000:0204:61ff:fe9d:f156"
|
||||
self.assertFalse(utils.is_valid_ipv4(ipv6))
|
||||
ipv6 = "fe80:0:0:0:204:61ff:fe9d:f156"
|
||||
self.assertFalse(utils.is_valid_ipv4(ipv6))
|
||||
ipv6 = "fe80::204:61ff:fe9d:f156"
|
||||
self.assertFalse(utils.is_valid_ipv4(ipv6))
|
||||
ipv6 = "fe80:0000:0000:0000:0204:61ff:254.157.241.86"
|
||||
self.assertFalse(utils.is_valid_ipv4(ipv6))
|
||||
ipv6 = "fe80:0:0:0:0204:61ff:254.157.241.86"
|
||||
self.assertFalse(utils.is_valid_ipv4(ipv6))
|
||||
ipv6 = "fe80::204:61ff:254.157.241.86"
|
||||
self.assertFalse(utils.is_valid_ipv4(ipv6))
|
||||
ipv6 = "fe80::"
|
||||
self.assertFalse(utils.is_valid_ipv4(ipv6))
|
||||
ipv6 = "::1"
|
||||
self.assertFalse(utils.is_valid_ipv4(ipv6))
|
||||
not_ipv6 = "3ffe:0b00:0000:0001:0000:0000:000a"
|
||||
self.assertFalse(utils.is_valid_ipv4(not_ipv6))
|
||||
not_ipv6 = "1:2:3:4:5:6::7:8"
|
||||
self.assertFalse(utils.is_valid_ipv4(not_ipv6))
|
||||
|
||||
def test_is_valid_ipv6(self):
|
||||
self.assertFalse(utils.is_valid_ipv6("127.0.0.1"))
|
||||
self.assertFalse(utils.is_valid_ipv6("10.0.0.1"))
|
||||
ipv6 = "fe80:0000:0000:0000:0204:61ff:fe9d:f156"
|
||||
self.assertTrue(utils.is_valid_ipv6(ipv6))
|
||||
ipv6 = "fe80:0:0:0:204:61ff:fe9d:f156"
|
||||
self.assertTrue(utils.is_valid_ipv6(ipv6))
|
||||
ipv6 = "fe80::204:61ff:fe9d:f156"
|
||||
self.assertTrue(utils.is_valid_ipv6(ipv6))
|
||||
ipv6 = "fe80:0000:0000:0000:0204:61ff:254.157.241.86"
|
||||
self.assertTrue(utils.is_valid_ipv6(ipv6))
|
||||
ipv6 = "fe80:0:0:0:0204:61ff:254.157.241.86"
|
||||
self.assertTrue(utils.is_valid_ipv6(ipv6))
|
||||
ipv6 = "fe80::204:61ff:254.157.241.86"
|
||||
self.assertTrue(utils.is_valid_ipv6(ipv6))
|
||||
ipv6 = "fe80::"
|
||||
self.assertTrue(utils.is_valid_ipv6(ipv6))
|
||||
ipv6 = "::1"
|
||||
self.assertTrue(utils.is_valid_ipv6(ipv6))
|
||||
not_ipv6 = "3ffe:0b00:0000:0001:0000:0000:000a"
|
||||
self.assertFalse(utils.is_valid_ipv6(not_ipv6))
|
||||
not_ipv6 = "1:2:3:4:5:6::7:8"
|
||||
self.assertFalse(utils.is_valid_ipv6(not_ipv6))
|
||||
|
||||
|
||||
class TestExpandIPv6(unittest.TestCase):
|
||||
def test_expand_ipv6(self):
|
||||
expanded_ipv6 = "fe80::204:61ff:fe9d:f156"
|
||||
upper_ipv6 = "fe80:0000:0000:0000:0204:61ff:fe9d:f156"
|
||||
self.assertEqual(expanded_ipv6, utils.expand_ipv6(upper_ipv6))
|
||||
omit_ipv6 = "fe80:0000:0000::0204:61ff:fe9d:f156"
|
||||
self.assertEqual(expanded_ipv6, utils.expand_ipv6(omit_ipv6))
|
||||
less_num_ipv6 = "fe80:0:00:000:0204:61ff:fe9d:f156"
|
||||
self.assertEqual(expanded_ipv6, utils.expand_ipv6(less_num_ipv6))
|
||||
|
||||
|
||||
class TestWhatAreMyIPs(unittest.TestCase):
|
||||
def test_whataremyips(self):
|
||||
myips = utils.whataremyips()
|
||||
self.assertTrue(len(myips) > 1)
|
||||
self.assertIn('127.0.0.1', myips)
|
||||
|
||||
def test_whataremyips_bind_to_all(self):
|
||||
for any_addr in ('0.0.0.0', '0000:0000:0000:0000:0000:0000:0000:0000',
|
||||
'::0', '::0000', '::',
|
||||
# Wacky parse-error input produces all IPs
|
||||
'I am a bear'):
|
||||
myips = utils.whataremyips(any_addr)
|
||||
self.assertTrue(len(myips) > 1)
|
||||
self.assertIn('127.0.0.1', myips)
|
||||
|
||||
def test_whataremyips_bind_ip_specific(self):
|
||||
self.assertEqual(['1.2.3.4'], utils.whataremyips('1.2.3.4'))
|
||||
|
||||
def test_whataremyips_netifaces_error(self):
|
||||
class FakeNetifaces(object):
|
||||
@staticmethod
|
||||
def interfaces():
|
||||
return ['eth0']
|
||||
|
||||
@staticmethod
|
||||
def ifaddresses(interface):
|
||||
raise ValueError
|
||||
|
||||
with patch.object(utils_ipaddrs, 'netifaces', FakeNetifaces):
|
||||
self.assertEqual(utils.whataremyips(), [])
|
||||
|
||||
def test_whataremyips_netifaces_ipv6(self):
|
||||
test_ipv6_address = '2001:6b0:dead:beef:2::32'
|
||||
test_interface = 'eth0'
|
||||
|
||||
class FakeNetifaces(object):
|
||||
AF_INET = int(socket.AF_INET)
|
||||
AF_INET6 = int(socket.AF_INET6)
|
||||
|
||||
@staticmethod
|
||||
def interfaces():
|
||||
return ['eth0']
|
||||
|
||||
@staticmethod
|
||||
def ifaddresses(interface):
|
||||
return {int(socket.AF_INET6): [
|
||||
{'netmask': 'ffff:ffff:ffff:ffff::',
|
||||
'addr': '%s%%%s' % (test_ipv6_address, test_interface)}]}
|
||||
|
||||
with patch.object(utils_ipaddrs, 'netifaces', FakeNetifaces):
|
||||
myips = utils.whataremyips()
|
||||
self.assertEqual(len(myips), 1)
|
||||
self.assertEqual(myips[0], test_ipv6_address)
|
Loading…
Reference in New Issue
Block a user