diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..aee4a2a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,27 @@ +# Copyright 2019 Red Hat, Inc. +# +# This module is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software. If not, see . + +FROM opendevorg/python-builder as builder + +COPY . /tmp/src +RUN assemble + +FROM opendevorg/python-base as zuul-registry + +COPY --from=builder /output/ /output +RUN /output/install-from-bindep + +VOLUME /storage +CMD ["/usr/local/bin/zuul-registry -c /conf/registry.conf serve"] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..25bc4e3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +pbr>=1.1.0 + +PyYAML +cherrypy +routes +requests +openstacksdk diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..e1ae57d --- /dev/null +++ b/setup.cfg @@ -0,0 +1,34 @@ +[metadata] +name = zuul-registry +summary = A speculative container image registry for Zuul +description-file = + README.rst +author = Zuul Team +author-email = zuul-discuss@lists.zuul-ci.org +home-page = https://zuul-ci.org/ +python-requires = >=3.5 +classifier = + Intended Audience :: Information Technology + Intended Audience :: System Administrators + License :: OSI Approved :: GNU General Public License v3 (GPLv3) + Operating System :: POSIX :: Linux + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.5 + +[pbr] +warnerrors = True + +[global] +setup_hooks = + zuul_registry._setup_hook.setup_hook + +[entry_points] +console_scripts = + zuul-registry = zuul_registry.main:main + +[build_sphinx] +source-dir = doc/source +build-dir = doc/build +all_files = 1 +warning-is-error = 1 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..9ccd253 --- /dev/null +++ b/setup.py @@ -0,0 +1,20 @@ +# Copyright 2019 Red Hat, Inc. +# +# This module is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software. If not, see . + +import setuptools + +setuptools.setup( + setup_requires=['pbr'], + pbr=True) diff --git a/tools/conf.yaml b/tools/conf.yaml new file mode 100644 index 0000000..b334253 --- /dev/null +++ b/tools/conf.yaml @@ -0,0 +1,23 @@ +registry: + address: '127.0.0.1' + port: 9000 + tls-cert: /path/to/cert.pem + tls-key: /path/to/cert.key + #shadow: + # docker.io: + # hostname: local-docker-mirror.example.com + # port: 443 + users: + - name: testuser + pass: testpass + access: write + - name: anonymous + pass: '' + access: read + xstorage: + driver: swift + cloud: registry + container: test_registry + storage: + driver: filesystem + root: /tmp/storage diff --git a/tools/delete_container.py b/tools/delete_container.py new file mode 100644 index 0000000..e89d578 --- /dev/null +++ b/tools/delete_container.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +# +# Copyright 2019 Red Hat, Inc. +# +# This module is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software. If not, see . + +import argparse +import openstack +import requests +import logging +import os + +logging.basicConfig(level=logging.INFO) +# logging.getLogger("requests").setLevel(logging.DEBUG) +# logging.getLogger("keystoneauth").setLevel(logging.INFO) +# logging.getLogger("stevedore").setLevel(logging.INFO) +logging.captureWarnings(True) + + +def main(): + parser = argparse.ArgumentParser( + description="Delete a swift container" + ) + parser.add_argument('cloud', + help='Name of the cloud to use when uploading') + parser.add_argument('container', + help='Name of the container to use when uploading') + + args = parser.parse_args() + + cloud = openstack.connect(cloud=args.cloud) + + sess = cloud.config.get_session() + adapter = requests.adapters.HTTPAdapter(pool_maxsize=100) + sess.mount('https://', adapter) + + container = cloud.get_container(args.container) + print('Found container', container) + print() + for x in cloud.object_store.objects(args.container): + print('Delete object', x.name) + if x.name == '/': + endpoint = cloud.object_store.get_endpoint() + container = os.path.join(endpoint, args.container) + cloud.session.delete(container + '//') + else: + cloud.object_store.delete_object(x) + + print() + print('Delete container', container) + cloud.object_store.delete_container(args.container) + + +if __name__ == "__main__": + main() diff --git a/tools/test.sh b/tools/test.sh new file mode 100755 index 0000000..cbe9a2d --- /dev/null +++ b/tools/test.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# Copyright 2019 Red Hat, Inc. +# +# This module is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software. If not, see . + +rm -fr /tmp/storage +docker rmi localhost:9000/zuul/registry +docker image prune -f +docker load . + +import os +import time +import storageutils + +class FilesystemDriver(storageutils.StorageDriver): + def __init__(self, conf): + self.root = conf['root'] + + def list_objects(self, path): + now = time.time() + path = os.path.join(self.root, path) + if not os.path.isdir(path): + return [] + ret = [] + for f in os.listdir(path): + obj_path = os.path.join(path, f) + ret.append(storageutils.ObjectInfo( + obj_path, f, os.stat(obj_path).st_ctime, + os.path.isdir(obj_path))) + return ret + + def get_object_size(self, path): + path = os.path.join(self.root, path) + if not os.path.exists(path): + return None + return os.stat(path).st_size + + def put_object(self, path, data): + path = os.path.join(self.root, path) + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, 'wb') as f: + if isinstance(data, bytes): + f.write(data) + else: + for chunk in data: + f.write(chunk) + + def get_object(self, path): + path = os.path.join(self.root, path) + if not os.path.exists(path): + return None + with open(path, 'rb') as f: + return f.read() + + def delete_object(self, path): + path = os.path.join(self.root, path) + if os.path.exists(path): + if os.path.isdir(path): + os.rmdir(path) + else: + os.unlink(path) + + def move_object(self, src_path, dst_path): + src_path = os.path.join(self.root, src_path) + dst_path = os.path.join(self.root, dst_path) + os.makedirs(os.path.dirname(dst_path), exist_ok=True) + os.rename(src_path, dst_path) + + def cat_objects(self, path, chunks): + path = os.path.join(self.root, path) + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, 'wb') as outf: + for chunk_path in chunks: + chunk_path = os.path.join(self.root, chunk_path) + with open(chunk_path, 'rb') as inf: + while True: + d = inf.read(4096) + if not d: + break + outf.write(d) + for chunk_path in chunks: + chunk_path = os.path.join(self.root, chunk_path) + os.unlink(chunk_path) + +Driver = FilesystemDriver diff --git a/zuul_registry/main.py b/zuul_registry/main.py new file mode 100644 index 0000000..dbf40af --- /dev/null +++ b/zuul_registry/main.py @@ -0,0 +1,334 @@ +# Copyright 2019 Red Hat, Inc. +# +# This module is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software. If not, see . + +import argparse +import sys +import logging +import cherrypy +import filesystem +import storage +import swift +import hashlib +import json +import pprint +import urllib +import yaml + +DRIVERS = { + 'filesystem': filesystem.Driver, + 'swift': swift.Driver, +} + +class Authorization: + def __init__(self, users): + self.ro = {} + self.rw = {} + + for user in users: + if user['access'] == 'write': + self.rw[user['name']] = user['pass'] + self.ro[user['name']] = user['pass'] + + def require_write(self, realm, user, password): + return self.check(self.rw, user, password) + + def require_read(self, realm, user, password): + return self.check(self.ro, user, password) + + def check(self, store, user, password): + if user not in store: + return False + return store[user] == password + +class RegistryAPI: + """Registry API server. + + Implements the container registry protocol as documented in + https://docs.docker.com/registry/spec/api/ + """ + log = logging.getLogger("registry.api") + + def __init__(self, store, authz): + self.storage = store + self.authz = authz + self.shadow = None + + # These are used in a decorator; they dispatch to the + # Authorization method of the same name. The eventual deferenced + # object is the instance of this class. + def require_write(*args): + return cherrypy.request.app.root.authz.require_write(*args) + + def require_read(*args): + return cherrypy.request.app.root.authz.require_read(*args) + + def get_namespace(self): + if not self.shadow: + return '_local' + return cherrypy.request.headers['Host'] + + def not_found(self): + if not self.shadow: + raise cherrypy.HTTPError(404) + # TODO: Proxy the request (this is where we implement the + # buildset registry functionality). + host = cherrypy.request.headers['Host'] + method = cherrypy.request.method + path = cherrypy.request.path_info + url = self.shadow.get(host) + if not url: + raise cherrypy.HTTPError(404) + url = urllib.parse.urljoin(url, path) + self.log.debug("Proxy request %s %s", method, url) + + @cherrypy.expose + @cherrypy.tools.json_out(content_type='application/json; charset=utf-8') + @cherrypy.config(**{'tools.auth_basic.checkpassword': require_read}) + def version_check(self): + self.log.info('Version check') + return {'version': '1.0'} + res = cherrypy.response + res.headers['Distribution-API-Version'] = 'registry/2.0' + + @cherrypy.expose + @cherrypy.config(**{'tools.auth_basic.checkpassword': require_read}) + def get_blob(self, repository, digest): + namespace = self.get_namespace() + method = cherrypy.request.method + self.log.info('%s blob %s %s', method, repository, digest) + size = self.storage.blob_size(namespace, digest) + if size is None: + return self.not_found() + res = cherrypy.response + res.headers['Docker-Content-Digest'] = digest + if method != 'HEAD': + data = self.storage.get_blob(namespace, digest) + return data + return {} + + @cherrypy.expose + @cherrypy.config(**{'tools.auth_basic.checkpassword': require_write}) + def start_upload(self, repository, digest=None): + namespace = self.get_namespace() + method = cherrypy.request.method + uuid = self.storage.start_upload(namespace) + self.log.info('Start upload %s %s uuid %s digest %s', + method, repository, uuid, digest) + res = cherrypy.response + res.headers['Location'] = '/v2/%s/blobs/uploads/%s' % (repository, uuid) + res.headers['Docker-Upload-UUID'] = uuid + res.headers['Range'] = '0-0' + res.status = '202 Accepted' + + @cherrypy.expose + @cherrypy.config(**{'tools.auth_basic.checkpassword': require_write}) + def upload_chunk(self, repository, uuid): + self.log.info('Upload chunk %s %s', repository, uuid) + namespace = self.get_namespace() + old_length, new_length = self.storage.upload_chunk( + namespace, uuid, cherrypy.request.body) + res = cherrypy.response + res.headers['Location'] = '/v2/%s/blobs/uploads/%s' % (repository, uuid) + res.headers['Docker-Upload-UUID'] = uuid + res.headers['Range'] = '0-%s' % (new_length,) + res.status = '204 No Content' + self.log.info('Finish Upload chunk %s %s %s', repository, uuid, new_length) + + @cherrypy.expose + @cherrypy.config(**{'tools.auth_basic.checkpassword': require_write}) + def finish_upload(self, repository, uuid, digest): + self.log.info('Finish upload %s %s', repository, uuid) + namespace = self.get_namespace() + old_length, new_length = self.storage.upload_chunk( + namespace, uuid, cherrypy.request.body) + self.storage.store_upload(namespace, uuid, digest) + res = cherrypy.response + res.headers['Location'] = '/v2/%s/blobs/%s' % (repository, digest) + res.headers['Docker-Content-Digest'] = digest + res.headers['Content-Range'] = '%s-%s' % (old_length, new_length) + res.status = '204 No Content' + + @cherrypy.expose + @cherrypy.config(**{'tools.auth_basic.checkpassword': require_write}) + def put_manifest(self, repository, ref): + namespace = self.get_namespace() + body = cherrypy.request.body.read() + hasher = hashlib.sha256() + hasher.update(body) + digest = 'sha256:' + hasher.hexdigest() + self.log.info('Put manifest %s %s digest %s', repository, ref, digest) + self.storage.put_blob(namespace, digest, body) + manifest = self.storage.get_manifest(namespace, repository, ref) + if manifest is None: + manifest = {} + else: + manifest = json.loads(manifest) + manifest[cherrypy.request.headers['Content-Type']] = digest + self.storage.put_manifest( + namespace, repository, ref, json.dumps(manifest).encode('utf8')) + res = cherrypy.response + res.headers['Location'] = '/v2/%s/manifests/%s' % (repository, ref) + res.headers['Docker-Content-Digest'] = digest + res.status = '201 Created' + + @cherrypy.expose + @cherrypy.config(**{'tools.auth_basic.checkpassword': require_read}) + def get_manifest(self, repository, ref): + namespace = self.get_namespace() + headers = cherrypy.request.headers + res = cherrypy.response + self.log.info('Get manifest %s %s', repository, ref) + if ref.startswith('sha256:'): + manifest = self.storage.get_blob(namespace, ref) + if manifest is None: + self.log.error('Manifest %s %s not found', repository, ref) + return self.not_found() + res.headers['Content-Type'] = json.loads(manifest)['mediaType'] + res.headers['Docker-Content-Digest'] = ref + return manifest + manifest = self.storage.get_manifest(namespace, repository, ref) + if manifest is None: + manifest = {} + else: + manifest = json.loads(manifest) + for ct in [x.strip() for x in headers['Accept'].split(',')]: + if ct in manifest: + self.log.debug('Manifest %s %s digest found %s', + repository, ref, manifest[ct]) + data = self.storage.get_blob(namespace, manifest[ct]) + res.headers['Content-Type'] = ct + res.headers['Docker-Content-Digest'] = manifest[ct] + hasher = hashlib.sha256() + hasher.update(data) + self.log.debug('Retrieved sha256 %s', hasher.hexdigest()) + return data + self.log.error('Manifest %s %s not found', repository, ref) + return self.not_found() + +class RegistryServer: + log = logging.getLogger("registry.server") + def __init__(self, config_path): + self.log.info("Loading config from %s", config_path) + self._load_config(config_path) + + # TODO: pyopenssl? + cherrypy.server.ssl_module = 'builtin' + cherrypy.server.ssl_certificate = self.conf['tls-cert'] + cherrypy.server.ssl_private_key = self.conf['tls-key'] + + driver = self.conf['storage']['driver'] + backend = DRIVERS[driver](self.conf['storage']) + self.store = storage.Storage(backend, self.conf['storage']) + + authz = Authorization(self.conf['users']) + + route_map = cherrypy.dispatch.RoutesDispatcher() + api = RegistryAPI(self.store, authz) + route_map.connect('api', '/v2/', + controller=api, action='version_check') + route_map.connect('api', '/v2/{repository:.*}/blobs/uploads/', + controller=api, action='start_upload') + route_map.connect('api', '/v2/{repository:.*}/blobs/uploads/{uuid}', + conditions=dict(method=['PATCH']), + controller=api, action='upload_chunk') + route_map.connect('api', '/v2/{repository:.*}/blobs/uploads/{uuid}', + conditions=dict(method=['PUT']), + controller=api, action='finish_upload') + route_map.connect('api', '/v2/{repository:.*}/manifests/{ref}', + conditions=dict(method=['PUT']), + controller=api, action='put_manifest') + route_map.connect('api', '/v2/{repository:.*}/manifests/{ref}', + conditions=dict(method=['GET']), + controller=api, action='get_manifest') + route_map.connect('api', '/v2/{repository:.*}/blobs/{digest}', + controller=api, action='get_blob') + + conf = { + '/': { + 'request.dispatch': route_map + } + } + cherrypy.config.update({ + 'global': { + 'environment': 'production', + 'server.socket_host': self.conf['address'], + 'server.socket_port': self.conf['port'], + 'tools.auth_basic.on': True, + 'tools.auth_basic.realm': 'Registry', + 'tools.auth_basic.accept_charset': 'UTF-8', + }, + }) + + cherrypy.tree.mount(api, '/', config=conf) + + def _load_config(self, path): + with open(path) as f: + conf = yaml.safe_load(f.read()) + self.conf = conf['registry'] + + @property + def port(self): + return cherrypy.server.bound_addr[1] + + def start(self): + self.log.info("Registry starting") + cherrypy.engine.start() + + def stop(self): + self.log.info("Registry stopping") + cherrypy.engine.exit() + # Not strictly necessary, but without this, if the server is + # started again (e.g., in the unit tests) it will reuse the + # same host/port settings. + cherrypy.server.httpserver = None + + def prune(self): + self.store.prune() + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description='Zuul registry server') + parser.add_argument('-c', dest='config', + help='Config file path', + default='/conf/registry.conf') + parser.add_argument('-d', dest='debug', + help='Debug log level', + action='store_true') + parser.add_argument('command', + nargs='?', + help='Command: serve, prune', + default='serve') + args = parser.parse_args() + if args.debug: + logging.basicConfig(level=logging.DEBUG) + else: + logging.basicConfig(level=logging.INFO) + cherrypy.log.access_log.propagate = False + logging.getLogger("requests").setLevel(logging.DEBUG) + logging.getLogger("keystoneauth").setLevel(logging.ERROR) + logging.getLogger("urllib3").setLevel(logging.DEBUG) + logging.getLogger("stevedore").setLevel(logging.INFO) + logging.getLogger("openstack").setLevel(logging.DEBUG) + #cherrypy.log.error_log.propagate = False + + s = RegistryServer(args.config) + if args.command == 'serve': + s.start() + cherrypy.engine.block() + elif args.command == 'prune': + s.prune() + else: + print("Unknown command: %s", args.command) + sys.exit(1) diff --git a/zuul_registry/storage.py b/zuul_registry/storage.py new file mode 100644 index 0000000..8fe574f --- /dev/null +++ b/zuul_registry/storage.py @@ -0,0 +1,298 @@ +# Copyright 2019 Red Hat, Inc. +# +# This module is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software. If not, see . + +import base64 +import json +import logging +import os +import queue +import rehash +import threading +import time +from uuid import uuid4 + +class UploadRecord: + """Information about an upload. + + This class holds information about an upload in progress. It is + designed to be serialized into object storage and stored along + with the data of the upload so that as each new chunk is uploaded, + this is updated. + + The registry protocol guarantees that upload chunks are + sequential, so this does not need to be locked for use by multiple + writers. + + The most important part of this (that which could not otherwise be + constructed from a simple object listing) is the resumable hash of + the contents. We need to calculate the hash of the complete + upload, but we would like to support chunks being written by + different writers (for example, in a simple round-robin load + balancer). If we store the state of the hash algorithm after each + chunk is uploaded, we can avoid having to download the entire data + again at the end merely to calculate the hash. + """ + + def __init__(self): + self.chunks = [] + self.hasher = rehash.sha256() + + @property + def count(self): + return len(self.chunks) + + @property + def size(self): + return sum([x['size'] for x in self.chunks]) + + @property + def digest(self): + return 'sha256:' + self.hasher.hexdigest() + + def load(self, data): + data = json.loads(data.decode('utf8')) + self.chunks = data['chunks'] + hash_state = data['hash_state'] + hash_state['md_data'] = base64.decodebytes(hash_state['md_data'].encode('ascii')) + self.hasher.__setstate__(hash_state) + + def dump(self): + hash_state = self.hasher.__getstate__() + hash_state['md_data'] = base64.encodebytes(hash_state['md_data']).decode('ascii') + data = dict(chunks = self.chunks, + hash_state = hash_state) + return json.dumps(data).encode('utf8') + +class UploadStreamer: + """Stream an upload to the object storage. + + This returns data from an internal buffer as a generator. Pass + this to the `put_object` method to supply streaming data to it in + one thread, while another adds data to the buffer using the + `write` method. + """ + def __init__(self): + self.queue = queue.Queue() + + def write(self, data): + self.queue.put(data) + + def __iter__(self): + while True: + d = self.queue.get() + if d is None: + break + yield d + +class Storage: + """Storage abstraction layer. + + This class abstracts different storage backends, providing a + convenience API to the registry. + + Most of these methods take a namespace argument. The namespace + is, essentially, an entire registry isolated from the other + namespaces. They may even have duplicate object data. This + allows us to support serving multiple registries from the same + process (without confusing the contents of them). + + """ + + # Clients have 1 hour to complete an upload before we start + # deleting stale objects. + upload_exp = 60 * 60 + log = logging.getLogger('registry.storage') + + def __init__(self, backend, conf): + self.backend = backend + if 'expiration' in conf: + self.manifest_exp = conf['expiration'] + else: + self.manifest_exp = None + + def blob_size(self, namespace, digest): + path = os.path.join(namespace, 'blobs', digest, 'data') + return self.backend.get_object_size(path) + + def put_blob(self, namespace, digest, data): + path = os.path.join(namespace, 'blobs', digest, 'data') + return self.backend.put_object(path, data) + + def get_blob(self, namespace, digest): + path = os.path.join(namespace, 'blobs', digest, 'data') + return self.backend.get_object(path) + + def start_upload(self, namespace): + """Start an upload. + + Create an empty UploadRecord and store it. Later methods will + add to it. The uuid attribute of the UploadRecord uniquely + identifies the upload. + + Uploads have one or more chunks. See `upload_chunk`. + + """ + uuid = uuid4().hex + path = os.path.join(namespace, 'uploads', uuid, 'metadata') + upload = UploadRecord() + self._update_upload(namespace, uuid, upload) + return uuid + + def _get_upload(self, namespace, uuid): + path = os.path.join(namespace, 'uploads', uuid, 'metadata') + data = self.backend.get_object(path) + upload = UploadRecord() + upload.load(data) + return upload + + def _update_upload(self, namespace, uuid, upload): + path = os.path.join(namespace, 'uploads', uuid, 'metadata') + self.backend.put_object(path, upload.dump()) + + def upload_chunk(self, namespace, uuid, fp): + """Add a chunk to an upload. + + Uploads contain one or more chunk of data which are ultimately + concatenated into one blob. + + This streams the data from `fp` and writes it into the + registry. + + :arg namespace str: The registry namespace. + :arg uuid str: The UUID of the upload. + :arg file fp: An open file pointer to the source data. + + """ + + upload = self._get_upload(namespace, uuid) + path = os.path.join(namespace, 'uploads', uuid, str(upload.count + 1)) + streamer = UploadStreamer() + t = threading.Thread(target=self.backend.put_object, + args=(path, streamer)) + t.start() + size = 0 + while True: + try: + d = fp.read(4096) + except ValueError: + # We get this on an empty body + d = b'' + if not d: + break + upload.hasher.update(d) + size += len(d) + streamer.write(d) + streamer.write(None) + t.join() + upload.chunks.append(dict(size=size)) + self._update_upload(namespace, uuid, upload) + return upload.size-size, upload.size + + def store_upload(self, namespace, uuid, digest): + """Complete an upload. + + Verify the supplied digest matches the uploaded data, and if + so, stores the uploaded data as a blob in the registry. Until + this is called, the upload is incomplete and the data blob is + not addressible. + """ + upload = self._get_upload(namespace, uuid) + if digest != upload.digest: + raise Exception('Digest does not match %s %s' % + (digest, upload.digest)) + # Move the chunks into the blob dir to get them out of the + # uploads dir. + chunks = [] + for i in range(1, upload.count+1): + src_path = os.path.join(namespace, 'uploads', uuid, str(i)) + dst_path = os.path.join(namespace, 'blobs', digest, str(i)) + chunks.append(dst_path) + self.backend.move_object(src_path, dst_path) + # Concatenate the chunks into one blob. + path = os.path.join(namespace, 'blobs', digest, 'data') + self.backend.cat_objects(path, chunks) + path = os.path.join(namespace, 'uploads', uuid, 'metadata') + self.backend.delete_object(path) + + def put_manifest(self, namespace, repo, tag, data): + path = os.path.join(namespace, 'repos', repo, 'manifests', tag) + self.backend.put_object(path, data) + + def get_manifest(self, namespace, repo, tag): + path = os.path.join(namespace, 'repos', repo, 'manifests', tag) + return self.backend.get_object(path) + + def prune(self): + """Prune the registry + + Prune all namespaces in the registry according to configured + expiration times. + """ + now = time.time() + upload_target = now - self.upload_exp + if self.manifest_exp: + manifest_target = now - self.manifest_exp + else: + manifest_target = None + for namespace in self.backend.list_objects(''): + uploadpath = os.path.join(namespace.path, 'uploads/') + for upload in self.backend.list_objects(uploadpath): + self._prune(upload, upload_target) + if not manifest_target: + continue + repopath = os.path.join(namespace.path, 'repos/') + for repo in self.backend.list_objects(repopath): + kept_manifests = self._prune(repo, manifest_target) + # mark/sweep manifest blobs + layers = set() + for manifest in kept_manifests: + if manifest.isdir: + continue + layers.update(self._get_layers_from_manifest( + namespace.name, manifest.path)) + blobpath = os.path.join(namespace.path, 'blobs/') + for blob in self.backend.list_objects(blobpath): + if blob.name not in layers: + self._prune(blob, upload_target) + + def _get_layers_from_manifest(self, namespace, path): + self.log.debug('Get layers %s', path) + data = self.backend.get_object(path) + manifest = json.loads(data) + target = manifest.get('application/vnd.docker.distribution.manifest.v2+json') + layers = [] + if not target: + self.log.debug('Unknown manifest %s', path) + return layers + layers.append(target) + data = self.get_blob(namespace, target) + manifest = json.loads(data) + layers.append(manifest['config']['digest']) + for layer in manifest['layers']: + layers.append(layer['digest']) + return layers + + def _prune(self, root_obj, target): + kept = [] + if root_obj.isdir: + for obj in self.backend.list_objects(root_obj.path): + kept.extend(self._prune(obj, target)) + if not kept and root_obj.ctime < target: + self.log.debug('Prune %s', root_obj.path) + self.backend.delete_object(root_obj.path) + else: + self.log.debug('Keep %s', root_obj.path) + kept.append(root_obj) + return kept diff --git a/zuul_registry/storageutils.py b/zuul_registry/storageutils.py new file mode 100644 index 0000000..46d6704 --- /dev/null +++ b/zuul_registry/storageutils.py @@ -0,0 +1,123 @@ +# Copyright 2019 Red Hat, Inc. +# +# This module is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software. If not, see . + +from abc import ABCMeta, abstractmethod + +class ObjectInfo: + def __init__(self, path, name, ctime, isdir): + self.path = path + self.name = name + self.ctime = ctime + self.isdir = isdir + +class StorageDriver(metaclass=ABCMeta): + """Base class for storage drivers. + + Storage drivers should implement all of the methods in this class. + This is a low-level API with no knowledge of the intended use as + an image registry. This makes it easy to add backend drivers + since the storage abstraction layer is designed to deal with the + lowest common denominator. + """ + + @abstractmethod + def __init__(self, conf): + """Initialize a driver. + + :arg dict conf: The 'storage' section from the config file. + """ + pass + + @abstractmethod + def list_objects(self, path): + """List objects at path. + + Returns a list of objects rooted at `path`, one level deep. + + :arg str path: The object path. + + :returns: A list of ObjectInfo objects, one for each object. + :rtype: ObjectInfo + """ + pass + + @abstractmethod + def get_object_size(self, path): + """Return the size of object at path. + + :arg str path: The object path. + + :returns: The size of the object in bytes. + :rtype: int + """ + pass + + @abstractmethod + def put_object(self, path, data): + """Store an object. + + Store the contents of `data` at `path`. The `data` parameter + may be a bytearray or a generator which produces bytearrays. + + :arg str path: The object path. + :arg bytearray data: The data to store. + """ + pass + + @abstractmethod + def get_object(self, path): + """Retrieve an object. + + Return the contents of the object at `path`. + + :arg str path: The object path. + + :returns: The contents of the object. + :rtype: bytearray + """ + pass + + @abstractmethod + def delete_object(self, path): + """Delete an object. + + Delete the object stored at `path`. + + :arg str path: The object path. + """ + pass + + @abstractmethod + def move_object(self, src_path, dst_path): + """Move an object. + + Move the object from `src_path` to `dst_path`. + + :arg str src_path: The original path. + :arg str dst_path: The new path. + """ + pass + + @abstractmethod + def cat_objects(self, path, chunks): + """Concatenate objects. + + Concatenate one or more objects to create a new object. + The original objects are deleted. + + :arg str path: The new path. + :arg list chunks: A list of paths of objects to concatenate. + """ + pass diff --git a/zuul_registry/swift.py b/zuul_registry/swift.py new file mode 100644 index 0000000..7523df3 --- /dev/null +++ b/zuul_registry/swift.py @@ -0,0 +1,154 @@ +# Copyright 2019 Red Hat, Inc. +# +# This module is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software. If not, see . + +import logging +import openstack +import os +import keystoneauth1 +import tempfile +import time +import json + +import dateutil.parser + +import storageutils + +POST_ATTEMPTS = 3 + +def retry_function(func): + for attempt in range(1, POST_ATTEMPTS + 1): + try: + return func() + except keystoneauth1.exceptions.http.NotFound: + raise + except Exception: + if attempt >= POST_ATTEMPTS: + raise + else: + logging.exception("Error on attempt %d" % attempt) + time.sleep(attempt * 10) + +class SwiftDriver(storageutils.StorageDriver): + log = logging.getLogger('registry.swift') + + def __init__(self, conf): + self.cloud_name = conf['cloud'] + self.container_name = conf['container'] + self.conn = openstack.connect(cloud=self.cloud_name) + container = retry_function( + lambda: self.conn.get_container(self.container_name)) + if not container: + self.log.info("Creating container %s", self.container_name) + retry_function( + lambda: self.conn.create_container( + name=self.container_name, public=False)) + endpoint = self.conn.object_store.get_endpoint() + self.url = os.path.join(endpoint, self.container_name) + + def get_url(self, path): + return os.path.join(self.url, path) + + def list_objects(self, path): + self.log.debug("List objects %s", path) + url = self.get_url('') + '?prefix=%s&delimiter=/&format=json' % (path,) + ret = retry_function( + lambda: self.conn.session.get(url).content.decode('utf8')) + data = json.loads(ret) + ret = [] + for obj in data: + if 'subdir' in obj: + objpath = obj['subdir'] + name = obj['subdir'].split('/')[-2] + ctime = time.time() + isdir = True + else: + objpath = obj['name'] + name = obj['name'].split('/')[-1] + ctime = dateutil.parser.parse(obj['last_modified']+'Z').timestamp() + isdir = False + ret.append(storageutils.ObjectInfo( + objpath, name, ctime, isdir)) + return ret + + def get_object_size(self, path): + try: + ret = retry_function( + lambda: self.conn.session.head(self.get_url(path))) + except keystoneauth1.exceptions.http.NotFound: + return None + return ret.headers['Content-Length'] + + def put_object(self, path, data): + name = None + try: + with tempfile.NamedTemporaryFile('wb', delete=False) as f: + name = f.name + if isinstance(data, bytes): + f.write(data) + else: + for chunk in data: + f.write(chunk) + retry_function( + lambda: self.conn.object_store.upload_object( + self.container_name, + path, + filename=name)) + finally: + if name: + os.unlink(name) + + def get_object(self, path): + try: + ret = retry_function( + lambda: self.conn.session.get(self.get_url(path))) + except keystoneauth1.exceptions.http.NotFound: + return None + return ret.content + + def delete_object(self, path): + retry_function( + lambda: self.conn.session.delete( + self.get_url(path))) + + def move_object(self, src_path, dst_path): + dst = os.path.join(self.container_name, dst_path) + retry_function( + lambda: self.conn.session.request( + self.get_url(src_path)+"?multipart-manfest=get", + 'COPY', + headers={'Destination': dst} + )) + retry_function( + lambda: self.conn.session.delete( + self.get_url(src_path))) + + def cat_objects(self, path, chunks): + manifest = [] + #TODO: Would it be better to move 1-chunk objects? + for chunk_path in chunks: + ret = retry_function( + lambda: self.conn.session.head(self.get_url(chunk_path))) + if int(ret.headers['Content-Length']) == 0: + continue + manifest.append({'path': + os.path.join(self.container_name, chunk_path), + 'etag': ret.headers['Etag'], + 'size_bytes': ret.headers['Content-Length']}) + retry_function(lambda: + self.conn.session.put( + self.get_url(path)+"?multipart-manifest=put", + data=json.dumps(manifest))) + +Driver = SwiftDriver