Allow direct and internal clients to use the replication network
A new header `X-Backend-Use-Replication-Network` is added; if true, use
the replication network instead of the client-data-path network.

Several background daemons are updated to use the replication network:

  * account-reaper
  * container-reconciler
  * container-sharder
  * container-sync
  * object-expirer

Note that if container-sync is being used to sync data within the same
cluster, the replication network will only be used when communicating
with the "source" container; the "destination" traffic will continue to
use the configured realm endpoint.

The direct and internal client APIs still default to using the
client-data-path network; this maintains backwards compatibility for
external tools written against them.

UpgradeImpact
=============
Until recently, servers configured with replication_server = true would
only handle REPLICATE (and, in the case of object servers, SSYNC)
requests, and would respond 405 Method Not Allowed to other requests.
When upgrading from Swift 2.25.0 or earlier, remove the config option
and restart services prior to upgrade to avoid a flood of background
daemon errors in logs.

Note that some background daemons find work by querying Swift rather
than walking local drives that should be available on the replication
network:

  * container-reconciler
  * object-expirer

Previously these may have been configured without access to the
replication network; ensure they have access before upgrading.

Closes-Bug: #1883302
Related-Bug: #1446873
Related-Change: Ica2b41a52d11cb10c94fa8ad780a201318c4fc87
Change-Id: Ieef534bf5d5fb53602e875b51c15ef565882fbff
parent 50800aba37
commit 2a6dfae2f3
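For context, a minimal usage sketch (not part of this change; the node
values, account/container names, and conf path below are placeholders
chosen for illustration) of how a caller can opt in to the replication
network, either per request via direct_client or once at construction
time via InternalClient:

    from swift.common import direct_client
    from swift.common.internal_client import InternalClient
    from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER

    # A node dict as it would come from a ring; values are placeholders.
    node = {'ip': '10.0.0.1', 'port': 6201,
            'replication_ip': '10.0.1.1', 'replication_port': 6301,
            'device': 'sda'}

    # direct_client: pass the new header explicitly on a single request.
    direct_client.direct_head_container(
        node, 0, 'AUTH_test', 'a-container',
        headers={USE_REPLICATION_NETWORK_HEADER: 'true'})

    # InternalClient: opt in once; make_request() then sets the header
    # by default, while callers may still override it per request.
    swift = InternalClient('/etc/swift/internal-client.conf',
                           'my-daemon', request_tries=3,
                           use_replication_network=True)

Because the header is only a default that get_ip_port() resolves per
request, a request that explicitly sets the header to 'false' keeps
using the client-data-path interface even when
use_replication_network=True.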
@@ -31,6 +31,7 @@ from swift.common.constraints import check_drive
 from swift.common.direct_client import direct_delete_container, \
     direct_delete_object, direct_get_container
 from swift.common.exceptions import ClientException
+from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER
 from swift.common.ring import Ring
 from swift.common.ring.utils import is_local_device
 from swift.common.utils import get_logger, whataremyips, config_true_value, \
@@ -370,7 +371,8 @@ class AccountReaper(Daemon):
                 node, part, account, container,
                 marker=marker,
                 conn_timeout=self.conn_timeout,
-                response_timeout=self.node_timeout)
+                response_timeout=self.node_timeout,
+                headers={USE_REPLICATION_NETWORK_HEADER: 'true'})
             self.stats_return_codes[2] = \
                 self.stats_return_codes.get(2, 0) + 1
             self.logger.increment('return_codes.2')
@@ -418,7 +420,8 @@ class AccountReaper(Daemon):
                  'X-Account-Partition': str(account_partition),
                  'X-Account-Device': anode['device'],
                  'X-Account-Override-Deleted': 'yes',
-                 'X-Timestamp': timestamp.internal})
+                 'X-Timestamp': timestamp.internal,
+                 USE_REPLICATION_NETWORK_HEADER: 'true'})
             successes += 1
             self.stats_return_codes[2] = \
                 self.stats_return_codes.get(2, 0) + 1
@@ -494,7 +497,8 @@ class AccountReaper(Daemon):
                  'X-Container-Partition': str(container_partition),
                  'X-Container-Device': cnode['device'],
                  'X-Backend-Storage-Policy-Index': policy_index,
-                 'X-Timestamp': timestamp.internal})
+                 'X-Timestamp': timestamp.internal,
+                 USE_REPLICATION_NETWORK_HEADER: 'true'})
             successes += 1
             self.stats_return_codes[2] = \
                 self.stats_return_codes.get(2, 0) + 1

@@ -29,6 +29,8 @@ from six.moves.http_client import HTTPException
 
 from swift.common.bufferedhttp import http_connect
 from swift.common.exceptions import ClientException
+from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER, \
+    get_ip_port
 from swift.common.swob import normalize_etag
 from swift.common.utils import Timestamp, FileLikeIter, quote
 from swift.common.http import HTTP_NO_CONTENT, HTTP_INSUFFICIENT_STORAGE, \
@@ -100,9 +102,10 @@ def _make_req(node, part, method, path, headers, stype,
     if content_length is None:
         headers['Transfer-Encoding'] = 'chunked'
 
+    ip, port = get_ip_port(node, headers)
     headers.setdefault('X-Backend-Allow-Reserved-Names', 'true')
     with Timeout(conn_timeout):
-        conn = http_connect(node['ip'], node['port'], node['device'], part,
+        conn = http_connect(ip, port, node['device'], part,
                             method, path, headers=headers)
 
     if contents is not None:
@@ -145,6 +148,9 @@ def _get_direct_account_container(path, stype, node, part,
     Do not use directly use the get_direct_account or
     get_direct_container instead.
     """
+    if headers is None:
+        headers = {}
+
     params = ['format=json']
     if marker:
         params.append('marker=%s' % quote(marker))
@@ -159,8 +165,10 @@ def _get_direct_account_container(path, stype, node, part,
     if reverse:
         params.append('reverse=%s' % quote(reverse))
     qs = '&'.join(params)
+
+    ip, port = get_ip_port(node, headers)
     with Timeout(conn_timeout):
-        conn = http_connect(node['ip'], node['port'], node['device'], part,
+        conn = http_connect(ip, port, node['device'], part,
                             'GET', path, query_string=qs,
                             headers=gen_headers(hdrs_in=headers))
     with Timeout(response_timeout):
@@ -200,7 +208,8 @@ def gen_headers(hdrs_in=None, add_ts=True):
 
 def direct_get_account(node, part, account, marker=None, limit=None,
                        prefix=None, delimiter=None, conn_timeout=5,
-                       response_timeout=15, end_marker=None, reverse=None):
+                       response_timeout=15, end_marker=None, reverse=None,
+                       headers=None):
     """
     Get listings directly from the account server.
 
@@ -220,6 +229,7 @@ def direct_get_account(node, part, account, marker=None, limit=None,
     """
     path = _make_path(account)
     return _get_direct_account_container(path, "Account", node, part,
                                          headers=headers,
+                                         marker=marker,
                                          limit=limit, prefix=prefix,
                                          delimiter=delimiter,
@@ -240,7 +250,7 @@ def direct_delete_account(node, part, account, conn_timeout=5,
 
 
 def direct_head_container(node, part, account, container, conn_timeout=5,
-                          response_timeout=15):
+                          response_timeout=15, headers=None):
     """
     Request container information directly from the container server.
 
@@ -253,8 +263,11 @@ def direct_head_container(node, part, account, container, conn_timeout=5,
     :returns: a dict containing the response's headers in a HeaderKeyDict
     :raises ClientException: HTTP HEAD request failed
     """
+    if headers is None:
+        headers = {}
+
     path = _make_path(account, container)
-    resp = _make_req(node, part, 'HEAD', path, gen_headers(),
+    resp = _make_req(node, part, 'HEAD', path, gen_headers(headers),
                      'Container', conn_timeout, response_timeout)
 
     resp_headers = HeaderKeyDict()
@@ -431,9 +444,10 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5,
     if headers is None:
         headers = {}
 
+    ip, port = get_ip_port(node, headers)
     path = _make_path(account, container, obj)
     with Timeout(conn_timeout):
-        conn = http_connect(node['ip'], node['port'], node['device'], part,
+        conn = http_connect(ip, port, node['device'], part,
                             'GET', path, headers=gen_headers(headers))
     with Timeout(response_timeout):
         resp = conn.getresponse()
@@ -551,6 +565,9 @@ def direct_get_suffix_hashes(node, part, suffixes, conn_timeout=5,
     """
     Get suffix hashes directly from the object server.
 
+    Note that unlike other ``direct_client`` functions, this one defaults
+    to using the replication network to make requests.
+
     :param node: node dictionary from the ring
     :param part: partition the container is on
     :param conn_timeout: timeout in seconds for establishing the connection
@@ -562,9 +579,11 @@ def direct_get_suffix_hashes(node, part, suffixes, conn_timeout=5,
     if headers is None:
         headers = {}
 
+    headers.setdefault(USE_REPLICATION_NETWORK_HEADER, 'true')
+    ip, port = get_ip_port(node, headers)
     path = '/%s' % '-'.join(suffixes)
     with Timeout(conn_timeout):
-        conn = http_connect(node['replication_ip'], node['replication_port'],
+        conn = http_connect(ip, port,
                             node['device'], part, 'REPLICATE', path,
                             headers=gen_headers(headers))
     with Timeout(response_timeout):

@@ -29,6 +29,7 @@ from swift.common.constraints import AUTO_CREATE_ACCOUNT_PREFIX
 from swift.common.exceptions import ClientException
 from swift.common.http import (HTTP_NOT_FOUND, HTTP_MULTIPLE_CHOICES,
                                is_client_error, is_server_error)
+from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER
 from swift.common.swob import Request, bytes_to_wsgi
 from swift.common.utils import quote, closing_if_possible
 from swift.common.wsgi import loadapp, pipeline_property
@@ -147,13 +148,14 @@ class InternalClient(object):
     """
 
     def __init__(self, conf_path, user_agent, request_tries,
-                 allow_modify_pipeline=False):
+                 allow_modify_pipeline=False, use_replication_network=False):
        if request_tries < 1:
            raise ValueError('request_tries must be positive')
        self.app = loadapp(conf_path,
                           allow_modify_pipeline=allow_modify_pipeline)
        self.user_agent = user_agent
        self.request_tries = request_tries
+       self.use_replication_network = use_replication_network
 
    get_object_ring = pipeline_property('get_object_ring')
    container_ring = pipeline_property('container_ring')
@@ -186,6 +188,9 @@ class InternalClient(object):
        headers = dict(headers)
        headers['user-agent'] = self.user_agent
        headers.setdefault('x-backend-allow-reserved-names', 'true')
+       if self.use_replication_network:
+           headers.setdefault(USE_REPLICATION_NETWORK_HEADER, 'true')
+
        for attempt in range(self.request_tries):
            resp = exc_type = exc_value = exc_traceback = None
            req = Request.blank(

@@ -40,14 +40,15 @@ from swift.common.utils import split_path, validate_device_partition, \
     close_if_possible, maybe_multipart_byteranges_to_document_iters, \
     multipart_byteranges_to_document_iters, parse_content_type, \
     parse_content_range, csv_append, list_from_csv, Spliterator, quote, \
-    RESERVED
+    RESERVED, config_true_value
 from swift.common.wsgi import make_subrequest
-from swift.container.reconciler import MISPLACED_OBJECTS_ACCOUNT
 
 
 OBJECT_TRANSIENT_SYSMETA_PREFIX = 'x-object-transient-sysmeta-'
 OBJECT_SYSMETA_CONTAINER_UPDATE_OVERRIDE_PREFIX = \
     'x-object-sysmeta-container-update-override-'
+USE_REPLICATION_NETWORK_HEADER = 'x-backend-use-replication-network'
+MISPLACED_OBJECTS_ACCOUNT = '.misplaced_objects'
 
 
 if six.PY2:
@@ -849,3 +850,15 @@ def update_ignore_range_header(req, name):
         raise ValueError('Header name must not contain commas')
     hdr = 'X-Backend-Ignore-Range-If-Metadata-Present'
     req.headers[hdr] = csv_append(req.headers.get(hdr), name)
+
+
+def get_ip_port(node, headers):
+    use_replication_network = False
+    for h, v in headers.items():
+        if h.lower() == USE_REPLICATION_NETWORK_HEADER:
+            use_replication_network = config_true_value(v)
+            break
+    if use_replication_network:
+        return node['replication_ip'], node['replication_port']
+    else:
+        return node['ip'], node['port']

@@ -27,11 +27,12 @@ from swift.common.direct_client import (
     direct_head_container, direct_delete_container_object,
     direct_put_container_object, ClientException)
 from swift.common.internal_client import InternalClient, UnexpectedResponse
+from swift.common.request_helpers import MISPLACED_OBJECTS_ACCOUNT, \
+    USE_REPLICATION_NETWORK_HEADER
 from swift.common.utils import get_logger, split_path, majority_size, \
     FileLikeIter, Timestamp, last_modified_date_to_timestamp, \
     LRUCache, decode_timestamps
 
-MISPLACED_OBJECTS_ACCOUNT = '.misplaced_objects'
 MISPLACED_OBJECTS_CONTAINER_DIVISOR = 3600  # 1 hour
 CONTAINER_POLICY_TTL = 30
 
@@ -224,6 +225,7 @@ def add_to_reconciler_queue(container_ring, account, container, obj,
         'X-Etag': obj_timestamp,
         'X-Timestamp': x_timestamp,
         'X-Content-Type': q_op_type,
+        USE_REPLICATION_NETWORK_HEADER: 'true',
     }
 
     def _check_success(*args, **kwargs):
@@ -307,7 +309,8 @@ def direct_get_container_policy_index(container_ring, account_name,
     """
     def _eat_client_exception(*args):
         try:
-            return direct_head_container(*args)
+            return direct_head_container(*args, headers={
+                USE_REPLICATION_NETWORK_HEADER: 'true'})
         except ClientException as err:
             if err.http_status == 404:
                 return err.http_headers
@@ -333,6 +336,10 @@ def direct_delete_container_entry(container_ring, account_name, container_name,
     object listing. Does not talk to object servers; use this only when a
     container entry does not actually have a corresponding object.
     """
+    if headers is None:
+        headers = {}
+    headers[USE_REPLICATION_NETWORK_HEADER] = 'true'
+
     pool = GreenPool()
     part, nodes = container_ring.get_nodes(account_name, container_name)
     for node in nodes:
@@ -360,9 +367,11 @@ class ContainerReconciler(Daemon):
             '/etc/swift/container-reconciler.conf'
         self.logger = get_logger(conf, log_route='container-reconciler')
         request_tries = int(conf.get('request_tries') or 3)
-        self.swift = InternalClient(conf_path,
-                                    'Swift Container Reconciler',
-                                    request_tries)
+        self.swift = InternalClient(
+            conf_path,
+            'Swift Container Reconciler',
+            request_tries,
+            use_replication_network=True)
         self.stats = defaultdict(int)
         self.last_stat_time = time.time()
 
@@ -29,6 +29,7 @@ from swift.common.constraints import check_drive, AUTO_CREATE_ACCOUNT_PREFIX
 from swift.common.direct_client import (direct_put_container,
                                         DirectClientException)
 from swift.common.exceptions import DeviceUnavailable
+from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER
 from swift.common.ring.utils import is_local_device
 from swift.common.swob import str_to_wsgi
 from swift.common.utils import get_logger, config_true_value, \
@@ -409,7 +410,8 @@ class ContainerSharder(ContainerReplicator):
                     internal_client_conf_path,
                     'Swift Container Sharder',
                     request_tries,
-                    allow_modify_pipeline=False)
+                    allow_modify_pipeline=False,
+                    use_replication_network=True)
             except (OSError, IOError) as err:
                 if err.errno != errno.ENOENT and \
                         not str(err).endswith(' not found'):
@@ -623,6 +625,7 @@ class ContainerSharder(ContainerReplicator):
         part, nodes = self.ring.get_nodes(account, container)
         headers = headers or {}
         headers.update({'X-Backend-Record-Type': RECORD_TYPE_SHARD,
+                        USE_REPLICATION_NETWORK_HEADER: 'True',
                         'User-Agent': 'container-sharder %s' % os.getpid(),
                         'X-Timestamp': Timestamp.now().normal,
                         'Content-Length': len(body),

@@ -241,7 +241,8 @@ class ContainerSync(Daemon):
             internal_client_conf = internal_client_conf_path
         try:
             self.swift = InternalClient(
-                internal_client_conf, 'Swift Container Sync', request_tries)
+                internal_client_conf, 'Swift Container Sync', request_tries,
+                use_replication_network=True)
         except (OSError, IOError) as err:
             if err.errno != errno.ENOENT and \
                     not str(err).endswith(' not found'):

@@ -134,7 +134,8 @@ class ObjectExpirer(Daemon):
 
         request_tries = int(self.conf.get('request_tries') or 3)
         self.swift = swift or InternalClient(
-            self.ic_conf_path, 'Swift Object Expirer', request_tries)
+            self.ic_conf_path, 'Swift Object Expirer', request_tries,
+            use_replication_network=True)
 
         self.processes = int(self.conf.get('processes', 0))
         self.process = int(self.conf.get('process', 0))

@@ -63,7 +63,7 @@ from swift.common.swob import Request, Response, Range, \
 from swift.common.request_helpers import strip_sys_meta_prefix, \
     strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta, \
     http_response_to_document_iters, is_object_transient_sysmeta, \
-    strip_object_transient_sysmeta_prefix
+    strip_object_transient_sysmeta_prefix, get_ip_port
 from swift.common.storage_policy import POLICIES
 
 
@@ -1264,11 +1264,13 @@ class ResumingGetter(object):
         # a request may be specialised with specific backend headers
         if self.header_provider:
             req_headers.update(self.header_provider())
 
+        ip, port = get_ip_port(node, req_headers)
         start_node_timing = time.time()
         try:
             with ConnectionTimeout(self.app.conn_timeout):
                 conn = http_connect(
-                    node['ip'], node['port'], node['device'],
+                    ip, port, node['device'],
                     self.partition, self.req_method, self.path,
                     headers=req_headers,
                     query_string=self.req_query_string)
@@ -1766,11 +1768,12 @@ class Controller(object):
         headers['Content-Length'] = str(len(body))
         for node in nodes:
             try:
+                ip, port = get_ip_port(node, headers)
                 start_node_timing = time.time()
                 with ConnectionTimeout(self.app.conn_timeout):
-                    conn = http_connect(node['ip'], node['port'],
-                                        node['device'], part, method, path,
-                                        headers=headers, query_string=query)
+                    conn = http_connect(
+                        ip, port, node['device'], part, method, path,
+                        headers=headers, query_string=query)
                 conn.node = node
                 self.app.set_node_timing(node, time.time() - start_node_timing)
                 if body:

@@ -73,7 +73,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
     HTTPRequestedRangeNotSatisfiable, Range, HTTPInternalServerError, \
     normalize_etag
 from swift.common.request_helpers import update_etag_is_at_header, \
-    resolve_etag_is_at_header, validate_internal_obj
+    resolve_etag_is_at_header, validate_internal_obj, get_ip_port
 
 
 def check_content_type(req):
@@ -1683,9 +1683,10 @@ class Putter(object):
     @classmethod
     def _make_connection(cls, node, part, path, headers, conn_timeout,
                          node_timeout):
+        ip, port = get_ip_port(node, headers)
         start_time = time.time()
         with ConnectionTimeout(conn_timeout):
-            conn = http_connect(node['ip'], node['port'], node['device'],
+            conn = http_connect(ip, port, node['device'],
                                 part, 'PUT', path, headers)
         connect_duration = time.time() - start_time
 

@@ -215,12 +215,13 @@ class PatchPolicies(object):
 class FakeRing(Ring):
 
     def __init__(self, replicas=3, max_more_nodes=0, part_power=0,
-                 base_port=1000):
+                 base_port=1000, separate_replication=False):
         self.serialized_path = '/foo/bar/object.ring.gz'
         self._base_port = base_port
         self.max_more_nodes = max_more_nodes
         self._part_shift = 32 - part_power
         self._init_device_char()
+        self.separate_replication = separate_replication
         # 9 total nodes (6 more past the initial 3) is the cap, no matter if
         # this is set higher, or R^2 for R replicas
         self.set_replicas(replicas)
@@ -256,11 +257,16 @@ class FakeRing(Ring):
         for x in range(self.replicas):
             ip = '10.0.0.%s' % x
             port = self._base_port + x
+            if self.separate_replication:
+                repl_ip = '10.0.1.%s' % x
+                repl_port = port + 100
+            else:
+                repl_ip, repl_port = ip, port
             dev = {
                 'ip': ip,
-                'replication_ip': ip,
+                'replication_ip': repl_ip,
                 'port': port,
-                'replication_port': port,
+                'replication_port': repl_port,
                 'device': self.device_char,
                 'zone': x % 3,
                 'region': x % 2,
@@ -278,10 +284,17 @@ class FakeRing(Ring):
     def get_more_nodes(self, part):
         index_counter = itertools.count()
         for x in range(self.replicas, (self.replicas + self.max_more_nodes)):
-            yield {'ip': '10.0.0.%s' % x,
-                   'replication_ip': '10.0.0.%s' % x,
-                   'port': self._base_port + x,
-                   'replication_port': self._base_port + x,
+            ip = '10.0.0.%s' % x
+            port = self._base_port + x
+            if self.separate_replication:
+                repl_ip = '10.0.1.%s' % x
+                repl_port = port + 100
+            else:
+                repl_ip, repl_port = ip, port
+            yield {'ip': ip,
+                   'replication_ip': repl_ip,
+                   'port': port,
+                   'replication_port': repl_port,
                    'device': 'sda',
                    'zone': x % 3,
                    'region': x % 2,
@@ -1265,12 +1278,11 @@ def fake_ec_node_response(node_frags, policy):
     call_count = {}  # maps node index to get_response call count for node
 
     def _build_node_map(req, policy):
-        node_key = lambda n: (n['ip'], n['port'])
         part = utils.split_path(req['path'], 5, 5, True)[1]
         all_nodes.extend(policy.object_ring.get_part_nodes(part))
         all_nodes.extend(policy.object_ring.get_more_nodes(part))
         for i, node in enumerate(all_nodes):
-            node_map[node_key(node)] = i
+            node_map[(node['ip'], node['port'])] = i
             call_count[i] = 0
 
     # normalize node_frags to a list of fragments for each node even

@@ -322,7 +322,8 @@ class TestReaper(unittest.TestCase):
             'X-Container-Partition': 'partition',
             'X-Container-Device': device,
             'X-Backend-Storage-Policy-Index': policy.idx,
-            'X-Timestamp': '1429117638.86767'
+            'X-Timestamp': '1429117638.86767',
+            'x-backend-use-replication-network': 'true',
         }
         ring = r.get_object_ring(policy.idx)
         expected = call(dict(ring.devs[i], index=i), 0,
@@ -442,7 +443,8 @@ class TestReaper(unittest.TestCase):
             'X-Account-Partition': 'partition',
             'X-Account-Device': device,
             'X-Account-Override-Deleted': 'yes',
-            'X-Timestamp': '1429117639.67676'
+            'X-Timestamp': '1429117639.67676',
+            'x-backend-use-replication-network': 'true',
         }
         ring = r.get_object_ring(policy.idx)
         expected = call(dict(ring.devs[i], index=i), 0, 'a', 'c',

@@ -316,6 +316,31 @@ class TestDirectClient(unittest.TestCase):
         self.assertIn('X-Timestamp', headers)
         self.assertIn('User-Agent', headers)
 
+    def test_direct_delete_account_replication_net(self):
+        part = '0'
+        account = 'a'
+
+        mock_path = 'swift.common.bufferedhttp.http_connect_raw'
+        with mock.patch(mock_path) as fake_connect:
+            fake_connect.return_value.getresponse.return_value.status = 200
+            direct_client.direct_delete_account(
+                self.node, part, account,
+                headers={'X-Backend-Use-Replication-Network': 't'})
+        args, kwargs = fake_connect.call_args
+        ip = args[0]
+        self.assertEqual(self.node['replication_ip'], ip)
+        self.assertNotEqual(self.node['ip'], ip)
+        port = args[1]
+        self.assertEqual(self.node['replication_port'], port)
+        self.assertNotEqual(self.node['port'], port)
+        method = args[2]
+        self.assertEqual('DELETE', method)
+        path = args[3]
+        self.assertEqual('/sda/0/a', path)
+        headers = args[4]
+        self.assertIn('X-Timestamp', headers)
+        self.assertIn('User-Agent', headers)
+
     def test_direct_delete_account_failure(self):
         part = '0'
         account = 'a'
@@ -346,6 +371,24 @@ class TestDirectClient(unittest.TestCase):
                          self.user_agent)
         self.assertEqual(headers, resp)
 
+    def test_direct_head_container_replication_net(self):
+        headers = HeaderKeyDict(key='value')
+
+        with mocked_http_conn(200, headers) as conn:
+            resp = direct_client.direct_head_container(
+                self.node, self.part, self.account, self.container,
+                headers={'X-Backend-Use-Replication-Network': 'on'})
+            self.assertEqual(conn.host, self.node['replication_ip'])
+            self.assertEqual(conn.port, self.node['replication_port'])
+            self.assertNotEqual(conn.host, self.node['ip'])
+            self.assertNotEqual(conn.port, self.node['port'])
+            self.assertEqual(conn.method, 'HEAD')
+            self.assertEqual(conn.path, self.container_path)
+
+        self.assertEqual(conn.req_headers['user-agent'],
+                         self.user_agent)
+        self.assertEqual(headers, resp)
+
     def test_direct_head_container_error(self):
         headers = HeaderKeyDict(key='value')
 
@@ -441,6 +484,18 @@ class TestDirectClient(unittest.TestCase):
         self.assertEqual(conn.method, 'DELETE')
         self.assertEqual(conn.path, self.container_path)
 
+    def test_direct_delete_container_replication_net(self):
+        with mocked_http_conn(200) as conn:
+            direct_client.direct_delete_container(
+                self.node, self.part, self.account, self.container,
+                headers={'X-Backend-Use-Replication-Network': '1'})
+            self.assertEqual(conn.host, self.node['replication_ip'])
+            self.assertEqual(conn.port, self.node['replication_port'])
+            self.assertNotEqual(conn.host, self.node['ip'])
+            self.assertNotEqual(conn.port, self.node['port'])
+            self.assertEqual(conn.method, 'DELETE')
+            self.assertEqual(conn.path, self.container_path)
+
     def test_direct_delete_container_with_timestamp(self):
         # ensure timestamp is different from any that might be auto-generated
         timestamp = Timestamp(time.time() - 100)

@@ -25,7 +25,7 @@ from textwrap import dedent
 import six
 from six.moves import range, zip_longest
 from six.moves.urllib.parse import quote, parse_qsl
-from swift.common import exceptions, internal_client, swob
+from swift.common import exceptions, internal_client, request_helpers, swob
 from swift.common.header_key_dict import HeaderKeyDict
 from swift.common.storage_policy import StoragePolicy
 from swift.common.middleware.proxy_logging import ProxyLoggingMiddleware
@@ -303,7 +303,7 @@ class TestInternalClient(unittest.TestCase):
         with mock.patch.object(internal_client, 'loadapp', app.load), \
                 self.assertRaises(ValueError):
             # First try with a bad arg
-            client = internal_client.InternalClient(
+            internal_client.InternalClient(
                 conf_path, user_agent, request_tries=0)
         self.assertEqual(0, app.load_called)
 
@@ -315,6 +315,18 @@ class TestInternalClient(unittest.TestCase):
         self.assertEqual(app, client.app)
         self.assertEqual(user_agent, client.user_agent)
         self.assertEqual(request_tries, client.request_tries)
+        self.assertFalse(client.use_replication_network)
+
+        with mock.patch.object(internal_client, 'loadapp', app.load):
+            client = internal_client.InternalClient(
+                conf_path, user_agent, request_tries,
+                use_replication_network=True)
+
+        self.assertEqual(2, app.load_called)
+        self.assertEqual(app, client.app)
+        self.assertEqual(user_agent, client.user_agent)
+        self.assertEqual(request_tries, client.request_tries)
+        self.assertTrue(client.use_replication_network)
 
     def test_make_request_sets_user_agent(self):
         class InternalClient(internal_client.InternalClient):
@@ -323,8 +335,11 @@ class TestInternalClient(unittest.TestCase):
                 self.app = self.fake_app
                 self.user_agent = 'some_agent'
                 self.request_tries = 1
+                self.use_replication_network = False
 
             def fake_app(self, env, start_response):
+                self.test.assertNotIn(
+                    'HTTP_X_BACKEND_USE_REPLICATION_NETWORK', env)
                 self.test.assertEqual(self.user_agent, env['HTTP_USER_AGENT'])
                 start_response('200 Ok', [('Content-Length', '0')])
                 return []
@@ -332,6 +347,47 @@ class TestInternalClient(unittest.TestCase):
         client = InternalClient(self)
         client.make_request('GET', '/', {}, (200,))
 
+    def test_make_request_defaults_replication_network_header(self):
+        class InternalClient(internal_client.InternalClient):
+            def __init__(self, test):
+                self.test = test
+                self.app = self.fake_app
+                self.user_agent = 'some_agent'
+                self.request_tries = 1
+                self.use_replication_network = False
+                self.expected_header_value = None
+
+            def fake_app(self, env, start_response):
+                if self.expected_header_value is None:
+                    self.test.assertNotIn(
+                        'HTTP_X_BACKEND_USE_REPLICATION_NETWORK', env)
+                else:
+                    hdr_val = env['HTTP_X_BACKEND_USE_REPLICATION_NETWORK']
+                    self.test.assertEqual(self.expected_header_value, hdr_val)
+                start_response('200 Ok', [('Content-Length', '0')])
+                return []
+
+        client = InternalClient(self)
+        client.make_request('GET', '/', {}, (200,))
+        # Caller can still override
+        client.expected_header_value = 'false'
+        client.make_request('GET', '/', {
+            request_helpers.USE_REPLICATION_NETWORK_HEADER: 'false'}, (200,))
+        client.expected_header_value = 'true'
+        client.make_request('GET', '/', {
+            request_helpers.USE_REPLICATION_NETWORK_HEADER: 'true'}, (200,))
+
+        # Switch default behavior
+        client.use_replication_network = True
+
+        client.make_request('GET', '/', {}, (200,))
+        client.expected_header_value = 'false'
+        client.make_request('GET', '/', {
+            request_helpers.USE_REPLICATION_NETWORK_HEADER: 'false'}, (200,))
+        client.expected_header_value = 'on'
+        client.make_request('GET', '/', {
+            request_helpers.USE_REPLICATION_NETWORK_HEADER: 'on'}, (200,))
+
     def test_make_request_sets_query_string(self):
         captured_envs = []
 
@@ -341,6 +397,7 @@ class TestInternalClient(unittest.TestCase):
                 self.app = self.fake_app
                 self.user_agent = 'some_agent'
                 self.request_tries = 1
+                self.use_replication_network = False
 
             def fake_app(self, env, start_response):
                 captured_envs.append(env)
@@ -362,6 +419,7 @@ class TestInternalClient(unittest.TestCase):
                 self.app = self.fake_app
                 self.user_agent = 'some_agent'
                 self.request_tries = 4
+                self.use_replication_network = False
                 self.tries = 0
                 self.sleep_called = 0
 
@@ -441,6 +499,7 @@ class TestInternalClient(unittest.TestCase):
                 self.app = self.fake_app
                 self.user_agent = 'some_agent'
                 self.request_tries = 3
+                self.use_replication_network = False
                 self.env = None
 
             def fake_app(self, env, start_response):
@@ -468,6 +527,7 @@ class TestInternalClient(unittest.TestCase):
                     self.fake_app, {}, self.logger)
                 self.user_agent = 'some_agent'
                 self.request_tries = 3
+                self.use_replication_network = False
 
             def fake_app(self, env, start_response):
                 body = b'fake error response'
@@ -499,6 +559,7 @@ class TestInternalClient(unittest.TestCase):
                 self.user_agent = 'some_agent'
                 self.resp_status = resp_status
                 self.request_tries = 3
+                self.use_replication_network = False
                 self.closed_paths = []
                 self.fully_read_paths = []
 
@@ -557,6 +618,7 @@ class TestInternalClient(unittest.TestCase):
                 self.app = self.fake_app
                 self.user_agent = 'some_agent'
                 self.request_tries = 3
+                self.use_replication_network = False
 
             def fake_app(self, env, start_response):
                 start_response('200 Ok', [('Content-Length', '0')])
@@ -607,6 +669,7 @@ class TestInternalClient(unittest.TestCase):
                 self.app = self.fake_app
                 self.user_agent = 'some_agent'
                 self.request_tries = 3
+                self.use_replication_network = False
                 self.status = status
                 self.call_count = 0
 
@@ -698,6 +761,7 @@ class TestInternalClient(unittest.TestCase):
             def __init__(self):
                 self.user_agent = 'test'
                 self.request_tries = 1
+                self.use_replication_network = False
                 self.app = self.fake_app
 
             def fake_app(self, environ, start_response):
@@ -1217,6 +1281,7 @@ class TestInternalClient(unittest.TestCase):
                 self.app = self.fake_app
                 self.user_agent = 'some_agent'
                 self.request_tries = 3
+                self.use_replication_network = False
 
             def fake_app(self, env, start_response):
                 self.req_env = env
@@ -1261,6 +1326,7 @@ class TestInternalClient(unittest.TestCase):
                 self.app = self.fake_app
                 self.user_agent = 'some_agent'
                 self.request_tries = 3
+                self.use_replication_network = False
 
             def fake_app(self, env, start_response):
                 start_response('200 Ok', [('Content-Length', '0')])
@@ -1280,6 +1346,7 @@ class TestInternalClient(unittest.TestCase):
                 self.app = self.fake_app
                 self.user_agent = 'some_agent'
                 self.request_tries = 3
+                self.use_replication_network = False
 
             def fake_app(self, env, start_response):
                 start_response('200 Ok', [('Content-Length', '0')])
@@ -1300,6 +1367,7 @@ class TestInternalClient(unittest.TestCase):
                 self.app = self.fake_app
                 self.user_agent = 'some_agent'
                 self.request_tries = 3
+                self.use_replication_network = False
 
             def fake_app(self, env, start_response):
                 start_response('404 Not Found', [])
@@ -1330,6 +1398,7 @@ class TestInternalClient(unittest.TestCase):
         class InternalClient(internal_client.InternalClient):
             def __init__(self, test, path, headers, fobj):
                 self.test = test
+                self.use_replication_network = False
                 self.path = path
                 self.headers = headers
                 self.fobj = fobj

@@ -124,6 +124,19 @@ class TestRequestHelpers(unittest.TestCase):
         self.assertFalse('c' in to_req.headers)
         self.assertFalse('C' in to_req.headers)
 
+    def test_get_ip_port(self):
+        node = {
+            'ip': '1.2.3.4',
+            'port': 6000,
+            'replication_ip': '5.6.7.8',
+            'replication_port': 7000,
+        }
+        self.assertEqual(('1.2.3.4', 6000), rh.get_ip_port(node, {}))
+        self.assertEqual(('5.6.7.8', 7000), rh.get_ip_port(node, {
+            rh.USE_REPLICATION_NETWORK_HEADER: 'true'}))
+        self.assertEqual(('1.2.3.4', 6000), rh.get_ip_port(node, {
+            rh.USE_REPLICATION_NETWORK_HEADER: 'false'}))
+
     @patch_policies(with_ec_default=True)
     def test_get_name_and_placement_object_req(self):
         path = '/device/part/account/container/object'

@@ -95,6 +95,7 @@ class FakeInternalClient(reconciler.InternalClient):
         self.app = FakeStoragePolicySwift()
         self.user_agent = 'fake-internal-client'
         self.request_tries = 1
+        self.use_replication_network = True
         self.parse(listings)
 
     def parse(self, listings):

@@ -171,7 +171,8 @@ class TestSharder(BaseTestSharder):
             'container-sharder', sharder.logger.logger.name)
         mock_ic.assert_called_once_with(
             '/etc/swift/internal-client.conf', 'Swift Container Sharder', 3,
-            allow_modify_pipeline=False)
+            allow_modify_pipeline=False,
+            use_replication_network=True)
 
         conf = {
             'mount_check': False, 'bind_ip': '10.11.12.13', 'bind_port': 62010,
@@ -221,7 +222,8 @@ class TestSharder(BaseTestSharder):
         sharder, mock_ic = do_test(conf, expected)
         mock_ic.assert_called_once_with(
             '/etc/swift/my-sharder-ic.conf', 'Swift Container Sharder', 2,
-            allow_modify_pipeline=False)
+            allow_modify_pipeline=False,
+            use_replication_network=True)
         self.assertEqual(self.logger.get_lines_for_level('warning'), [
             'Option auto_create_account_prefix is deprecated. '
             'Configure auto_create_account_prefix under the '
@@ -731,11 +733,12 @@ class TestSharder(BaseTestSharder):
         self.logger.clear()
         conf = conf or {}
         conf['devices'] = self.tempdir
+        fake_ring = FakeRing(replicas=replicas, separate_replication=True)
         with mock.patch(
                 'swift.container.sharder.internal_client.InternalClient'):
             with mock.patch(
                     'swift.common.db_replicator.ring.Ring',
-                    lambda *args, **kwargs: FakeRing(replicas=replicas)):
+                    return_value=fake_ring):
                 sharder = ContainerSharder(conf, logger=self.logger)
                 sharder._local_device_ids = {0, 1, 2}
                 sharder._replicate_object = mock.MagicMock(
@@ -4185,20 +4188,31 @@ class TestSharder(BaseTestSharder):
 
     def check_shard_ranges_sent(self, broker, expected_sent):
         bodies = []
+        servers = []
 
         def capture_send(conn, data):
             bodies.append(data)
 
+        def capture_connect(host, port, *a, **kw):
+            servers.append((host, port))
+
         self.assertFalse(broker.get_own_shard_range().reported)  # sanity
         with self._mock_sharder() as sharder:
             with mocked_http_conn(204, 204, 204,
-                                  give_send=capture_send) as mock_conn:
+                                  give_send=capture_send,
+                                  give_connect=capture_connect) as mock_conn:
                 sharder._update_root_container(broker)
 
         for req in mock_conn.requests:
             self.assertEqual('PUT', req['method'])
         self.assertEqual([expected_sent] * 3,
                         [json.loads(b) for b in bodies])
+        self.assertEqual(servers, [
+            # NB: replication interfaces
+            ('10.0.1.0', 1100),
+            ('10.0.1.1', 1101),
+            ('10.0.1.2', 1102),
+        ])
         self.assertTrue(broker.get_own_shard_range().reported)
 
     def test_update_root_container_own_range(self):

@@ -318,7 +318,7 @@ class TestController(unittest.TestCase):
         self.controller.account_info(self.account, self.request)
         set_http_connect(201, raise_timeout_exc=True)
         self.controller._make_request(
-            nodes, partition, 'POST', '/', '', '', None,
+            nodes, partition, 'POST', '/', {}, '', None,
             self.controller.app.logger.thread_locals)
 
         # tests if 200 is cached and used