added base gearman code for client side.
Change-Id: If02456a99f17399c6a1d5b155ff3c58b79a9eade
This commit is contained in:
parent
9b432104a8
commit
1614a1e63b
3
refstack.cfg
Executable file
3
refstack.cfg
Executable file
@ -0,0 +1,3 @@
|
||||
# gearman config
|
||||
GEARMAN_SERVER = "localhost"
|
||||
GEARMAN_PORT = 4730
|
116
refstack/gearman.py
Executable file
116
refstack/gearman.py
Executable file
@ -0,0 +1,116 @@
|
||||
# Copyright 2014 Piston Cloud Computing, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import gear
|
||||
import logging
|
||||
import json
|
||||
import time
|
||||
from uuid import uuid4
|
||||
|
||||
|
||||
class RefstackGearmanClient(gear.Client):
|
||||
|
||||
def __init__(self, refstack_gearman):
|
||||
""" init wrapper sets local refstack_gearman
|
||||
object with passed in one"""
|
||||
super(RefstackGearmanClient, self).__init__()
|
||||
self.__refstack_gearman = refstack_gearman
|
||||
|
||||
|
||||
class Gearman(object):
|
||||
log = logging.getLogger("refstack.Gearman")
|
||||
|
||||
def __init__(self, config):
|
||||
""" sets up connection and client object"""
|
||||
self.config = config
|
||||
self.meta_jobs = {} # A list of meta-jobs like stop or describe
|
||||
|
||||
server = config.get('gearman', 'server')
|
||||
if config.has_option('gearman', 'port'):
|
||||
port = config.get('gearman', 'port')
|
||||
else:
|
||||
port = 4730
|
||||
|
||||
self.gearman = RefstackGearmanClient(self)
|
||||
self.gearman.addServer(server, port)
|
||||
|
||||
def add_job(self, name, params):
|
||||
""" adds job to the gearman queue"""
|
||||
self.log.info("starting test run")
|
||||
uuid = str(uuid4().hex)
|
||||
|
||||
gearman_job = gear.Job(name, json.dumps(params),
|
||||
unique=uuid)
|
||||
|
||||
if not self.is_job_registered(gearman_job.name):
|
||||
self.log.error("Job %s is not registered with Gearman" %
|
||||
gearman_job)
|
||||
self.on_job_completed(gearman_job, 'NOT_REGISTERED')
|
||||
#return build
|
||||
|
||||
try:
|
||||
self.gearman.submitJob(gearman_job)
|
||||
except Exception:
|
||||
self.log.exception("Unable to submit job to Gearman")
|
||||
self.on_build_completed(gearman_job, 'EXCEPTION')
|
||||
#return build
|
||||
|
||||
if not gearman_job.handle:
|
||||
self.log.error("No job handle was received for %s after 30 seconds"
|
||||
" marking as lost." %
|
||||
gearman_job)
|
||||
self.on_build_completed(gearman_job, 'NO_HANDLE')
|
||||
|
||||
self.log.debug("Received handle %s for job" % gearman_job.handle)
|
||||
|
||||
def on_job_completed(self, job, result=None):
|
||||
"""called when test is completed"""
|
||||
if job.unique in self.meta_jobs:
|
||||
del self.meta_jobs[job.unique]
|
||||
return
|
||||
|
||||
def is_job_registered(self, name):
|
||||
""" checks to see if job registered with gearman or not"""
|
||||
if self.function_cache_time:
|
||||
for connection in self.gearman.active_connections:
|
||||
if connection.connect_time > self.function_cache_time:
|
||||
self.function_cache = set()
|
||||
self.function_cache_time = 0
|
||||
break
|
||||
if name in self.function_cache:
|
||||
self.log.debug("Function %s is registered" % name)
|
||||
return True
|
||||
if ((time.time() - self.function_cache_time) <
|
||||
self.negative_function_cache_ttl):
|
||||
self.log.debug("Function %s is not registered "
|
||||
"(negative ttl in effect)" % name)
|
||||
return False
|
||||
self.function_cache_time = time.time()
|
||||
for connection in self.gearman.active_connections:
|
||||
try:
|
||||
req = gear.StatusAdminRequest()
|
||||
connection.sendAdminRequest(req)
|
||||
except Exception:
|
||||
self.log.exception("Exception while checking functions")
|
||||
continue
|
||||
for line in req.response.split('\n'):
|
||||
parts = [x.strip() for x in line.split()]
|
||||
if not parts or parts[0] == '.':
|
||||
continue
|
||||
self.function_cache.add(parts[0])
|
||||
if name in self.function_cache:
|
||||
self.log.debug("Function %s is registered" % name)
|
||||
return True
|
||||
self.log.debug("Function %s is not registered" % name)
|
||||
return False
|
@ -20,3 +20,4 @@ python-openid==2.2.5
|
||||
requests==1.2.3
|
||||
python-keystoneclient
|
||||
Jinja2==2.7.1
|
||||
gear>=0.5.4,<1.0.0
|
Loading…
Reference in New Issue
Block a user