Follow-up of "Add Etcd backend for jobboard"

Address the two minor improvements found during the review.
 - Rename 'join' method to more explicit name
 - Allow float timeout value

Change-Id: I08b1b1584414a7675257f0aac90017e91c7cd716
This commit is contained in:
Takashi Kajinami 2024-07-08 11:30:08 +09:00
parent c3ebdc67c8
commit fc14510adb

View File

@ -207,7 +207,7 @@ class EtcdJobBoard(base.JobBoard):
("ca_cert", str),
("cert_key", str),
("cert_cert", str),
("timeout", int),
("timeout", float),
("api_path", str),
)
@ -226,7 +226,7 @@ class EtcdJobBoard(base.JobBoard):
path_elems = [self.ROOT_PATH,
self._conf.get("path", self.DEFAULT_PATH)]
self._root_path = self.join(*path_elems)
self._root_path = self._create_path(*path_elems)
self._job_cache = {}
self._job_cond = threading.Condition()
@ -238,7 +238,7 @@ class EtcdJobBoard(base.JobBoard):
self._watcher = None
self._watcher_cancel = None
def join(self, root, *args):
def _create_path(self, root, *args):
return "/".join([root] + [a.strip("/") for a in args])
def incr(self, key):
@ -275,7 +275,7 @@ class EtcdJobBoard(base.JobBoard):
return sorted(self._job_cache.values())
def _ensure_fresh(self):
prefix = self.join(self._root_path, self.JOB_PREFIX)
prefix = self._create_path(self._root_path, self.JOB_PREFIX)
jobs = self._client.get_prefix(prefix)
listed_jobs = {}
for job in jobs:
@ -435,8 +435,8 @@ class EtcdJobBoard(base.JobBoard):
created_on=timeutils.utcnow(),
book=book, details=details,
priority=job_priority)
seq = self.incr(self.join(self._root_path, self.SEQUENCE_KEY))
key = self.join(self._root_path, f"{self.JOB_PREFIX}{seq}")
seq = self.incr(self._create_path(self._root_path, self.SEQUENCE_KEY))
key = self._create_path(self._root_path, f"{self.JOB_PREFIX}{seq}")
job_posting["sequence"] = seq
raw_job_posting = jsonutils.dumps(job_posting)
@ -567,7 +567,7 @@ class EtcdJobBoard(base.JobBoard):
self._client = etcd3gw.Etcd3Client(**etcd_conf)
self._state = self.CONNECTED_STATE
watch_url = self.join(self._root_path, self.JOB_PREFIX)
watch_url = self._create_path(self._root_path, self.JOB_PREFIX)
self._thread_cancel = threading.Event()
try:
(self._watcher,