diff --git a/swift/container/backend.py b/swift/container/backend.py
index 1a4bd5a188..998f04aff0 100644
--- a/swift/container/backend.py
+++ b/swift/container/backend.py
@@ -54,7 +54,13 @@ SHARD_STATS_STATES = [ShardRange.ACTIVE, ShardRange.SHARDING,
 SHARD_LISTING_STATES = SHARD_STATS_STATES + [ShardRange.CLEAVED]
 SHARD_UPDATE_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
                        ShardRange.ACTIVE, ShardRange.SHARDING]
-
+# when auditing a shard gets its own shard range, which could be in any state
+# except FOUND, and any potential acceptors excluding FOUND ranges that may be
+# unwanted overlaps
+SHARD_AUDITING_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
+                         ShardRange.ACTIVE, ShardRange.SHARDING,
+                         ShardRange.SHARDED, ShardRange.SHRINKING,
+                         ShardRange.SHRUNK]
 
 # attribute names in order used when transforming shard ranges from dicts to
 # tuples and vice-versa
@@ -1716,7 +1722,10 @@ class ContainerBroker(DatabaseBroker):
 
         The following alias values are supported: 'listing' maps to all states
         that are considered valid when listing objects; 'updating' maps to all
-        states that are considered valid for redirecting an object update.
+        states that are considered valid for redirecting an object update;
+        'auditing' maps to all states that are considered valid for a shard
+        container that is updating its own shard range table from a root (this
+        currently maps to all states except FOUND).
 
         :param states: a list of values each of which may be the name of a
             state, the number of a state, or an alias
@@ -1731,6 +1740,8 @@ class ContainerBroker(DatabaseBroker):
                 resolved_states.update(SHARD_LISTING_STATES)
             elif state == 'updating':
                 resolved_states.update(SHARD_UPDATE_STATES)
+            elif state == 'auditing':
+                resolved_states.update(SHARD_AUDITING_STATES)
             else:
                 resolved_states.add(ShardRange.resolve_state(state)[0])
         return resolved_states
diff --git a/swift/container/server.py b/swift/container/server.py
index 0292320fbe..77b91ce07c 100644
--- a/swift/container/server.py
+++ b/swift/container/server.py
@@ -749,10 +749,14 @@ class ContainerController(BaseStorageServer):
             marker = end_marker = includes = None
             reverse = False
             states = params.get('states')
-            fill_gaps = False
+            fill_gaps = include_own = False
            if states:
                 states = list_from_csv(states)
                 fill_gaps = any(('listing' in states, 'updating' in states))
+                # 'auditing' is used during shard audit; if the shard is
+                # shrinking then it needs to get acceptor shard ranges, which
+                # may be the root container itself, so use include_own
+                include_own = 'auditing' in states
                 try:
                     states = broker.resolve_shard_range_states(states)
                 except ValueError:
@@ -761,7 +765,8 @@ class ContainerController(BaseStorageServer):
                 req.headers.get('x-backend-include-deleted', False))
             container_list = broker.get_shard_ranges(
                 marker, end_marker, includes, reverse, states=states,
-                include_deleted=include_deleted, fill_gaps=fill_gaps)
+                include_deleted=include_deleted, fill_gaps=fill_gaps,
+                include_own=include_own)
         else:
             resp_headers = gen_resp_headers(info, is_deleted=is_deleted)
             if is_deleted:
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index f00fbf70f2..f511549fd5 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -317,8 +317,7 @@ class CleavingContext(object):
     def range_done(self, new_cursor):
         self.ranges_done += 1
         self.ranges_todo -= 1
-        if new_cursor is not None:
-            self.cursor = new_cursor
+        self.cursor = new_cursor
 
     def done(self):
         return all((self.misplaced_done, self.cleaving_done,
@@ -744,11 +743,18 @@ class ContainerSharder(ContainerReplicator):
         shard_ranges = own_shard_range_from_root = None
         if own_shard_range:
             # Get the root view of the world, at least that part of the world
-            # that overlaps with this shard's namespace
+            # that overlaps with this shard's namespace. The
+            # 'states=auditing' parameter will cause the root to include
+            # its own shard range in the response, which is necessary for the
+            # particular case when this shard should be shrinking to the root
+            # container; when not shrinking to root, but to another acceptor,
+            # the root range should be in sharded state and will not interfere
+            # with cleaving, listing or updating behaviour.
             shard_ranges = self._fetch_shard_ranges(
                 broker, newest=True,
                 params={'marker': str_to_wsgi(own_shard_range.lower_str),
-                        'end_marker': str_to_wsgi(own_shard_range.upper_str)},
+                        'end_marker': str_to_wsgi(own_shard_range.upper_str),
+                        'states': 'auditing'},
                 include_deleted=True)
             if shard_ranges:
                 for shard_range in shard_ranges:
@@ -1394,10 +1400,17 @@ class ContainerSharder(ContainerReplicator):
                                 quote(broker.path))
             return cleaving_context.misplaced_done
 
-        ranges_todo = broker.get_shard_ranges(marker=cleaving_context.marker)
+        shard_ranges = broker.get_shard_ranges(marker=cleaving_context.marker)
+        # Ignore shrinking shard ranges: we never want to cleave objects to a
+        # shrinking shard. Shrinking shard ranges are to be expected in a root;
+        # shrinking shard ranges (other than own shard range) are not normally
+        # expected in a shard but can occur if there is an overlapping shard
+        # range that has been discovered from the root.
+        ranges_todo = [sr for sr in shard_ranges
+                       if sr.state != ShardRange.SHRINKING]
         if cleaving_context.cursor:
-            # always update ranges_todo in case more ranges have been found
-            # since last visit
+            # always update ranges_todo in case shard ranges have changed since
+            # last visit
             cleaving_context.ranges_todo = len(ranges_todo)
             self.logger.debug('Continuing to cleave (%s done, %s todo): %s',
                               cleaving_context.ranges_done,
@@ -1411,36 +1424,36 @@ class ContainerSharder(ContainerReplicator):
 
         ranges_done = []
         for shard_range in ranges_todo:
-            if shard_range.state == ShardRange.SHRINKING:
-                # Ignore shrinking shard ranges: we never want to cleave
-                # objects to a shrinking shard. Shrinking shard ranges are to
-                # be expected in a root; shrinking shard ranges (other than own
-                # shard range) are not normally expected in a shard but can
-                # occur if there is an overlapping shard range that has been
-                # discovered from the root.
-                cleaving_context.range_done(None)  # don't move the cursor
-                continue
-            elif shard_range.state in (ShardRange.CREATED,
-                                       ShardRange.CLEAVED,
-                                       ShardRange.ACTIVE):
-                cleave_result = self._cleave_shard_range(
-                    broker, cleaving_context, shard_range)
-                if cleave_result == CLEAVE_SUCCESS:
-                    ranges_done.append(shard_range)
-                    if len(ranges_done) == self.cleave_batch_size:
-                        break
-                elif cleave_result == CLEAVE_FAILED:
-                    break
-                # else, no errors, but no rows found either. keep going,
-                # and don't count it against our batch size
-            else:
+            if cleaving_context.cleaving_done:
+                # note: there may still be ranges_todo, for example: if this
+                # shard is shrinking and has merged a root shard range in
+                # sharded state along with an active acceptor shard range, but
+                # the root range is irrelevant
+                break
+
+            if len(ranges_done) == self.cleave_batch_size:
+                break
+
+            if shard_range.state not in (ShardRange.CREATED,
+                                         ShardRange.CLEAVED,
+                                         ShardRange.ACTIVE):
                 self.logger.info('Stopped cleave at unready %s', shard_range)
                 break
 
-        if not ranges_done:
-            # _cleave_shard_range always store()s the context on success; make
-            # sure we *also* do that if we hit a failure right off the bat
-            cleaving_context.store(broker)
+            cleave_result = self._cleave_shard_range(
+                broker, cleaving_context, shard_range)
+
+            if cleave_result == CLEAVE_SUCCESS:
+                ranges_done.append(shard_range)
+            elif cleave_result == CLEAVE_FAILED:
+                break
+            # else: CLEAVE_EMPTY: no errors, but no rows found either. keep
+            # going, and don't count it against our batch size
+
+        # _cleave_shard_range always store()s the context on success; *also* do
+        # that here in case we hit a failure right off the bat or ended loop
+        # with skipped ranges
+        cleaving_context.store(broker)
         self.logger.debug(
             'Cleaved %s shard ranges for %s',
             len(ranges_done), quote(broker.path))
diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py
index 40ab20e244..e9cba39a89 100644
--- a/test/probe/test_sharder.py
+++ b/test/probe/test_sharder.py
@@ -1450,14 +1450,32 @@ class TestContainerSharding(BaseTestContainerSharding):
     def test_shrinking(self):
         int_client = self.make_internal_client()
 
-        def check_node_data(node_data, exp_hdrs, exp_obj_count, exp_shards):
+        def check_node_data(node_data, exp_hdrs, exp_obj_count, exp_shards,
+                            exp_sharded_root_range=False):
             hdrs, range_data = node_data
             self.assert_dict_contains(exp_hdrs, hdrs)
-            self.assert_shard_ranges_contiguous(exp_shards, range_data)
-            self.assert_total_object_count(exp_obj_count, range_data)
+            sharded_root_range = False
+            other_range_data = []
+            for data in range_data:
+                sr = ShardRange.from_dict(data)
+                if (sr.account == self.account and
+                        sr.container == self.container_name and
+                        sr.state == ShardRange.SHARDED):
+                    # only expect one root range
+                    self.assertFalse(sharded_root_range, range_data)
+                    sharded_root_range = True
+                    self.assertEqual(ShardRange.MIN, sr.lower, sr)
+                    self.assertEqual(ShardRange.MAX, sr.upper, sr)
+                else:
+                    # include active root range in further assertions
+                    other_range_data.append(data)
+            self.assertEqual(exp_sharded_root_range, sharded_root_range)
+            self.assert_shard_ranges_contiguous(exp_shards, other_range_data)
+            self.assert_total_object_count(exp_obj_count, other_range_data)
 
         def check_shard_nodes_data(node_data, expected_state='unsharded',
-                                   expected_shards=0, exp_obj_count=0):
+                                   expected_shards=0, exp_obj_count=0,
+                                   exp_sharded_root_range=False):
             # checks that shard range is consistent on all nodes
             root_path = '%s/%s' % (self.account, self.container_name)
             exp_shard_hdrs = {
@@ -1469,7 +1487,7 @@ class TestContainerSharding(BaseTestContainerSharding):
                 with annotate_failure('Node id %s.' % node_id):
                     check_node_data(
                         node_data, exp_shard_hdrs, exp_obj_count,
-                        expected_shards)
+                        expected_shards, exp_sharded_root_range)
                 hdrs = node_data[0]
                 object_counts.append(int(hdrs['X-Container-Object-Count']))
                 bytes_used.append(int(hdrs['X-Container-Bytes-Used']))
@@ -1668,10 +1686,13 @@ class TestContainerSharding(BaseTestContainerSharding):
         donor = orig_shard_ranges[0]
         shard_nodes_data = self.direct_get_container_shard_ranges(
             donor.account, donor.container)
-        # the donor's shard range will have the acceptor's projected stats
+        # the donor's shard range will have the acceptor's projected stats;
+        # donor also has copy of root shard range that will be ignored;
+        # note: expected_shards does not include the sharded root range
         obj_count, bytes_used = check_shard_nodes_data(
             shard_nodes_data, expected_state='sharded', expected_shards=1,
-            exp_obj_count=len(second_shard_objects) + 1)
+            exp_obj_count=len(second_shard_objects) + 1,
+            exp_sharded_root_range=True)
         # but the donor is empty and so reports zero stats
         self.assertEqual(0, obj_count)
         self.assertEqual(0, bytes_used)
@@ -2700,6 +2721,29 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
         self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
         self.assert_container_listing(obj_names)
 
+        # Let's pretend that some actor in the system has determined that all
+        # the shard ranges should shrink back to root
+        # TODO: replace this db manipulation if/when manage_shard_ranges can
+        # manage shrinking...
+        broker = self.get_broker(self.brain.part, self.brain.nodes[0])
+        shard_ranges = broker.get_shard_ranges()
+        self.assertEqual(2, len(shard_ranges))
+        for sr in shard_ranges:
+            self.assertTrue(sr.update_state(ShardRange.SHRINKING))
+            sr.epoch = sr.state_timestamp = Timestamp.now()
+        own_sr = broker.get_own_shard_range()
+        own_sr.update_state(ShardRange.ACTIVE, state_timestamp=Timestamp.now())
+        broker.merge_shard_ranges(shard_ranges + [own_sr])
+
+        # replicate and run sharders
+        self.replicators.once()
+        self.sharders_once()
+
+        self.assert_container_state(self.brain.nodes[0], 'collapsed', 0)
+        self.assert_container_state(self.brain.nodes[1], 'collapsed', 0)
+        self.assert_container_state(self.brain.nodes[2], 'collapsed', 0)
+        self.assert_container_listing(obj_names)
+
     def test_manage_shard_ranges_used_poorly(self):
         obj_names = self._make_object_names(8)
         self.put_objects(obj_names)
diff --git a/test/unit/container/test_backend.py b/test/unit/container/test_backend.py
index 0c4cb24522..c6a284ad48 100644
--- a/test/unit/container/test_backend.py
+++ b/test/unit/container/test_backend.py
@@ -3974,6 +3974,12 @@ class TestContainerBroker(unittest.TestCase):
             ContainerBroker.resolve_shard_range_states(
                 ['updating', 'listing']))
 
+        self.assertEqual(
+            {ShardRange.CREATED, ShardRange.CLEAVED,
+             ShardRange.ACTIVE, ShardRange.SHARDING, ShardRange.SHARDED,
+             ShardRange.SHRINKING, ShardRange.SHRUNK},
+            ContainerBroker.resolve_shard_range_states(['auditing']))
+
         def check_bad_value(value):
             with self.assertRaises(ValueError) as cm:
                 ContainerBroker.resolve_shard_range_states(value)
@@ -4072,7 +4078,7 @@ class TestContainerBroker(unittest.TestCase):
             self.assertFalse(actual)
 
     @with_tempdir
-    def test_overloap_shard_range_order(self, tempdir):
+    def test_overlap_shard_range_order(self, tempdir):
         db_path = os.path.join(tempdir, 'container.db')
         broker = ContainerBroker(db_path, account='a', container='c')
         broker.initialize(next(self.ts).internal, 0)
diff --git a/test/unit/container/test_server.py b/test/unit/container/test_server.py
index a5a081c5b4..28bbd9862d 100644
--- a/test/unit/container/test_server.py
+++ b/test/unit/container/test_server.py
@@ -3178,6 +3178,98 @@ class TestContainerController(unittest.TestCase):
 
         do_test({'states': 'bad'}, 404)
 
+    def test_GET_shard_ranges_auditing(self):
+        # verify that states=auditing causes own shard range to be included
+        def put_shard_ranges(shard_ranges):
+            headers = {'X-Timestamp': next(self.ts).normal,
+                       'X-Backend-Record-Type': 'shard'}
+            body = json.dumps([dict(sr) for sr in shard_ranges])
+            req = Request.blank(
+                '/sda1/p/a/c', method='PUT', headers=headers, body=body)
+            self.assertEqual(202, req.get_response(self.controller).status_int)
+
+        def do_test(ts_now, extra_params):
+            headers = {'X-Backend-Record-Type': 'shard',
+                       'X-Backend-Include-Deleted': 'True'}
+            params = {'format': 'json'}
+            if extra_params:
+                params.update(extra_params)
+            req = Request.blank('/sda1/p/a/c?format=json', method='GET',
+                                headers=headers, params=params)
+            with mock_timestamp_now(ts_now):
+                resp = req.get_response(self.controller)
+            self.assertEqual(resp.status_int, 200)
+            self.assertEqual(resp.content_type, 'application/json')
+            self.assertIn('X-Backend-Record-Type', resp.headers)
+            self.assertEqual('shard', resp.headers['X-Backend-Record-Type'])
+            return resp
+
+        # initially not all shards are shrinking and root is sharded
+        own_sr = ShardRange('a/c', next(self.ts), '', '',
+                            state=ShardRange.SHARDED)
+        shard_bounds = [('', 'f', ShardRange.SHRUNK, True),
+                        ('f', 't', ShardRange.SHRINKING, False),
+                        ('t', '', ShardRange.ACTIVE, False)]
+        shard_ranges = [
+            ShardRange('.shards_a/_%s' % upper, next(self.ts),
+                       lower, upper, state=state, deleted=deleted)
+            for (lower, upper, state, deleted) in shard_bounds]
+        overlap = ShardRange('.shards_a/c_bad', next(self.ts), '', 'f',
+                             state=ShardRange.FOUND)
+
+        # create container and PUT some shard ranges
+        headers = {'X-Timestamp': next(self.ts).normal}
+        req = Request.blank(
+            '/sda1/p/a/c', method='PUT', headers=headers)
+        self.assertIn(
+            req.get_response(self.controller).status_int, (201, 202))
+        put_shard_ranges(shard_ranges + [own_sr, overlap])
+
+        # do *not* expect own shard range in default case (no states param)
+        ts_now = next(self.ts)
+        expected = [dict(sr, last_modified=sr.timestamp.isoformat)
+                    for sr in [overlap] + shard_ranges]
+        resp = do_test(ts_now, {})
+        self.assertEqual(expected, json.loads(resp.body))
+
+        # expect own shard range to be included when states=auditing
+        expected = [dict(sr, last_modified=sr.timestamp.isoformat)
+                    for sr in shard_ranges + [own_sr]]
+        resp = do_test(ts_now, {'states': 'auditing'})
+        self.assertEqual(expected, json.loads(resp.body))
+
+        # expect own shard range to be included, marker/end_marker respected
+        expected = [dict(sr, last_modified=sr.timestamp.isoformat)
+                    for sr in shard_ranges[1:2] + [own_sr]]
+        resp = do_test(ts_now, {'marker': 'f', 'end_marker': 't',
+                                'states': 'auditing'})
+        self.assertEqual(expected, json.loads(resp.body))
+
+        # update shards to all shrinking and root to active
+        shard_ranges[-1].update_state(ShardRange.SHRINKING, next(self.ts))
+        own_sr.update_state(ShardRange.ACTIVE, next(self.ts))
+        put_shard_ranges(shard_ranges + [own_sr])
+
+        # do *not* expect own shard range in default case (no states param)
+        ts_now = next(self.ts)
+        expected = [dict(sr, last_modified=sr.timestamp.isoformat)
+                    for sr in [overlap] + shard_ranges]
+        resp = do_test(ts_now, {})
+        self.assertEqual(expected, json.loads(resp.body))
+
+        # expect own shard range to be included when states=auditing
+        expected = [dict(sr, last_modified=sr.timestamp.isoformat)
+                    for sr in shard_ranges[:2] + [own_sr] + shard_ranges[2:]]
+        resp = do_test(ts_now, {'states': 'auditing'})
+        self.assertEqual(expected, json.loads(resp.body))
+
+        # expect own shard range to be included, marker/end_marker respected
+        expected = [dict(sr, last_modified=sr.timestamp.isoformat)
+                    for sr in shard_ranges[1:2] + [own_sr]]
+        resp = do_test(ts_now, {'marker': 'f', 'end_marker': 't',
+                                'states': 'auditing'})
+        self.assertEqual(expected, json.loads(resp.body))
+
     def test_GET_auto_record_type(self):
         # make a container
         ts_iter = make_timestamp_iter()
diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py
index 39313c54f0..031df7239e 100644
--- a/test/unit/container/test_sharder.py
+++ b/test/unit/container/test_sharder.py
@@ -2258,14 +2258,14 @@ class TestSharder(BaseTestSharder):
         self.assertTrue(broker.set_sharding_state())
 
         # run cleave - first batch is cleaved, shrinking range doesn't count
-        # towards batch size of 2 but does count towards ranges_done
+        # towards batch size of 2 nor towards ranges_done
         with self._mock_sharder() as sharder:
             self.assertFalse(sharder._cleave(broker))
         context = CleavingContext.load(broker)
         self.assertTrue(context.misplaced_done)
         self.assertFalse(context.cleaving_done)
         self.assertEqual(shard_ranges[2].upper_str, context.cursor)
-        self.assertEqual(3, context.ranges_done)
+        self.assertEqual(2, context.ranges_done)
         self.assertEqual(2, context.ranges_todo)
 
         # run cleave - stops at shard range in FOUND state
@@ -2275,7 +2275,7 @@ class TestSharder(BaseTestSharder):
         self.assertTrue(context.misplaced_done)
         self.assertFalse(context.cleaving_done)
         self.assertEqual(shard_ranges[3].upper_str, context.cursor)
-        self.assertEqual(4, context.ranges_done)
+        self.assertEqual(3, context.ranges_done)
         self.assertEqual(1, context.ranges_todo)
 
         # run cleave - final shard range in CREATED state, cleaving proceeds
@@ -2288,9 +2288,149 @@ class TestSharder(BaseTestSharder):
         self.assertTrue(context.misplaced_done)
         self.assertTrue(context.cleaving_done)
         self.assertEqual(shard_ranges[4].upper_str, context.cursor)
-        self.assertEqual(5, context.ranges_done)
+        self.assertEqual(4, context.ranges_done)
         self.assertEqual(0, context.ranges_todo)
 
+    def test_cleave_shrinking_to_active_root_range(self):
+        broker = self._make_broker(account='.shards_a', container='shard_c')
+        broker.put_object(
+            'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0)
+        # a donor previously shrunk to own...
+        deleted_range = ShardRange(
+            '.shards/other', next(self.ts_iter), 'here', 'there', deleted=True,
+            state=ShardRange.SHRUNK, epoch=next(self.ts_iter))
+        own_shard_range = ShardRange(
+            broker.path, next(self.ts_iter), 'here', '',
+            state=ShardRange.SHRINKING, epoch=next(self.ts_iter))
+        # root is the acceptor...
+        root = ShardRange(
+            'a/c', next(self.ts_iter), '', '',
+            state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
+        broker.merge_shard_ranges([deleted_range, own_shard_range, root])
+        broker.set_sharding_sysmeta('Root', 'a/c')
+        self.assertFalse(broker.is_root_container())  # sanity check
+        self.assertTrue(broker.set_sharding_state())
+
+        # expect cleave to the root
+        with self._mock_sharder() as sharder:
+            self.assertTrue(sharder._cleave(broker))
+        context = CleavingContext.load(broker)
+        self.assertTrue(context.misplaced_done)
+        self.assertTrue(context.cleaving_done)
+        self.assertEqual(root.upper_str, context.cursor)
+        self.assertEqual(1, context.ranges_done)
+        self.assertEqual(0, context.ranges_todo)
+
+    def test_cleave_shrinking_to_active_acceptor_with_sharded_root_range(self):
+        broker = self._make_broker(account='.shards_a', container='shard_c')
+        broker.put_object(
+            'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0)
+        own_shard_range = ShardRange(
+            broker.path, next(self.ts_iter), 'here', 'there',
+            state=ShardRange.SHARDING, epoch=next(self.ts_iter))
+        # the intended acceptor...
+        acceptor = ShardRange(
+            '.shards_a/shard_d', next(self.ts_iter), 'here', '',
+            state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
+        # root range also gets pulled from root during audit...
+        root = ShardRange(
+            'a/c', next(self.ts_iter), '', '',
+            state=ShardRange.SHARDED, epoch=next(self.ts_iter))
+        broker.merge_shard_ranges([own_shard_range, acceptor, root])
+        broker.set_sharding_sysmeta('Root', 'a/c')
+        self.assertFalse(broker.is_root_container())  # sanity check
+        self.assertTrue(broker.set_sharding_state())
+
+        # sharded root range should always sort after an active acceptor so
+        # expect cleave to acceptor first then cleaving completes
+        with self._mock_sharder() as sharder:
+            self.assertTrue(sharder._cleave(broker))
+        context = CleavingContext.load(broker)
+        self.assertTrue(context.misplaced_done)
+        self.assertTrue(context.cleaving_done)
+        self.assertEqual(acceptor.upper_str, context.cursor)
+        self.assertEqual(1, context.ranges_done)  # cleaved the acceptor
+        self.assertEqual(1, context.ranges_todo)  # never reached sharded root
+
+    def test_cleave_shrinking_to_active_root_range_with_active_acceptor(self):
+        # if shrinking shard has both active root and active other acceptor,
+        # verify that shard only cleaves to one of them;
+        # root will sort before acceptor if acceptor.upper==MAX
+        broker = self._make_broker(account='.shards_a', container='shard_c')
+        broker.put_object(
+            'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0)
+        own_shard_range = ShardRange(
+            broker.path, next(self.ts_iter), 'here', 'there',
+            state=ShardRange.SHRINKING, epoch=next(self.ts_iter))
+        # active acceptor with upper bound == MAX
+        acceptor = ShardRange(
+            '.shards/other', next(self.ts_iter), 'here', '', deleted=False,
+            state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
+        # root is also active
+        root = ShardRange(
+            'a/c', next(self.ts_iter), '', '',
+            state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
+        broker.merge_shard_ranges([own_shard_range, acceptor, root])
+        broker.set_sharding_sysmeta('Root', 'a/c')
+        self.assertFalse(broker.is_root_container())  # sanity check
+        self.assertTrue(broker.set_sharding_state())
+
+        # expect cleave to the root
+        acceptor.upper = ''
+        acceptor.timestamp = next(self.ts_iter)
+        broker.merge_shard_ranges([acceptor])
+        with self._mock_sharder() as sharder:
+            self.assertTrue(sharder._cleave(broker))
+        context = CleavingContext.load(broker)
+        self.assertTrue(context.misplaced_done)
+        self.assertTrue(context.cleaving_done)
+        self.assertEqual(root.upper_str, context.cursor)
+        self.assertEqual(1, context.ranges_done)
+        self.assertEqual(1, context.ranges_todo)
+        info = [
+            line for line in self.logger.get_lines_for_level('info')
+            if line.startswith('Replicating new shard container a/c')
+        ]
+        self.assertEqual(1, len(info))
+
+    def test_cleave_shrinking_to_active_acceptor_with_active_root_range(self):
+        # if shrinking shard has both active root and active other acceptor,
+        # verify that shard only cleaves to one of them;
+        # root will sort after acceptor if acceptor.upper