Initial implementation

This provides a basic registry implementation.

See http://lists.zuul-ci.org/pipermail/zuul-discuss/2019-September/001009.html
for more information.

Change-Id: Ic0807edf7c9e3e6d6d42f070e7efc44b425ff372
This commit is contained in:
James E. Blair 2019-09-23 09:35:16 -04:00
parent 3626d05e99
commit a83eba28f9
12 changed files with 1200 additions and 0 deletions

27
Dockerfile Normal file
View File

@ -0,0 +1,27 @@
# Copyright 2019 Red Hat, Inc.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
# Build stage: assemble wheels from the source tree using the opendev
# python-builder image's `assemble` helper.
# NOTE(review): base images are untagged; consider pinning a tag/digest.
FROM opendevorg/python-builder AS builder
COPY . /tmp/src
RUN assemble

# Runtime stage: install the wheels produced by the build stage.
FROM opendevorg/python-base AS zuul-registry
COPY --from=builder /output/ /output
RUN /output/install-from-bindep

VOLUME /storage
# Exec form requires one array element per argument; the original put the
# whole command line in a single element, which Docker treats as the
# executable path and fails to start.
CMD ["/usr/local/bin/zuul-registry", "-c", "/conf/registry.conf", "serve"]

7
requirements.txt Normal file
View File

@ -0,0 +1,7 @@
pbr>=1.1.0
PyYAML
cherrypy
routes
requests
openstacksdk
rehash
python-dateutil

34
setup.cfg Normal file
View File

@ -0,0 +1,34 @@
[metadata]
name = zuul-registry
summary = A speculative container image registry for Zuul
description-file =
README.rst
author = Zuul Team
author-email = zuul-discuss@lists.zuul-ci.org
home-page = https://zuul-ci.org/
python-requires = >=3.5
classifier =
Intended Audience :: Information Technology
Intended Audience :: System Administrators
License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Operating System :: POSIX :: Linux
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3.5
[pbr]
warnerrors = True
[global]
setup_hooks =
zuul_registry._setup_hook.setup_hook
[entry_points]
console_scripts =
zuul-registry = zuul_registry.main:main
[build_sphinx]
source-dir = doc/source
build-dir = doc/build
all_files = 1
warning-is-error = 1

20
setup.py Normal file
View File

@ -0,0 +1,20 @@
# Copyright 2019 Red Hat, Inc.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import setuptools

# Minimal shim: all packaging metadata lives in setup.cfg and is
# processed by pbr (see the [pbr] and [metadata] sections there).
setuptools.setup(
    setup_requires=['pbr'],
    pbr=True)

23
tools/conf.yaml Normal file
View File

@ -0,0 +1,23 @@
registry:
address: '127.0.0.1'
port: 9000
tls-cert: /path/to/cert.pem
tls-key: /path/to/cert.key
#shadow:
# docker.io:
# hostname: local-docker-mirror.example.com
# port: 443
users:
- name: testuser
pass: testpass
access: write
- name: anonymous
pass: ''
access: read
xstorage:
driver: swift
cloud: registry
container: test_registry
storage:
driver: filesystem
root: /tmp/storage

66
tools/delete_container.py Normal file
View File

@ -0,0 +1,66 @@
#!/usr/bin/env python3
#
# Copyright 2019 Red Hat, Inc.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import argparse
import openstack
import requests
import logging
import os
logging.basicConfig(level=logging.INFO)
# logging.getLogger("requests").setLevel(logging.DEBUG)
# logging.getLogger("keystoneauth").setLevel(logging.INFO)
# logging.getLogger("stevedore").setLevel(logging.INFO)
logging.captureWarnings(True)


def main():
    """Delete every object in a swift container, then the container.

    The pathological object literally named '/' cannot be addressed
    through the SDK, so it is removed with a raw DELETE against the
    container URL with a trailing '//'.
    """
    parser = argparse.ArgumentParser(
        description="Delete a swift container"
    )
    parser.add_argument('cloud',
                        help='Name of the cloud to use when uploading')
    parser.add_argument('container',
                        help='Name of the container to use when uploading')
    args = parser.parse_args()

    cloud = openstack.connect(cloud=args.cloud)
    sess = cloud.config.get_session()
    # Widen the connection pool so many small deletes are not throttled
    # client-side.
    adapter = requests.adapters.HTTPAdapter(pool_maxsize=100)
    sess.mount('https://', adapter)

    container = cloud.get_container(args.container)
    print('Found container', container)
    print()
    for x in cloud.object_store.objects(args.container):
        print('Delete object', x.name)
        if x.name == '/':
            # Use a separate variable for the raw URL; the original
            # reassigned `container`, so the final log below printed the
            # URL instead of the container record.
            endpoint = cloud.object_store.get_endpoint()
            container_url = os.path.join(endpoint, args.container)
            cloud.session.delete(container_url + '//')
        else:
            cloud.object_store.delete_object(x)
    print()
    print('Delete container', container)
    cloud.object_store.delete_container(args.container)


if __name__ == "__main__":
    main()

24
tools/test.sh Executable file
View File

@ -0,0 +1,24 @@
#!/bin/bash
# Copyright 2019 Red Hat, Inc.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
# Start from a clean local store and image cache.
rm -fr /tmp/storage
docker rmi localhost:9000/zuul/registry
docker image prune -f
# Load the pre-built test image, push it through the registry under
# test, then drop the local copy and pull it back to verify the
# round trip through the registry's storage backend.
docker load <registry.img
docker image push localhost:9000/zuul/registry
docker rmi localhost:9000/zuul/registry
docker image prune -f
docker image pull localhost:9000/zuul/registry

View File

@ -0,0 +1,90 @@
# Copyright 2019 Red Hat, Inc.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import os
import time
import storageutils
class FilesystemDriver(storageutils.StorageDriver):
    """Local-filesystem implementation of the storage driver API.

    All object paths are stored beneath the configured root directory.
    """

    def __init__(self, conf):
        # conf is the 'storage' section of the config file.
        self.root = conf['root']

    def list_objects(self, path):
        """Return ObjectInfo records for entries one level below path.

        NOTE: the returned paths include the filesystem root; callers
        feed them back through os.path.join(self.root, ...), which
        discards self.root when the argument is absolute — this relies
        on the configured root being an absolute path.
        """
        # The original assigned an unused `now = time.time()` here;
        # removed.
        path = os.path.join(self.root, path)
        if not os.path.isdir(path):
            return []
        ret = []
        for f in os.listdir(path):
            obj_path = os.path.join(path, f)
            ret.append(storageutils.ObjectInfo(
                obj_path, f, os.stat(obj_path).st_ctime,
                os.path.isdir(obj_path)))
        return ret

    def get_object_size(self, path):
        """Return the object's size in bytes, or None if missing."""
        path = os.path.join(self.root, path)
        if not os.path.exists(path):
            return None
        return os.stat(path).st_size

    def put_object(self, path, data):
        """Write bytes or an iterable of byte chunks to path."""
        path = os.path.join(self.root, path)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wb') as f:
            if isinstance(data, bytes):
                f.write(data)
            else:
                for chunk in data:
                    f.write(chunk)

    def get_object(self, path):
        """Return the full contents of the object, or None if missing."""
        path = os.path.join(self.root, path)
        if not os.path.exists(path):
            return None
        with open(path, 'rb') as f:
            return f.read()

    def delete_object(self, path):
        """Delete a file, or an (empty) directory."""
        path = os.path.join(self.root, path)
        if os.path.exists(path):
            if os.path.isdir(path):
                os.rmdir(path)
            else:
                os.unlink(path)

    def move_object(self, src_path, dst_path):
        """Rename src_path to dst_path, creating parent dirs as needed."""
        src_path = os.path.join(self.root, src_path)
        dst_path = os.path.join(self.root, dst_path)
        os.makedirs(os.path.dirname(dst_path), exist_ok=True)
        os.rename(src_path, dst_path)

    def cat_objects(self, path, chunks):
        """Concatenate chunk objects into path, then delete the chunks."""
        path = os.path.join(self.root, path)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wb') as outf:
            for chunk_path in chunks:
                chunk_path = os.path.join(self.root, chunk_path)
                with open(chunk_path, 'rb') as inf:
                    while True:
                        d = inf.read(4096)
                        if not d:
                            break
                        outf.write(d)
        # Only remove the sources after the combined object is complete.
        for chunk_path in chunks:
            chunk_path = os.path.join(self.root, chunk_path)
            os.unlink(chunk_path)


Driver = FilesystemDriver

334
zuul_registry/main.py Normal file
View File

@ -0,0 +1,334 @@
# Copyright 2019 Red Hat, Inc.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import argparse
import sys
import logging
import cherrypy
import filesystem
import storage
import swift
import hashlib
import json
import pprint
import urllib
import yaml
# Map of config-file 'storage.driver' names to driver classes.
DRIVERS = {
    'filesystem': filesystem.Driver,
    'swift': swift.Driver,
}
class Authorization:
    """HTTP basic-auth credential store.

    Built from the config file's `users` list.  Every user appears in
    the read map; users whose access is 'write' additionally appear in
    the write map.
    """

    def __init__(self, users):
        self.ro = {}
        self.rw = {}
        for entry in users:
            name, secret = entry['name'], entry['pass']
            if entry['access'] == 'write':
                self.rw[name] = secret
            self.ro[name] = secret

    def require_write(self, realm, user, password):
        # The realm argument is supplied by cherrypy's auth_basic tool
        # and is not consulted here.
        return self.check(self.rw, user, password)

    def require_read(self, realm, user, password):
        return self.check(self.ro, user, password)

    def check(self, store, user, password):
        # Unknown users short-circuit to False without comparing.
        return user in store and store[user] == password
class RegistryAPI:
    """Registry API server.

    Implements the container registry protocol as documented in
    https://docs.docker.com/registry/spec/api/
    """
    log = logging.getLogger("registry.api")

    def __init__(self, store, authz):
        self.storage = store
        self.authz = authz
        # Optional map of hostname -> upstream registry URL; when unset
        # every request is served from the single '_local' namespace.
        self.shadow = None

    # These are used in a decorator; they dispatch to the
    # Authorization method of the same name.  The eventual dereferenced
    # object is the instance of this class.
    def require_write(*args):
        return cherrypy.request.app.root.authz.require_write(*args)

    def require_read(*args):
        return cherrypy.request.app.root.authz.require_read(*args)

    def get_namespace(self):
        """Return the storage namespace for the current request."""
        if not self.shadow:
            return '_local'
        return cherrypy.request.headers['Host']

    def not_found(self):
        """Raise a 404 or, once implemented, proxy to a shadow registry."""
        if not self.shadow:
            raise cherrypy.HTTPError(404)
        # TODO: Proxy the request (this is where we implement the
        # buildset registry functionality).
        host = cherrypy.request.headers['Host']
        method = cherrypy.request.method
        path = cherrypy.request.path_info
        url = self.shadow.get(host)
        if not url:
            raise cherrypy.HTTPError(404)
        url = urllib.parse.urljoin(url, path)
        self.log.debug("Proxy request %s %s", method, url)

    @cherrypy.expose
    @cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
    @cherrypy.config(**{'tools.auth_basic.checkpassword': require_read})
    def version_check(self):
        """Handle the /v2/ endpoint (API version negotiation)."""
        self.log.info('Version check')
        # Set the version header before returning; in the original these
        # two lines followed the return statement and never executed.
        res = cherrypy.response
        res.headers['Distribution-API-Version'] = 'registry/2.0'
        return {'version': '1.0'}

    @cherrypy.expose
    @cherrypy.config(**{'tools.auth_basic.checkpassword': require_read})
    def get_blob(self, repository, digest):
        """Serve (GET) or probe (HEAD) a blob by digest."""
        namespace = self.get_namespace()
        method = cherrypy.request.method
        self.log.info('%s blob %s %s', method, repository, digest)
        size = self.storage.blob_size(namespace, digest)
        if size is None:
            return self.not_found()
        res = cherrypy.response
        res.headers['Docker-Content-Digest'] = digest
        if method != 'HEAD':
            data = self.storage.get_blob(namespace, digest)
            return data
        # HEAD: headers only, empty body.
        return {}

    @cherrypy.expose
    @cherrypy.config(**{'tools.auth_basic.checkpassword': require_write})
    def start_upload(self, repository, digest=None):
        """Begin a chunked blob upload; return its UUID to the client."""
        namespace = self.get_namespace()
        method = cherrypy.request.method
        uuid = self.storage.start_upload(namespace)
        self.log.info('Start upload %s %s uuid %s digest %s',
                      method, repository, uuid, digest)
        res = cherrypy.response
        res.headers['Location'] = '/v2/%s/blobs/uploads/%s' % (repository, uuid)
        res.headers['Docker-Upload-UUID'] = uuid
        res.headers['Range'] = '0-0'
        res.status = '202 Accepted'

    @cherrypy.expose
    @cherrypy.config(**{'tools.auth_basic.checkpassword': require_write})
    def upload_chunk(self, repository, uuid):
        """Append one PATCH chunk to an in-progress upload."""
        self.log.info('Upload chunk %s %s', repository, uuid)
        namespace = self.get_namespace()
        old_length, new_length = self.storage.upload_chunk(
            namespace, uuid, cherrypy.request.body)
        res = cherrypy.response
        res.headers['Location'] = '/v2/%s/blobs/uploads/%s' % (repository, uuid)
        res.headers['Docker-Upload-UUID'] = uuid
        res.headers['Range'] = '0-%s' % (new_length,)
        res.status = '204 No Content'
        self.log.info('Finish Upload chunk %s %s %s', repository, uuid, new_length)

    @cherrypy.expose
    @cherrypy.config(**{'tools.auth_basic.checkpassword': require_write})
    def finish_upload(self, repository, uuid, digest):
        """Handle the final PUT: store any trailing data and seal the blob."""
        self.log.info('Finish upload %s %s', repository, uuid)
        namespace = self.get_namespace()
        old_length, new_length = self.storage.upload_chunk(
            namespace, uuid, cherrypy.request.body)
        self.storage.store_upload(namespace, uuid, digest)
        res = cherrypy.response
        res.headers['Location'] = '/v2/%s/blobs/%s' % (repository, digest)
        res.headers['Docker-Content-Digest'] = digest
        res.headers['Content-Range'] = '%s-%s' % (old_length, new_length)
        res.status = '204 No Content'

    @cherrypy.expose
    @cherrypy.config(**{'tools.auth_basic.checkpassword': require_write})
    def put_manifest(self, repository, ref):
        """Store a manifest blob and index it by (repo, ref, content-type).

        The per-tag manifest record is a JSON map of Content-Type ->
        blob digest, so one tag can carry multiple manifest formats.
        """
        namespace = self.get_namespace()
        body = cherrypy.request.body.read()
        hasher = hashlib.sha256()
        hasher.update(body)
        digest = 'sha256:' + hasher.hexdigest()
        self.log.info('Put manifest %s %s digest %s', repository, ref, digest)
        self.storage.put_blob(namespace, digest, body)
        manifest = self.storage.get_manifest(namespace, repository, ref)
        if manifest is None:
            manifest = {}
        else:
            manifest = json.loads(manifest)
        manifest[cherrypy.request.headers['Content-Type']] = digest
        self.storage.put_manifest(
            namespace, repository, ref, json.dumps(manifest).encode('utf8'))
        res = cherrypy.response
        res.headers['Location'] = '/v2/%s/manifests/%s' % (repository, ref)
        res.headers['Docker-Content-Digest'] = digest
        res.status = '201 Created'

    @cherrypy.expose
    @cherrypy.config(**{'tools.auth_basic.checkpassword': require_read})
    def get_manifest(self, repository, ref):
        """Serve a manifest by digest or by tag + Accept content-type."""
        namespace = self.get_namespace()
        headers = cherrypy.request.headers
        res = cherrypy.response
        self.log.info('Get manifest %s %s', repository, ref)
        if ref.startswith('sha256:'):
            # Digest reference: the manifest is stored directly as a blob.
            manifest = self.storage.get_blob(namespace, ref)
            if manifest is None:
                self.log.error('Manifest %s %s not found', repository, ref)
                return self.not_found()
            res.headers['Content-Type'] = json.loads(manifest)['mediaType']
            res.headers['Docker-Content-Digest'] = ref
            return manifest
        # Tag reference: look up the content-type -> digest map and pick
        # the first type the client accepts.
        manifest = self.storage.get_manifest(namespace, repository, ref)
        if manifest is None:
            manifest = {}
        else:
            manifest = json.loads(manifest)
        for ct in [x.strip() for x in headers['Accept'].split(',')]:
            if ct in manifest:
                self.log.debug('Manifest %s %s digest found %s',
                               repository, ref, manifest[ct])
                data = self.storage.get_blob(namespace, manifest[ct])
                res.headers['Content-Type'] = ct
                res.headers['Docker-Content-Digest'] = manifest[ct]
                hasher = hashlib.sha256()
                hasher.update(data)
                self.log.debug('Retrieved sha256 %s', hasher.hexdigest())
                return data
        self.log.error('Manifest %s %s not found', repository, ref)
        return self.not_found()
class RegistryServer:
    """Top-level server: loads config, wires storage, auth and routes
    into cherrypy, and manages the engine lifecycle."""
    log = logging.getLogger("registry.server")

    def __init__(self, config_path):
        self.log.info("Loading config from %s", config_path)
        self._load_config(config_path)

        # TODO: pyopenssl?
        # TLS is terminated by cherrypy's builtin ssl module using the
        # cert/key paths from the config file.
        cherrypy.server.ssl_module = 'builtin'
        cherrypy.server.ssl_certificate = self.conf['tls-cert']
        cherrypy.server.ssl_private_key = self.conf['tls-key']

        # Instantiate the configured backend driver and wrap it in the
        # storage abstraction layer.
        driver = self.conf['storage']['driver']
        backend = DRIVERS[driver](self.conf['storage'])
        self.store = storage.Storage(backend, self.conf['storage'])

        authz = Authorization(self.conf['users'])

        # Routes use a greedy {repository:.*} match because repository
        # names may themselves contain slashes.
        route_map = cherrypy.dispatch.RoutesDispatcher()
        api = RegistryAPI(self.store, authz)
        route_map.connect('api', '/v2/',
                          controller=api, action='version_check')
        route_map.connect('api', '/v2/{repository:.*}/blobs/uploads/',
                          controller=api, action='start_upload')
        route_map.connect('api', '/v2/{repository:.*}/blobs/uploads/{uuid}',
                          conditions=dict(method=['PATCH']),
                          controller=api, action='upload_chunk')
        route_map.connect('api', '/v2/{repository:.*}/blobs/uploads/{uuid}',
                          conditions=dict(method=['PUT']),
                          controller=api, action='finish_upload')
        route_map.connect('api', '/v2/{repository:.*}/manifests/{ref}',
                          conditions=dict(method=['PUT']),
                          controller=api, action='put_manifest')
        route_map.connect('api', '/v2/{repository:.*}/manifests/{ref}',
                          conditions=dict(method=['GET']),
                          controller=api, action='get_manifest')
        route_map.connect('api', '/v2/{repository:.*}/blobs/{digest}',
                          controller=api, action='get_blob')

        conf = {
            '/': {
                'request.dispatch': route_map
            }
        }

        # Basic auth is enabled globally; per-handler checkpassword
        # functions are attached via @cherrypy.config on the API methods.
        cherrypy.config.update({
            'global': {
                'environment': 'production',
                'server.socket_host': self.conf['address'],
                'server.socket_port': self.conf['port'],
                'tools.auth_basic.on': True,
                'tools.auth_basic.realm': 'Registry',
                'tools.auth_basic.accept_charset': 'UTF-8',
            },
        })

        cherrypy.tree.mount(api, '/', config=conf)

    def _load_config(self, path):
        # Only the 'registry' section of the YAML file is kept.
        with open(path) as f:
            conf = yaml.safe_load(f.read())
        self.conf = conf['registry']

    @property
    def port(self):
        # The actual bound port (useful when the config requests port 0).
        return cherrypy.server.bound_addr[1]

    def start(self):
        self.log.info("Registry starting")
        cherrypy.engine.start()

    def stop(self):
        self.log.info("Registry stopping")
        cherrypy.engine.exit()
        # Not strictly necessary, but without this, if the server is
        # started again (e.g., in the unit tests) it will reuse the
        # same host/port settings.
        cherrypy.server.httpserver = None

    def prune(self):
        # Delegate to the storage layer's mark/sweep pruner.
        self.store.prune()
if __name__ == "__main__":
    # Command-line entry point: `zuul-registry [-c CONFIG] [-d] [command]`
    # where command is 'serve' (default) or 'prune'.
    parser = argparse.ArgumentParser(
        description='Zuul registry server')
    parser.add_argument('-c', dest='config',
                        help='Config file path',
                        default='/conf/registry.conf')
    parser.add_argument('-d', dest='debug',
                        help='Debug log level',
                        action='store_true')
    parser.add_argument('command',
                        nargs='?',
                        help='Command: serve, prune',
                        default='serve')
    args = parser.parse_args()

    if args.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    cherrypy.log.access_log.propagate = False
    # NOTE(review): several third-party loggers are forced to DEBUG even
    # without -d; presumably leftover development settings — confirm
    # before tightening.
    logging.getLogger("requests").setLevel(logging.DEBUG)
    logging.getLogger("keystoneauth").setLevel(logging.ERROR)
    logging.getLogger("urllib3").setLevel(logging.DEBUG)
    logging.getLogger("stevedore").setLevel(logging.INFO)
    logging.getLogger("openstack").setLevel(logging.DEBUG)
    # cherrypy.log.error_log.propagate = False

    s = RegistryServer(args.config)
    if args.command == 'serve':
        s.start()
        cherrypy.engine.block()
    elif args.command == 'prune':
        s.prune()
    else:
        # The original passed the command as a second print() argument
        # (logging-style), which printed the literal '%s'; use %-format.
        print("Unknown command: %s" % args.command)
        sys.exit(1)

298
zuul_registry/storage.py Normal file
View File

@ -0,0 +1,298 @@
# Copyright 2019 Red Hat, Inc.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import base64
import json
import logging
import os
import queue
import rehash
import threading
import time
from uuid import uuid4
class UploadRecord:
    """State of one in-progress upload.

    Serialized into object storage next to the upload's chunk data so
    that each new chunk can extend it.  The registry protocol delivers
    chunks sequentially, so no locking is needed.

    The crucial field is the resumable sha256 state: by persisting the
    hasher's internal state after every chunk, the final digest can be
    produced without re-reading the whole upload, even if different
    processes (e.g. behind a round-robin balancer) wrote the chunks.
    """

    def __init__(self):
        self.chunks = []
        self.hasher = rehash.sha256()

    @property
    def count(self):
        """Number of chunks received so far."""
        return len(self.chunks)

    @property
    def size(self):
        """Total bytes received across all chunks."""
        total = 0
        for chunk in self.chunks:
            total += chunk['size']
        return total

    @property
    def digest(self):
        """Digest of everything hashed so far, in registry notation."""
        return 'sha256:' + self.hasher.hexdigest()

    def load(self, data):
        """Restore state from the JSON produced by dump()."""
        parsed = json.loads(data.decode('utf8'))
        self.chunks = parsed['chunks']
        state = parsed['hash_state']
        # The raw hasher state is binary; it travels base64-encoded.
        state['md_data'] = base64.decodebytes(
            state['md_data'].encode('ascii'))
        self.hasher.__setstate__(state)

    def dump(self):
        """Serialize chunk list and hasher state to JSON bytes."""
        state = self.hasher.__getstate__()
        state['md_data'] = base64.encodebytes(
            state['md_data']).decode('ascii')
        return json.dumps({'chunks': self.chunks,
                           'hash_state': state}).encode('utf8')
class UploadStreamer:
    """Bridge between a producer thread and a streaming consumer.

    The producer calls write() repeatedly and finally write(None) as an
    end-of-stream sentinel; the consumer iterates over the instance
    (e.g. a backend put_object running in another thread) and receives
    each buffered item in order.
    """

    def __init__(self):
        self.queue = queue.Queue()

    def write(self, data):
        """Enqueue one chunk; None terminates the stream."""
        self.queue.put(data)

    def __iter__(self):
        # iter(callable, sentinel) blocks on get() and stops cleanly
        # when the None sentinel arrives.
        for item in iter(self.queue.get, None):
            yield item
class Storage:
    """Storage abstraction layer.

    This class abstracts different storage backends, providing a
    convenience API to the registry.

    Most of these methods take a namespace argument.  The namespace
    is, essentially, an entire registry isolated from the other
    namespaces.  They may even have duplicate object data.  This
    allows us to support serving multiple registries from the same
    process (without confusing the contents of them).
    """
    # Clients have 1 hour to complete an upload before we start
    # deleting stale objects.
    upload_exp = 60 * 60
    log = logging.getLogger('registry.storage')

    def __init__(self, backend, conf):
        """:arg backend: a StorageDriver instance.
        :arg dict conf: the 'storage' section of the config file;
            the optional 'expiration' key enables manifest pruning.
        """
        self.backend = backend
        if 'expiration' in conf:
            self.manifest_exp = conf['expiration']
        else:
            self.manifest_exp = None

    def blob_size(self, namespace, digest):
        """Return the blob's size in bytes, or None if absent."""
        path = os.path.join(namespace, 'blobs', digest, 'data')
        return self.backend.get_object_size(path)

    def put_blob(self, namespace, digest, data):
        """Store a complete blob under its digest."""
        path = os.path.join(namespace, 'blobs', digest, 'data')
        return self.backend.put_object(path, data)

    def get_blob(self, namespace, digest):
        """Return the blob's contents, or None if absent."""
        path = os.path.join(namespace, 'blobs', digest, 'data')
        return self.backend.get_object(path)

    def start_upload(self, namespace):
        """Start an upload.

        Create an empty UploadRecord and store it.  Later methods will
        add to it.  The returned uuid uniquely identifies the upload.

        Uploads have one or more chunks.  See `upload_chunk`.
        """
        # (The original also computed an unused metadata path here.)
        uuid = uuid4().hex
        upload = UploadRecord()
        self._update_upload(namespace, uuid, upload)
        return uuid

    def _get_upload(self, namespace, uuid):
        """Load the UploadRecord for an in-progress upload."""
        path = os.path.join(namespace, 'uploads', uuid, 'metadata')
        data = self.backend.get_object(path)
        upload = UploadRecord()
        upload.load(data)
        return upload

    def _update_upload(self, namespace, uuid, upload):
        """Persist the UploadRecord for an in-progress upload."""
        path = os.path.join(namespace, 'uploads', uuid, 'metadata')
        self.backend.put_object(path, upload.dump())

    def upload_chunk(self, namespace, uuid, fp):
        """Add a chunk to an upload.

        Uploads contain one or more chunks of data which are ultimately
        concatenated into one blob.

        This streams the data from `fp` into the backend via a writer
        thread while hashing and counting it here.

        :arg namespace str: The registry namespace.
        :arg uuid str: The UUID of the upload.
        :arg file fp: An open file pointer to the source data.
        :returns: (previous total size, new total size) in bytes.
        """
        upload = self._get_upload(namespace, uuid)
        path = os.path.join(namespace, 'uploads', uuid, str(upload.count + 1))
        streamer = UploadStreamer()
        t = threading.Thread(target=self.backend.put_object,
                             args=(path, streamer))
        t.start()
        size = 0
        while True:
            try:
                d = fp.read(4096)
            except ValueError:
                # We get this on an empty body
                d = b''
            if not d:
                break
            upload.hasher.update(d)
            size += len(d)
            streamer.write(d)
        # None is the end-of-stream sentinel for the writer thread.
        streamer.write(None)
        t.join()
        upload.chunks.append(dict(size=size))
        self._update_upload(namespace, uuid, upload)
        return upload.size - size, upload.size

    def store_upload(self, namespace, uuid, digest):
        """Complete an upload.

        Verify the supplied digest matches the uploaded data, and if
        so, store the uploaded data as a blob in the registry.  Until
        this is called, the upload is incomplete and the data blob is
        not addressable.
        """
        upload = self._get_upload(namespace, uuid)
        if digest != upload.digest:
            raise Exception('Digest does not match %s %s' %
                            (digest, upload.digest))
        # Move the chunks into the blob dir to get them out of the
        # uploads dir.
        chunks = []
        for i in range(1, upload.count + 1):
            src_path = os.path.join(namespace, 'uploads', uuid, str(i))
            dst_path = os.path.join(namespace, 'blobs', digest, str(i))
            chunks.append(dst_path)
            self.backend.move_object(src_path, dst_path)
        # Concatenate the chunks into one blob.
        path = os.path.join(namespace, 'blobs', digest, 'data')
        self.backend.cat_objects(path, chunks)
        path = os.path.join(namespace, 'uploads', uuid, 'metadata')
        self.backend.delete_object(path)

    def put_manifest(self, namespace, repo, tag, data):
        """Store the manifest record for a repo/tag."""
        path = os.path.join(namespace, 'repos', repo, 'manifests', tag)
        self.backend.put_object(path, data)

    def get_manifest(self, namespace, repo, tag):
        """Return the manifest record for a repo/tag, or None."""
        path = os.path.join(namespace, 'repos', repo, 'manifests', tag)
        return self.backend.get_object(path)

    def prune(self):
        """Prune the registry.

        Prune all namespaces in the registry according to configured
        expiration times: stale uploads always; manifests (and, via
        mark/sweep, the blobs they no longer reference) only when a
        manifest expiration is configured.
        """
        now = time.time()
        upload_target = now - self.upload_exp
        if self.manifest_exp:
            manifest_target = now - self.manifest_exp
        else:
            manifest_target = None
        for namespace in self.backend.list_objects(''):
            uploadpath = os.path.join(namespace.path, 'uploads/')
            for upload in self.backend.list_objects(uploadpath):
                self._prune(upload, upload_target)
            if not manifest_target:
                continue
            repopath = os.path.join(namespace.path, 'repos/')
            for repo in self.backend.list_objects(repopath):
                kept_manifests = self._prune(repo, manifest_target)
            # mark/sweep manifest blobs
            layers = set()
            for manifest in kept_manifests:
                if manifest.isdir:
                    continue
                layers.update(self._get_layers_from_manifest(
                    namespace.name, manifest.path))
            blobpath = os.path.join(namespace.path, 'blobs/')
            for blob in self.backend.list_objects(blobpath):
                if blob.name not in layers:
                    self._prune(blob, upload_target)

    def _get_layers_from_manifest(self, namespace, path):
        """Return every digest referenced by a manifest record."""
        self.log.debug('Get layers %s', path)
        data = self.backend.get_object(path)
        manifest = json.loads(data)
        target = manifest.get('application/vnd.docker.distribution.manifest.v2+json')
        layers = []
        if not target:
            self.log.debug('Unknown manifest %s', path)
            return layers
        layers.append(target)
        data = self.get_blob(namespace, target)
        manifest = json.loads(data)
        layers.append(manifest['config']['digest'])
        for layer in manifest['layers']:
            layers.append(layer['digest'])
        return layers

    def _prune(self, root_obj, target):
        """Recursively delete objects older than target; return survivors."""
        kept = []
        if root_obj.isdir:
            for obj in self.backend.list_objects(root_obj.path):
                kept.extend(self._prune(obj, target))
        if not kept and root_obj.ctime < target:
            self.log.debug('Prune %s', root_obj.path)
            self.backend.delete_object(root_obj.path)
        else:
            self.log.debug('Keep %s', root_obj.path)
            kept.append(root_obj)
        return kept

View File

@ -0,0 +1,123 @@
# Copyright 2019 Red Hat, Inc.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
from abc import ABCMeta, abstractmethod
class ObjectInfo:
    """Descriptor for a single stored object.

    Attributes: path (full object path), name (final path component),
    ctime (creation time in seconds since the epoch), isdir (True for
    directory-like entries).
    """

    def __init__(self, path, name, ctime, isdir):
        self.path, self.name = path, name
        self.ctime, self.isdir = ctime, isdir
class StorageDriver(metaclass=ABCMeta):
    """Base class for storage drivers.

    Storage drivers should implement all of the methods in this class.
    This is a low-level API with no knowledge of the intended use as
    an image registry.  This makes it easy to add backend drivers
    since the storage abstraction layer is designed to deal with the
    lowest common denominator.
    """

    @abstractmethod
    def __init__(self, conf):
        """Initialize a driver.

        :arg dict conf: The 'storage' section from the config file.
        """
        pass

    @abstractmethod
    def list_objects(self, path):
        """List objects at path.

        Returns a list of objects rooted at `path`, one level deep.

        :arg str path: The object path.
        :returns: A list of ObjectInfo objects, one for each object.
        :rtype: list of ObjectInfo
        """
        pass

    @abstractmethod
    def get_object_size(self, path):
        """Return the size of object at path.

        :arg str path: The object path.
        :returns: The size of the object in bytes, or None if the
            object does not exist.
        :rtype: int
        """
        pass

    @abstractmethod
    def put_object(self, path, data):
        """Store an object.

        Store the contents of `data` at `path`.  The `data` parameter
        may be a bytearray or a generator which produces bytearrays.

        :arg str path: The object path.
        :arg bytearray data: The data to store.
        """
        pass

    @abstractmethod
    def get_object(self, path):
        """Retrieve an object.

        Return the contents of the object at `path`, or None if the
        object does not exist.

        :arg str path: The object path.
        :returns: The contents of the object.
        :rtype: bytearray
        """
        pass

    @abstractmethod
    def delete_object(self, path):
        """Delete an object.

        Delete the object stored at `path`.

        :arg str path: The object path.
        """
        pass

    @abstractmethod
    def move_object(self, src_path, dst_path):
        """Move an object.

        Move the object from `src_path` to `dst_path`.

        :arg str src_path: The original path.
        :arg str dst_path: The new path.
        """
        pass

    @abstractmethod
    def cat_objects(self, path, chunks):
        """Concatenate objects.

        Concatenate one or more objects to create a new object.
        The original objects are deleted.

        :arg str path: The new path.
        :arg list chunks: A list of paths of objects to concatenate.
        """
        pass

154
zuul_registry/swift.py Normal file
View File

@ -0,0 +1,154 @@
# Copyright 2019 Red Hat, Inc.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import logging
import openstack
import os
import keystoneauth1
import tempfile
import time
import json
import dateutil.parser
import storageutils
POST_ATTEMPTS = 3


def retry_function(func):
    """Invoke func, retrying transient failures.

    NotFound propagates immediately (callers rely on it to detect
    missing objects).  Any other exception is retried up to
    POST_ATTEMPTS times with a linearly increasing sleep between
    attempts, then re-raised.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return func()
        except keystoneauth1.exceptions.http.NotFound:
            # A 404 is meaningful to the caller; never retry it.
            raise
        except Exception:
            if attempt >= POST_ATTEMPTS:
                raise
            logging.exception("Error on attempt %d" % attempt)
            time.sleep(attempt * 10)
class SwiftDriver(storageutils.StorageDriver):
    """Swift (OpenStack object store) implementation of the driver API.

    All objects live in a single container; driver paths map directly
    to object names within that container.
    """
    log = logging.getLogger('registry.swift')

    def __init__(self, conf):
        self.cloud_name = conf['cloud']
        self.container_name = conf['container']
        self.conn = openstack.connect(cloud=self.cloud_name)
        container = retry_function(
            lambda: self.conn.get_container(self.container_name))
        if not container:
            self.log.info("Creating container %s", self.container_name)
            retry_function(
                lambda: self.conn.create_container(
                    name=self.container_name, public=False))
        endpoint = self.conn.object_store.get_endpoint()
        self.url = os.path.join(endpoint, self.container_name)

    def get_url(self, path):
        """Return the full URL for an object path in the container."""
        return os.path.join(self.url, path)

    def list_objects(self, path):
        """Return ObjectInfo records one level below path.

        Uses the raw listing API with a '/' delimiter; 'subdir'
        entries become directory records (with the current time as
        ctime, since Swift pseudo-directories have no timestamp).
        """
        self.log.debug("List objects %s", path)
        url = self.get_url('') + '?prefix=%s&delimiter=/&format=json' % (path,)
        ret = retry_function(
            lambda: self.conn.session.get(url).content.decode('utf8'))
        data = json.loads(ret)
        ret = []
        for obj in data:
            if 'subdir' in obj:
                objpath = obj['subdir']
                name = obj['subdir'].split('/')[-2]
                ctime = time.time()
                isdir = True
            else:
                objpath = obj['name']
                name = obj['name'].split('/')[-1]
                # last_modified has no timezone marker; treat as UTC.
                ctime = dateutil.parser.parse(obj['last_modified']+'Z').timestamp()
                isdir = False
            ret.append(storageutils.ObjectInfo(
                objpath, name, ctime, isdir))
        return ret

    def get_object_size(self, path):
        """Return the object's size in bytes, or None if missing."""
        try:
            ret = retry_function(
                lambda: self.conn.session.head(self.get_url(path)))
        except keystoneauth1.exceptions.http.NotFound:
            return None
        # Cast to int for consistency with the filesystem driver and the
        # StorageDriver contract (the original returned the raw header
        # string).
        return int(ret.headers['Content-Length'])

    def put_object(self, path, data):
        """Upload bytes or an iterable of byte chunks to path.

        Data is spooled to a temporary file first so the SDK can upload
        from a seekable source; the file is always removed afterwards.
        """
        name = None
        try:
            with tempfile.NamedTemporaryFile('wb', delete=False) as f:
                name = f.name
                if isinstance(data, bytes):
                    f.write(data)
                else:
                    for chunk in data:
                        f.write(chunk)
            retry_function(
                lambda: self.conn.object_store.upload_object(
                    self.container_name,
                    path,
                    filename=name))
        finally:
            if name:
                os.unlink(name)

    def get_object(self, path):
        """Return the object's contents, or None if missing."""
        try:
            ret = retry_function(
                lambda: self.conn.session.get(self.get_url(path)))
        except keystoneauth1.exceptions.http.NotFound:
            return None
        return ret.content

    def delete_object(self, path):
        """Delete the object at path."""
        retry_function(
            lambda: self.conn.session.delete(
                self.get_url(path)))

    def move_object(self, src_path, dst_path):
        """Server-side COPY then DELETE (Swift has no rename).

        The 'multipart-manifest=get' query ensures a large-object
        manifest is copied as a manifest rather than by value; the
        original misspelled it 'multipart-manfest', so Swift ignored
        the parameter.
        """
        dst = os.path.join(self.container_name, dst_path)
        retry_function(
            lambda: self.conn.session.request(
                self.get_url(src_path)+"?multipart-manifest=get",
                'COPY',
                headers={'Destination': dst}
            ))
        retry_function(
            lambda: self.conn.session.delete(
                self.get_url(src_path)))

    def cat_objects(self, path, chunks):
        """Create a static large object composed of the given chunks.

        Zero-length chunks are skipped (Swift rejects empty SLO
        segments).  The chunk objects themselves are left in place and
        referenced by the manifest.
        """
        manifest = []
        # TODO: Would it be better to move 1-chunk objects?
        for chunk_path in chunks:
            ret = retry_function(
                lambda: self.conn.session.head(self.get_url(chunk_path)))
            if int(ret.headers['Content-Length']) == 0:
                continue
            # NOTE(review): size_bytes is sent as a string header value;
            # Swift appears to accept it — confirm against the SLO API.
            manifest.append({'path':
                             os.path.join(self.container_name, chunk_path),
                             'etag': ret.headers['Etag'],
                             'size_bytes': ret.headers['Content-Length']})
        retry_function(lambda:
                       self.conn.session.put(
                           self.get_url(path)+"?multipart-manifest=put",
                           data=json.dumps(manifest)))


Driver = SwiftDriver