Let developers/operators add watchers to object audit
Swift operators may find it useful to operate on each object in their cluster in some way. This commit provides them a way to hook into the object auditor with a simple, clearly-defined boundary so that they can iterate over their objects without additional disk IO. For example, a cluster operator may want to ensure a semantic consistency with all SLO segments accounted in their manifests, or locate objects that aren't in container listings. Now that Swift has encryption support, this could be used to locate unencrypted objects. The list goes on. This commit makes the auditor locate, via entry points, the watchers named in its config file. A watcher is a class with at least these four methods: __init__(self, conf, logger, **kwargs) start(self, audit_type, **kwargs) see_object(self, object_metadata, data_file_path, **kwargs) end(self, **kwargs) The auditor will call watcher.start(audit_type) at the start of an audit pass, watcher.see_object(...) for each object audited, and watcher.end() at the end of an audit pass. All method arguments are passed as keyword args. This version of the API is implemented on the context of the auditor itself, without spawning any additional processes. If the plugins are not working well -- hang, crash, or leak -- it's easier to debug them when there's no additional complication of processes that run by themselves. In addition, we include a reference implementation of plugin for the watcher API, as a help to plugin writers. Change-Id: I1be1faec53b2cdfaabf927598f1460e23c206b0a
This commit is contained in:
parent
e0d46d77fa
commit
b971280907
126
doc/source/development_watchers.rst
Normal file
126
doc/source/development_watchers.rst
Normal file
@ -0,0 +1,126 @@
|
||||
================
|
||||
Auditor Watchers
|
||||
================
|
||||
|
||||
--------
|
||||
Overview
|
||||
--------
|
||||
|
||||
The duty of auditors is to guard Swift against corruption in the
|
||||
storage media. But because auditors crawl all objects, they can be
|
||||
used to program Swift to operate on every object. It is done through
|
||||
an API known as "watcher".
|
||||
|
||||
Watchers do not have any private view into the cluster.
|
||||
An operator can write a standalone program that walks the
|
||||
directories and performs any desired inspection or maintenance.
|
||||
What watcher brings to the table is a framework to do the same
|
||||
job easily, under resource restrictions already in place
|
||||
for the auditor.
|
||||
|
||||
Operations performed by watchers are often site-specific, or else
|
||||
they would be incorporated into Swift already. However, the code in
|
||||
the tree provides a reference implementation for convenience.
|
||||
It is located in swift/obj/watchers/dark_data.py and implements
|
||||
so-called "Dark Data Watcher".
|
||||
|
||||
Currently, only object auditor supports the watchers.
|
||||
|
||||
-------------
|
||||
The API class
|
||||
-------------
|
||||
|
||||
The implementation of a watcher is a Python class that may look like this::
|
||||
|
||||
class MyWatcher(object):
|
||||
|
||||
def __init__(self, conf, logger, **kwargs):
|
||||
pass
|
||||
|
||||
def start(self, audit_type, **kwargs):
|
||||
pass
|
||||
|
||||
def see_object(self, object_metadata, policy_index, partition,
|
||||
data_file_path, **kwargs):
|
||||
pass
|
||||
|
||||
def end(self, **kwargs):
|
||||
pass
|
||||
|
||||
Arguments to watcher methods are passed as keyword arguments,
|
||||
and methods are expected to consume new, unknown arguments.
|
||||
|
||||
The method __init__() is used to save configuration and logger
|
||||
at the start of the plug-in.
|
||||
|
||||
The method start() is invoked when auditor starts a pass.
|
||||
It usually resets counters. The argument `auditor_type` is string of
|
||||
`"ALL"` or `"ZBF"`, according to the type of the auditor running
|
||||
the watcher. Watchers that talk to the network tend to hang off the
|
||||
ALL-type auditor, the lightweight ones are okay with the ZBF-type.
|
||||
|
||||
The method end() is the closing bracket for start(). It is typically
|
||||
used to log something, or dump some statistics.
|
||||
|
||||
The method see_object() is called when auditor completed an audit
|
||||
of an object. This is where most of the work is done.
|
||||
|
||||
The protocol for see_object() allows it to raise a special exception,
|
||||
QuarantienRequested. Auditor catches it and quarantines the object.
|
||||
In general, it's okay for watcher methods to throw exceptions, so
|
||||
an author of a watcher plugin does not have to catch them explicitly
|
||||
with a try:; they can be just permitted to bubble up naturally.
|
||||
|
||||
-------------------
|
||||
Loading the plugins
|
||||
-------------------
|
||||
|
||||
Swift auditor loads watcher classes from eggs, so it is necessary
|
||||
to wrap the class and provide it an entry point::
|
||||
|
||||
$ cat /usr/lib/python3.8/site-p*/mywatcher*egg-info/entry_points.txt
|
||||
[mywatcher.mysection]
|
||||
mywatcherentry = mywatcher:MyWatcher
|
||||
|
||||
Operator tells Swift auditor what plugins to load by adding them
|
||||
to object-server.conf in the section [object-auditor]. It is also
|
||||
possible to pass parameters, arriving in the argument conf{} of
|
||||
method start()::
|
||||
|
||||
[object-auditor]
|
||||
watchers = mywatcher#mywatcherentry,swift#dark_data
|
||||
|
||||
[object-auditor:watcher:mywatcher#mywatcherentry]
|
||||
myparam=testing2020
|
||||
|
||||
Do not forget to remove the watcher from auditors when done.
|
||||
Although the API itself is very lightweight, it is common for watchers
|
||||
to incur a significant performance penalty: they can talk to networked
|
||||
services or access additional objects.
|
||||
|
||||
-----------------
|
||||
Dark Data Watcher
|
||||
-----------------
|
||||
|
||||
The watcher API is assumed to be under development. Operators who
|
||||
need extensions are welcome to report any needs for more arguments
|
||||
to see_object(). For now, start by copying the provided template watcher
|
||||
swift/obj/watchers/dark_data.py and see if it is sufficient.
|
||||
|
||||
The name of "Dark Data" refers to the scientific hypothesis of Dark Matter,
|
||||
which supposes that the universe contains a lot of matter than we cannot
|
||||
observe. The Dark Data in Swift is the name of objects that are not
|
||||
accounted in the containers.
|
||||
|
||||
The experience of running large scale clusters suggests that Swift does
|
||||
not have any particular bugs that trigger creation of dark data. So,
|
||||
this is an excercise in writing watchers, with a plausible function.
|
||||
|
||||
When enabled, Dark Data watcher definitely drags down the cluster's overall
|
||||
performance, as mentioned above. Of course, the load increase can be
|
||||
mitigated as usual, but at the expense of the total time taken by
|
||||
the pass of auditor.
|
||||
|
||||
Finally, keep in mind that Dark Data watcher needs the container
|
||||
ring to operate, but runs on an object node. This can come up if
|
||||
cluster has nodes separated by function.
|
@ -88,6 +88,7 @@ Developer Documentation
|
||||
development_auth
|
||||
development_middleware
|
||||
development_ondisk_backends
|
||||
development_watchers
|
||||
|
||||
Administrator Documentation
|
||||
===========================
|
||||
|
@ -480,6 +480,27 @@ use = egg:swift#recon
|
||||
# to 86400 (1 day).
|
||||
# rsync_tempfile_timeout = auto
|
||||
|
||||
# A comma-separated list of watcher entry points. This lets operators
|
||||
# programmatically see audited objects.
|
||||
#
|
||||
# The entry point group name is "swift.object_audit_watcher". If your
|
||||
# setup.py has something like this:
|
||||
#
|
||||
# entry_points={'swift.object_audit_watcher': [
|
||||
# 'some_watcher = some_module:Watcher']}
|
||||
#
|
||||
# then you would enable it with "watchers = some_package#some_watcher".
|
||||
# For example, the built-in reference implementation is enabled as
|
||||
# "watchers = swift#dark_data".
|
||||
#
|
||||
# watchers =
|
||||
|
||||
# Watcher-specific parameters can he added after "object-auditor:watcher:"
|
||||
# like the following (note that entry points are qualified by package#):
|
||||
#
|
||||
# [object-auditor:watcher:swift#dark_data]
|
||||
# action=log
|
||||
|
||||
[object-expirer]
|
||||
# If this true, this expirer will execute tasks from legacy expirer task queue,
|
||||
# at least one object server should run with dequeue_from_legacy = true
|
||||
|
@ -133,6 +133,9 @@ swift.diskfile =
|
||||
replication.fs = swift.obj.diskfile:DiskFileManager
|
||||
erasure_coding.fs = swift.obj.diskfile:ECDiskFileManager
|
||||
|
||||
swift.object_audit_watcher =
|
||||
dark_data = swift.obj.watchers.dark_data:DarkDataWatcher
|
||||
|
||||
[egg_info]
|
||||
tag_build =
|
||||
tag_date = 0
|
||||
|
@ -235,6 +235,10 @@ class UnknownSecretIdError(EncryptionException):
|
||||
pass
|
||||
|
||||
|
||||
class QuarantineRequest(SwiftException):
|
||||
pass
|
||||
|
||||
|
||||
class ClientException(Exception):
|
||||
|
||||
def __init__(self, msg, http_scheme='', http_host='', http_port='',
|
||||
|
@ -3071,6 +3071,27 @@ def readconf(conf_path, section_name=None, log_name=None, defaults=None,
|
||||
return conf
|
||||
|
||||
|
||||
def parse_prefixed_conf(conf_file, prefix):
|
||||
"""
|
||||
Search the config file for any common-prefix sections and load those
|
||||
sections to a dict mapping the after-prefix reference to options.
|
||||
|
||||
:param conf_file: the file name of the config to parse
|
||||
:param prefix: the common prefix of the sections
|
||||
:return: a dict mapping policy reference -> dict of policy options
|
||||
:raises ValueError: if a policy config section has an invalid name
|
||||
"""
|
||||
|
||||
ret_config = {}
|
||||
all_conf = readconf(conf_file)
|
||||
for section, options in all_conf.items():
|
||||
if not section.startswith(prefix):
|
||||
continue
|
||||
target_ref = section[len(prefix):]
|
||||
ret_config[target_ref] = options
|
||||
return ret_config
|
||||
|
||||
|
||||
def write_pickle(obj, dest, tmp=None, pickle_protocol=0):
|
||||
"""
|
||||
Ensure that a pickle file gets written to disk. The file
|
||||
|
@ -25,19 +25,23 @@ from contextlib import closing
|
||||
from eventlet import Timeout
|
||||
|
||||
from swift.obj import diskfile, replicator
|
||||
from swift.common.utils import (
|
||||
get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir,
|
||||
unlink_paths_older_than, readconf, config_auto_int_value, round_robin_iter)
|
||||
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist,\
|
||||
DiskFileDeleted, DiskFileExpired
|
||||
DiskFileDeleted, DiskFileExpired, QuarantineRequest
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.utils import (
|
||||
config_auto_int_value, dump_recon_cache, get_logger, list_from_csv,
|
||||
listdir, load_pkg_resource, parse_prefixed_conf, ratelimit_sleep,
|
||||
readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter)
|
||||
|
||||
|
||||
class AuditorWorker(object):
|
||||
"""Walk through file system to audit objects"""
|
||||
|
||||
def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0):
|
||||
def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0,
|
||||
watcher_defs=None):
|
||||
if watcher_defs is None:
|
||||
watcher_defs = {}
|
||||
self.conf = conf
|
||||
self.logger = logger
|
||||
self.devices = devices
|
||||
@ -95,6 +99,11 @@ class AuditorWorker(object):
|
||||
self.stats_buckets = dict(
|
||||
[(s, 0) for s in self.stats_sizes + ['OVER']])
|
||||
|
||||
self.watchers = [
|
||||
WatcherWrapper(wdef['klass'], name, wdef['conf'], logger)
|
||||
for name, wdef in watcher_defs.items()]
|
||||
logger.debug("%d audit watcher(s) loaded", len(self.watchers))
|
||||
|
||||
def create_recon_nested_dict(self, top_level_key, device_list, item):
|
||||
if device_list:
|
||||
device_key = ''.join(sorted(device_list))
|
||||
@ -114,6 +123,8 @@ class AuditorWorker(object):
|
||||
'%(description)s)') %
|
||||
{'mode': mode, 'audi_type': self.auditor_type,
|
||||
'description': description})
|
||||
for watcher in self.watchers:
|
||||
watcher.start(self.auditor_type)
|
||||
begin = reported = time.time()
|
||||
self.total_bytes_processed = 0
|
||||
self.total_files_processed = 0
|
||||
@ -187,6 +198,8 @@ class AuditorWorker(object):
|
||||
'frate': self.total_files_processed / elapsed,
|
||||
'brate': self.total_bytes_processed / elapsed,
|
||||
'audit': time_auditing, 'audit_rate': time_auditing / elapsed})
|
||||
for watcher in self.watchers:
|
||||
watcher.end()
|
||||
if self.stats_sizes:
|
||||
self.logger.info(
|
||||
_('Object audit stats: %s') % json.dumps(self.stats_buckets))
|
||||
@ -259,6 +272,15 @@ class AuditorWorker(object):
|
||||
incr_by=chunk_len)
|
||||
self.bytes_processed += chunk_len
|
||||
self.total_bytes_processed += chunk_len
|
||||
for watcher in self.watchers:
|
||||
try:
|
||||
watcher.see_object(
|
||||
metadata,
|
||||
df._ondisk_info['data_file'])
|
||||
except QuarantineRequest:
|
||||
raise df._quarantine(
|
||||
df._data_file,
|
||||
"Requested by %s" % watcher.watcher_name)
|
||||
except DiskFileQuarantined as err:
|
||||
self.quarantines += 1
|
||||
self.logger.error(_('ERROR Object %(obj)s failed audit and was'
|
||||
@ -303,6 +325,20 @@ class ObjectAuditor(Daemon):
|
||||
self.rcache = join(self.recon_cache_path, "object.recon")
|
||||
self.interval = int(conf.get('interval', 30))
|
||||
|
||||
watcher_names = set(list_from_csv(conf.get('watchers', '')))
|
||||
# Normally '__file__' is always in config, but tests neglect it often.
|
||||
watcher_configs = \
|
||||
parse_prefixed_conf(conf['__file__'], 'object-auditor:watcher:') \
|
||||
if '__file__' in conf else {}
|
||||
self.watcher_defs = {}
|
||||
for name in watcher_names:
|
||||
self.logger.debug("Loading entry point '%s'", name)
|
||||
wconf = dict(conf)
|
||||
wconf.update(watcher_configs.get(name, {}))
|
||||
self.watcher_defs[name] = {
|
||||
'conf': wconf,
|
||||
'klass': load_pkg_resource("swift.object_audit_watcher", name)}
|
||||
|
||||
def _sleep(self):
|
||||
time.sleep(self.interval)
|
||||
|
||||
@ -318,7 +354,8 @@ class ObjectAuditor(Daemon):
|
||||
device_dirs = kwargs.get('device_dirs')
|
||||
worker = AuditorWorker(self.conf, self.logger, self.rcache,
|
||||
self.devices,
|
||||
zero_byte_only_at_fps=zero_byte_only_at_fps)
|
||||
zero_byte_only_at_fps=zero_byte_only_at_fps,
|
||||
watcher_defs=self.watcher_defs)
|
||||
worker.audit_all_objects(mode=mode, device_dirs=device_dirs)
|
||||
|
||||
def fork_child(self, zero_byte_fps=False, sleep_between_zbf_scanner=False,
|
||||
@ -438,3 +475,62 @@ class ObjectAuditor(Daemon):
|
||||
**kwargs)
|
||||
except (Exception, Timeout) as err:
|
||||
self.logger.exception(_('ERROR auditing: %s'), err)
|
||||
|
||||
|
||||
class WatcherWrapper(object):
|
||||
"""
|
||||
Run the user-supplied watcher.
|
||||
|
||||
Simple and gets the job done. Note that we aren't doing anything
|
||||
to isolate ourselves from hangs or file descriptor leaks
|
||||
in the plugins.
|
||||
"""
|
||||
|
||||
def __init__(self, watcher_class, watcher_name, conf, logger):
|
||||
self.watcher_name = watcher_name
|
||||
self.watcher_in_error = False
|
||||
self.logger = PrefixLoggerAdapter(logger, {})
|
||||
self.logger.set_prefix('[audit-watcher %s] ' % watcher_name)
|
||||
|
||||
try:
|
||||
self.watcher = watcher_class(conf, self.logger)
|
||||
except (Exception, Timeout):
|
||||
self.logger.exception('Error intializing watcher')
|
||||
self.watcher_in_error = True
|
||||
|
||||
def start(self, audit_type):
|
||||
if self.watcher_in_error:
|
||||
return # can't trust the state of the thing; bail
|
||||
try:
|
||||
self.watcher.start(audit_type=audit_type)
|
||||
except (Exception, Timeout):
|
||||
self.logger.exception('Error starting watcher')
|
||||
self.watcher_in_error = True
|
||||
|
||||
def see_object(self, meta, data_file_path):
|
||||
if self.watcher_in_error:
|
||||
return # can't trust the state of the thing; bail
|
||||
kwargs = {'object_metadata': meta,
|
||||
'data_file_path': data_file_path}
|
||||
try:
|
||||
self.watcher.see_object(**kwargs)
|
||||
except QuarantineRequest:
|
||||
# Avoid extra logging.
|
||||
raise
|
||||
except (Exception, Timeout):
|
||||
self.logger.exception(
|
||||
'Error in see_object(meta=%r, data_file_path=%r)',
|
||||
meta, data_file_path)
|
||||
# Do *not* flag watcher as being in an error state; a failure
|
||||
# to process one object shouldn't impact the ability to process
|
||||
# others.
|
||||
|
||||
def end(self):
|
||||
if self.watcher_in_error:
|
||||
return # can't trust the state of the thing; bail
|
||||
kwargs = {}
|
||||
try:
|
||||
self.watcher.end(**kwargs)
|
||||
except (Exception, Timeout):
|
||||
self.logger.exception('Error ending watcher')
|
||||
self.watcher_in_error = True
|
||||
|
0
swift/obj/watchers/__init__.py
Normal file
0
swift/obj/watchers/__init__.py
Normal file
146
swift/obj/watchers/dark_data.py
Normal file
146
swift/obj/watchers/dark_data.py
Normal file
@ -0,0 +1,146 @@
|
||||
# Copyright (c) 2019 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
#
|
||||
# This is an audit watcher that manages the dark data in the cluster.
|
||||
# Since the API for audit watchers is intended to use external plugins,
|
||||
# this code is invoked as if it were external: through pkg_resources.
|
||||
# Our setup.py comes pre-configured for convenience, but the operator has
|
||||
# to enable this watcher honestly by additing DarkDataWatcher to watchers=
|
||||
# in object-server.conf. The default is off, as if this does not exist.
|
||||
# Which is for the best, because of a large performance impact of this.
|
||||
#
|
||||
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
|
||||
from eventlet import Timeout
|
||||
|
||||
from swift.common.direct_client import direct_get_container
|
||||
from swift.common.exceptions import ClientException, QuarantineRequest
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.utils import split_path
|
||||
|
||||
|
||||
class ContainerError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class DarkDataWatcher(object):
|
||||
def __init__(self, conf, logger):
|
||||
|
||||
self.logger = logger
|
||||
|
||||
swift_dir = '/etc/swift'
|
||||
self.container_ring = Ring(swift_dir, ring_name='container')
|
||||
self.dark_data_policy = conf.get('action')
|
||||
if self.dark_data_policy not in ['log', 'delete', 'quarantine']:
|
||||
self.logger.warning(
|
||||
"Dark data action %r unknown, defaults to action = 'log'" %
|
||||
(self.dark_data_policy,))
|
||||
self.dark_data_policy = 'log'
|
||||
|
||||
def start(self, audit_type, **other_kwargs):
|
||||
self.is_zbf = audit_type == 'ZBF'
|
||||
self.tot_unknown = 0
|
||||
self.tot_dark = 0
|
||||
self.tot_okay = 0
|
||||
|
||||
def policy_based_object_handling(self, data_file_path, metadata):
|
||||
obj_path = metadata['name']
|
||||
|
||||
if self.dark_data_policy == "quarantine":
|
||||
self.logger.info("quarantining dark data %s" % obj_path)
|
||||
raise QuarantineRequest
|
||||
elif self.dark_data_policy == "log":
|
||||
self.logger.info("reporting dark data %s" % obj_path)
|
||||
elif self.dark_data_policy == "delete":
|
||||
obj_dir = os.path.dirname(data_file_path)
|
||||
self.logger.info("deleting dark data %s" % obj_dir)
|
||||
shutil.rmtree(obj_dir)
|
||||
|
||||
def see_object(self, object_metadata, data_file_path, **other_kwargs):
|
||||
|
||||
# No point in loading the container servers with unnecessary requests.
|
||||
if self.is_zbf:
|
||||
return
|
||||
|
||||
obj_path = object_metadata['name']
|
||||
try:
|
||||
obj_info = get_info_1(self.container_ring, obj_path, self.logger)
|
||||
except ContainerError:
|
||||
self.tot_unknown += 1
|
||||
return
|
||||
|
||||
if obj_info is None:
|
||||
self.tot_dark += 1
|
||||
self.policy_based_object_handling(data_file_path, object_metadata)
|
||||
else:
|
||||
# OK, object is there, but in the future we might want to verify
|
||||
# more. Watch out for versioned objects, EC, and all that.
|
||||
self.tot_okay += 1
|
||||
|
||||
def end(self, **other_kwargs):
|
||||
if self.is_zbf:
|
||||
return
|
||||
self.logger.info("total unknown %d ok %d dark %d" %
|
||||
(self.tot_unknown, self.tot_okay, self.tot_dark))
|
||||
|
||||
|
||||
#
|
||||
# Get the information for 1 object from container server
|
||||
#
|
||||
def get_info_1(container_ring, obj_path, logger):
|
||||
|
||||
path_comps = split_path(obj_path, 1, 3, True)
|
||||
account_name = path_comps[0]
|
||||
container_name = path_comps[1]
|
||||
obj_name = path_comps[2]
|
||||
|
||||
container_part, container_nodes = \
|
||||
container_ring.get_nodes(account_name, container_name)
|
||||
|
||||
if not container_nodes:
|
||||
raise ContainerError()
|
||||
|
||||
# Perhaps we should do something about the way we select the container
|
||||
# nodes. For now we just shuffle. It spreads the load, but it does not
|
||||
# improve upon the the case when some nodes are down, so auditor slows
|
||||
# to a crawl (if this plugin is enabled).
|
||||
random.shuffle(container_nodes)
|
||||
|
||||
dark_flag = 0
|
||||
for node in container_nodes:
|
||||
try:
|
||||
headers, objs = direct_get_container(
|
||||
node, container_part, account_name, container_name,
|
||||
prefix=obj_name, limit=1)
|
||||
except (ClientException, Timeout):
|
||||
# Something is wrong with that server, treat as an error.
|
||||
continue
|
||||
if not objs or objs[0]['name'] != obj_name:
|
||||
dark_flag += 1
|
||||
continue
|
||||
return objs[0]
|
||||
|
||||
# We do not ask for a quorum of container servers to know the object.
|
||||
# Even if 1 server knows the object, we return with the info above.
|
||||
# So, we only end here when all servers either have no record of the
|
||||
# object or error out. In such case, even one non-error server means
|
||||
# that the object is dark.
|
||||
if dark_flag:
|
||||
return None
|
||||
raise ContainerError()
|
@ -35,7 +35,7 @@ from swift.common.ring import Ring
|
||||
from swift.common.utils import Watchdog, get_logger, \
|
||||
get_remote_client, split_path, config_true_value, generate_trans_id, \
|
||||
affinity_key_function, affinity_locality_predicate, list_from_csv, \
|
||||
register_swift_info, readconf, config_auto_int_value
|
||||
register_swift_info, parse_prefixed_conf, config_auto_int_value
|
||||
from swift.common.constraints import check_utf8, valid_api_version
|
||||
from swift.proxy.controllers import AccountController, ContainerController, \
|
||||
ObjectControllerRouter, InfoController
|
||||
@ -773,15 +773,8 @@ def parse_per_policy_config(conf):
|
||||
:return: a dict mapping policy reference -> dict of policy options
|
||||
:raises ValueError: if a policy config section has an invalid name
|
||||
"""
|
||||
policy_config = {}
|
||||
all_conf = readconf(conf['__file__'])
|
||||
policy_section_prefix = conf['__name__'] + ':policy:'
|
||||
for section, options in all_conf.items():
|
||||
if not section.startswith(policy_section_prefix):
|
||||
continue
|
||||
policy_ref = section[len(policy_section_prefix):]
|
||||
policy_config[policy_ref] = options
|
||||
return policy_config
|
||||
return parse_prefixed_conf(conf['__file__'], policy_section_prefix)
|
||||
|
||||
|
||||
def app_factory(global_conf, **local_conf):
|
||||
|
@ -416,6 +416,11 @@ class ProbeTest(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
Manager(['all']).kill()
|
||||
|
||||
def assertLengthEqual(self, obj, length):
|
||||
obj_len = len(obj)
|
||||
self.assertEqual(obj_len, length, 'len(%r) == %d, not %d' % (
|
||||
obj, obj_len, length))
|
||||
|
||||
def device_dir(self, node):
|
||||
server_type, config_number = get_server_number(
|
||||
(node['ip'], node['port']), self.ipport2server)
|
||||
|
183
test/probe/test_dark_data.py
Normal file
183
test/probe/test_dark_data.py
Normal file
@ -0,0 +1,183 @@
|
||||
#!/usr/bin/python -u
|
||||
# Copyright (c) 2010-2012 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import collections
|
||||
import unittest
|
||||
|
||||
import os
|
||||
import uuid
|
||||
import shutil
|
||||
|
||||
from datetime import datetime
|
||||
from six.moves.configparser import ConfigParser
|
||||
|
||||
from test.probe.brain import BrainSplitter
|
||||
from test.probe.common import ReplProbeTest
|
||||
from swift.common import manager
|
||||
from swift.common.storage_policy import get_policy_string
|
||||
from swift.common.manager import Manager, Server
|
||||
from swift.common.utils import readconf
|
||||
|
||||
|
||||
CONF_SECTION = 'object-auditor:watcher:swift#dark_data'
|
||||
|
||||
|
||||
class TestDarkDataDeletion(ReplProbeTest):
|
||||
# NB: could be 'quarantine' in another test
|
||||
action = 'delete'
|
||||
|
||||
def setUp(self):
|
||||
"""
|
||||
Reset all environment and start all servers.
|
||||
"""
|
||||
super(TestDarkDataDeletion, self).setUp()
|
||||
|
||||
self.conf_dest = \
|
||||
os.path.join('/tmp/',
|
||||
datetime.now().strftime('swift-%Y-%m-%d_%H-%M-%S-%f'))
|
||||
os.mkdir(self.conf_dest)
|
||||
|
||||
object_server_dir = os.path.join(self.conf_dest, 'object-server')
|
||||
os.mkdir(object_server_dir)
|
||||
|
||||
for conf_file in Server('object-auditor').conf_files():
|
||||
config = readconf(conf_file)
|
||||
if 'object-auditor' not in config:
|
||||
continue # *somebody* should be set up to run the auditor
|
||||
config['object-auditor'].update(
|
||||
{'watchers': 'swift#dark_data'})
|
||||
# Note that this setdefault business may mean the watcher doesn't
|
||||
# pick up DEFAULT values, but that (probably?) won't matter
|
||||
config.setdefault(CONF_SECTION, {}).update(
|
||||
{'action': self.action})
|
||||
|
||||
parser = ConfigParser()
|
||||
for section in ('object-auditor', CONF_SECTION):
|
||||
parser.add_section(section)
|
||||
for option, value in config[section].items():
|
||||
parser.set(section, option, value)
|
||||
|
||||
file_name = os.path.basename(conf_file)
|
||||
if file_name.endswith('.d'):
|
||||
# Work around conf.d setups (like you might see with VSAIO)
|
||||
file_name = file_name[:-2]
|
||||
with open(os.path.join(object_server_dir, file_name), 'w') as fp:
|
||||
parser.write(fp)
|
||||
|
||||
self.container_name = 'container-%s' % uuid.uuid4()
|
||||
self.object_name = 'object-%s' % uuid.uuid4()
|
||||
self.brain = BrainSplitter(self.url, self.token, self.container_name,
|
||||
self.object_name, 'object',
|
||||
policy=self.policy)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.conf_dest)
|
||||
|
||||
def gather_object_files_by_ext(self):
|
||||
result = collections.defaultdict(set)
|
||||
for node in self.brain.nodes:
|
||||
for path, _, files in os.walk(os.path.join(
|
||||
self.device_dir(node),
|
||||
get_policy_string('objects', self.policy))):
|
||||
for file in files:
|
||||
if file in ('.lock', 'hashes.pkl', 'hashes.invalid'):
|
||||
continue
|
||||
_, ext = os.path.splitext(file)
|
||||
result[ext].add(os.path.join(path, file))
|
||||
return result
|
||||
|
||||
def test_dark_data(self):
|
||||
self.brain.put_container()
|
||||
self.brain.put_object()
|
||||
self.brain.stop_handoff_half()
|
||||
self.brain.delete_object()
|
||||
Manager(['object-updater']).once()
|
||||
Manager(['container-replicator']).once()
|
||||
|
||||
# Sanity check:
|
||||
# * all containers are empty
|
||||
# * primaries that are still up have two .ts files
|
||||
# * primary that's down has one .data file
|
||||
for index, (headers, items) in self.direct_get_container(
|
||||
container=self.container_name).items():
|
||||
self.assertEqual(headers['X-Container-Object-Count'], '0')
|
||||
self.assertEqual(items, [])
|
||||
|
||||
files = self.gather_object_files_by_ext()
|
||||
self.assertLengthEqual(files, 2)
|
||||
self.assertLengthEqual(files['.ts'], 2)
|
||||
self.assertLengthEqual(files['.data'], 1)
|
||||
|
||||
# Simulate a reclaim_age passing,
|
||||
# so the tombstones all got cleaned up
|
||||
for file_path in files['.ts']:
|
||||
os.unlink(file_path)
|
||||
|
||||
# Old node gets reintroduced to the cluster
|
||||
self.brain.start_handoff_half()
|
||||
# ...so replication thinks its got some work to do
|
||||
Manager(['object-replicator']).once()
|
||||
|
||||
# Now we're back to *three* .data files
|
||||
files = self.gather_object_files_by_ext()
|
||||
self.assertLengthEqual(files, 1)
|
||||
self.assertLengthEqual(files['.data'], 3)
|
||||
|
||||
# But that's OK, audit watchers to the rescue!
|
||||
old_swift_dir = manager.SWIFT_DIR
|
||||
manager.SWIFT_DIR = self.conf_dest
|
||||
try:
|
||||
Manager(['object-auditor']).once()
|
||||
finally:
|
||||
manager.SWIFT_DIR = old_swift_dir
|
||||
|
||||
# Verify that the policy was applied.
|
||||
self.check_on_disk_files(files['.data'])
|
||||
|
||||
def check_on_disk_files(self, files):
|
||||
for file_path in files:
|
||||
# File's not there
|
||||
self.assertFalse(os.path.exists(file_path))
|
||||
# And it's not quaratined, either!
|
||||
self.assertPathDoesNotExist(os.path.join(
|
||||
file_path[:file_path.index('objects')], 'quarantined'))
|
||||
|
||||
def assertPathExists(self, path):
|
||||
msg = "Expected path %r to exist, but it doesn't" % path
|
||||
self.assertTrue(os.path.exists(path), msg)
|
||||
|
||||
def assertPathDoesNotExist(self, path):
|
||||
msg = "Expected path %r to not exist, but it does" % path
|
||||
self.assertFalse(os.path.exists(path), msg)
|
||||
|
||||
|
||||
class TestDarkDataQuarantining(TestDarkDataDeletion):
|
||||
action = 'quarantine'
|
||||
|
||||
def check_on_disk_files(self, files):
|
||||
for file_path in files:
|
||||
# File's not there
|
||||
self.assertPathDoesNotExist(file_path)
|
||||
# Got quarantined
|
||||
parts = file_path.split(os.path.sep)
|
||||
quarantine_dir = parts[:parts.index('objects')] + ['quarantined']
|
||||
quarantine_path = os.path.sep.join(
|
||||
quarantine_dir + ['objects'] + parts[-2:])
|
||||
self.assertPathExists(quarantine_path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
@ -242,11 +242,6 @@ class BaseTestContainerSharding(ReplProbeTest):
|
||||
'\n '.join(result['other']))
|
||||
return result
|
||||
|
||||
def assertLengthEqual(self, obj, length):
|
||||
obj_len = len(obj)
|
||||
self.assertEqual(obj_len, length, 'len(%r) == %d, not %d' % (
|
||||
obj, obj_len, length))
|
||||
|
||||
def assert_dict_contains(self, expected_items, actual_dict):
|
||||
ignored = set(expected_items) ^ set(actual_dict)
|
||||
filtered_actual = {k: actual_dict[k]
|
||||
|
@ -12,32 +12,35 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import json
|
||||
|
||||
import unittest
|
||||
import json
|
||||
import mock
|
||||
import os
|
||||
import sys
|
||||
import pkg_resources
|
||||
import signal
|
||||
import time
|
||||
import string
|
||||
import sys
|
||||
import time
|
||||
import xattr
|
||||
from shutil import rmtree
|
||||
from tempfile import mkdtemp
|
||||
import textwrap
|
||||
from os.path import dirname, basename
|
||||
from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
|
||||
DEFAULT_TEST_EC_TYPE, skip_if_no_xattrs)
|
||||
from test.unit import (
|
||||
debug_logger, DEFAULT_TEST_EC_TYPE,
|
||||
make_timestamp_iter, patch_policies, skip_if_no_xattrs)
|
||||
from test.unit.obj.common import write_diskfile
|
||||
from swift.obj import auditor, replicator
|
||||
from swift.obj.watchers.dark_data import DarkDataWatcher
|
||||
from swift.obj.diskfile import (
|
||||
DiskFile, write_metadata, invalidate_hash, get_data_dir,
|
||||
DiskFileManager, ECDiskFileManager, AuditLocation, clear_auditor_status,
|
||||
get_auditor_status, HASH_FILE, HASH_INVALIDATIONS_FILE)
|
||||
from swift.common.utils import (
|
||||
mkdirs, normalize_timestamp, Timestamp, readconf, md5)
|
||||
mkdirs, normalize_timestamp, Timestamp, readconf, md5, PrefixLoggerAdapter)
|
||||
from swift.common.storage_policy import (
|
||||
ECStoragePolicy, StoragePolicy, POLICIES, EC_POLICY)
|
||||
from test.unit.obj.common import write_diskfile
|
||||
|
||||
_mocked_policies = [
|
||||
StoragePolicy(0, 'zero', False),
|
||||
@ -60,8 +63,33 @@ def works_only_once(callable_thing, exception):
|
||||
return only_once
|
||||
|
||||
|
||||
@patch_policies(_mocked_policies)
|
||||
class TestAuditor(unittest.TestCase):
|
||||
def no_audit_watchers(group, name=None):
|
||||
if group == 'swift.object_audit_watcher':
|
||||
return iter([])
|
||||
else:
|
||||
return pkg_resources.iter_entry_points(group, name)
|
||||
|
||||
|
||||
class FakeRing1(object):
|
||||
|
||||
def __init__(self, swift_dir, ring_name=None):
|
||||
return
|
||||
|
||||
def get_nodes(self, *args, **kwargs):
|
||||
x = 1
|
||||
node1 = {'ip': '10.0.0.%s' % x,
|
||||
'replication_ip': '10.0.0.%s' % x,
|
||||
'port': 6200 + x,
|
||||
'replication_port': 6200 + x,
|
||||
'device': 'sda',
|
||||
'zone': x % 3,
|
||||
'region': x % 2,
|
||||
'id': x,
|
||||
'handoff_index': 1}
|
||||
return (1, [node1])
|
||||
|
||||
|
||||
class TestAuditorBase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
skip_if_no_xattrs()
|
||||
@ -113,7 +141,7 @@ class TestAuditor(unittest.TestCase):
|
||||
# diskfiles for policy 0, 1, 2
|
||||
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
|
||||
policy=POLICIES[0])
|
||||
self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c',
|
||||
self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c2',
|
||||
'o', policy=POLICIES[1])
|
||||
self.disk_file_ec = self.ec_df_mgr.get_diskfile(
|
||||
'sda', '0', 'a', 'c', 'o', policy=POLICIES[2], frag_index=1)
|
||||
@ -121,6 +149,10 @@ class TestAuditor(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
|
||||
|
||||
|
||||
@patch_policies(_mocked_policies)
|
||||
class TestAuditor(TestAuditorBase):
|
||||
|
||||
def test_worker_conf_parms(self):
|
||||
def check_common_defaults():
|
||||
self.assertEqual(auditor_worker.max_bytes_per_second, 10000000)
|
||||
@ -1532,5 +1564,163 @@ class TestAuditor(unittest.TestCase):
|
||||
.format(outstanding_pids))
|
||||
|
||||
|
||||
@mock.patch('pkg_resources.iter_entry_points', no_audit_watchers)
|
||||
@patch_policies(_mocked_policies)
|
||||
class TestAuditWatchers(TestAuditorBase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestAuditWatchers, self).setUp()
|
||||
|
||||
timestamp = Timestamp(time.time())
|
||||
|
||||
data = b'0' * 1024
|
||||
etag = md5()
|
||||
with self.disk_file.create() as writer:
|
||||
writer.write(data)
|
||||
etag.update(data)
|
||||
etag = etag.hexdigest()
|
||||
metadata = {
|
||||
'ETag': etag,
|
||||
'X-Timestamp': timestamp.internal,
|
||||
'Content-Length': str(len(data)),
|
||||
'X-Object-Meta-Flavor': 'banana',
|
||||
}
|
||||
writer.put(metadata)
|
||||
|
||||
data = b'1' * 2048
|
||||
etag = md5()
|
||||
with self.disk_file_p1.create() as writer:
|
||||
writer.write(data)
|
||||
etag.update(data)
|
||||
etag = etag.hexdigest()
|
||||
metadata = {
|
||||
'ETag': etag,
|
||||
'X-Timestamp': timestamp.internal,
|
||||
'Content-Length': str(len(data)),
|
||||
'X-Object-Meta-Flavor': 'orange',
|
||||
}
|
||||
writer.put(metadata)
|
||||
|
||||
def test_watchers(self):
|
||||
|
||||
calls = []
|
||||
|
||||
class TestWatcher(object):
|
||||
def __init__(self, conf, logger):
|
||||
self._started = False
|
||||
self._ended = False
|
||||
calls.append(["__init__", conf, logger])
|
||||
|
||||
# Make sure the logger is capable of quacking like a logger
|
||||
logger.debug("getting started")
|
||||
|
||||
def start(self, audit_type, **other_kwargs):
|
||||
if self._started:
|
||||
raise Exception("don't call it twice")
|
||||
self._started = True
|
||||
calls.append(['start', audit_type])
|
||||
|
||||
def see_object(self, object_metadata,
|
||||
data_file_path, **other_kwargs):
|
||||
calls.append(['see_object', object_metadata,
|
||||
data_file_path, other_kwargs])
|
||||
|
||||
def end(self, **other_kwargs):
|
||||
if self._ended:
|
||||
raise Exception("don't call it twice")
|
||||
self._ended = True
|
||||
calls.append(['end'])
|
||||
|
||||
conf = self.conf.copy()
|
||||
conf['watchers'] = 'test_watcher1'
|
||||
conf['__file__'] = '/etc/swift/swift.conf'
|
||||
ret_config = {'swift#dark_data': {'action': 'log'}}
|
||||
with mock.patch('swift.obj.auditor.parse_prefixed_conf',
|
||||
return_value=ret_config), \
|
||||
mock.patch('swift.obj.auditor.load_pkg_resource',
|
||||
side_effect=[TestWatcher]) as mock_load, \
|
||||
mock.patch('swift.obj.auditor.get_logger',
|
||||
lambda *a, **kw: self.logger):
|
||||
my_auditor = auditor.ObjectAuditor(conf)
|
||||
|
||||
self.assertEqual(mock_load.mock_calls, [
|
||||
mock.call('swift.object_audit_watcher', 'test_watcher1'),
|
||||
])
|
||||
|
||||
my_auditor.run_audit(mode='once', zero_byte_fps=float("inf"))
|
||||
|
||||
self.assertEqual(len(calls), 5)
|
||||
|
||||
self.assertEqual(calls[0], ["__init__", conf, mock.ANY])
|
||||
self.assertIsInstance(calls[0][2], PrefixLoggerAdapter)
|
||||
self.assertIs(calls[0][2].logger, self.logger)
|
||||
|
||||
self.assertEqual(calls[1], ["start", "ZBF"])
|
||||
|
||||
self.assertEqual(calls[2][0], "see_object")
|
||||
self.assertEqual(calls[3][0], "see_object")
|
||||
|
||||
# The order in which the auditor finds things on the filesystem is
|
||||
# irrelevant; what matters is that it finds all the things.
|
||||
calls[2:4] = sorted(calls[2:4], key=lambda item: item[1]['name'])
|
||||
|
||||
self.assertDictContainsSubset({'name': '/a/c/o',
|
||||
'X-Object-Meta-Flavor': 'banana'},
|
||||
calls[2][1])
|
||||
self.assertIn('node/sda/objects/0/', calls[2][2]) # data_file_path
|
||||
self.assertTrue(calls[2][2].endswith('.data')) # data_file_path
|
||||
self.assertEqual({}, calls[2][3])
|
||||
|
||||
self.assertDictContainsSubset({'name': '/a/c2/o',
|
||||
'X-Object-Meta-Flavor': 'orange'},
|
||||
calls[3][1])
|
||||
self.assertIn('node/sda/objects-1/0/', calls[3][2]) # data_file_path
|
||||
self.assertTrue(calls[3][2].endswith('.data')) # data_file_path
|
||||
self.assertEqual({}, calls[3][3])
|
||||
|
||||
self.assertEqual(calls[4], ["end"])
|
||||
|
||||
log_lines = self.logger.get_lines_for_level('debug')
|
||||
self.assertIn(
|
||||
"[audit-watcher test_watcher1] getting started",
|
||||
log_lines)
|
||||
|
||||
def test_builtin_watchers(self):
|
||||
|
||||
conf = self.conf.copy()
|
||||
conf['watchers'] = 'test_watcher1'
|
||||
conf['__file__'] = '/etc/swift/swift.conf'
|
||||
ret_config = {'swift#dark_data': {'action': 'log'}}
|
||||
with mock.patch('swift.obj.auditor.parse_prefixed_conf',
|
||||
return_value=ret_config), \
|
||||
mock.patch('swift.obj.auditor.load_pkg_resource',
|
||||
side_effect=[DarkDataWatcher]):
|
||||
my_auditor = auditor.ObjectAuditor(conf, logger=self.logger)
|
||||
|
||||
def fake_direct_get_container(node, part, account, container,
|
||||
prefix=None, limit=None):
|
||||
self.assertEqual(part, 1)
|
||||
self.assertEqual(limit, 1)
|
||||
# The returned entry is not abbreviated, but is full of nonsese.
|
||||
entry = {'bytes': 30968411,
|
||||
'hash': '60303f4122966fe5925f045eb52d1129',
|
||||
'name': '%s' % prefix,
|
||||
'content_type': 'video/mp4',
|
||||
'last_modified': '2017-08-15T03:30:57.693210'}
|
||||
return {}, [entry]
|
||||
|
||||
with mock.patch('swift.obj.watchers.dark_data.Ring', FakeRing1), \
|
||||
mock.patch("swift.obj.watchers.dark_data.direct_get_container",
|
||||
fake_direct_get_container):
|
||||
my_auditor.run_audit(mode='once')
|
||||
|
||||
# N.B. We want to check for ok files instead of dark because
|
||||
# if anything goes wrong inside, we want it fail the test.
|
||||
log_lines = self.logger.get_lines_for_level('info')
|
||||
self.assertIn(
|
||||
'[audit-watcher test_watcher1] total unknown 0 ok 2 dark 0',
|
||||
log_lines)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
x
Reference in New Issue
Block a user