Refactor java written SDaemon

This patch refactors java written SDaemon, by migrating
task specific logics from SDaemon to each Task classes.

*bonus* eliminate trailing white space that can not be
found by pep8 (because it's Java code!)

Change-Id: Ife21fe368a17428dab27cf245ada4f3c0f9f1278
This commit is contained in:
Takashi Kajinami 2019-08-07 22:16:07 +09:00
parent 92e0e58d7f
commit fa433a44c1
9 changed files with 216 additions and 149 deletions

View File

@ -28,7 +28,7 @@ import org.slf4j.Logger;
*
* A common parent object for different Tasks created by STaskFactory
* */
public class SAbstractTask {
public abstract class SAbstractTask {
protected Logger logger;
@ -52,4 +52,5 @@ public class SAbstractTask {
return bStatus;
}
public abstract boolean exec();
}

View File

@ -20,6 +20,7 @@ package org.openstack.storlet.daemon;
import java.io.IOException;
import java.io.OutputStream;
import org.openstack.storlet.daemon.SExecutionManager;
import org.slf4j.Logger;
/*----------------------------------------------------------------------------
@ -32,29 +33,37 @@ import org.slf4j.Logger;
public class SCancelTask extends SAbstractTask {
private OutputStream sOut_ = null;
private String taskId_ = null;
private SExecutionManager sExecManager_ = null;
/*------------------------------------------------------------------------
* CTOR
* */
public SCancelTask(OutputStream sOut, Logger logger, String taskId) {
public SCancelTask(OutputStream sOut, Logger logger,
SExecutionManager sExecManager, String taskId) {
super(logger);
this.sOut_ = sOut;
this.sExecManager_ = sExecManager;
this.taskId_ = taskId;
}
public String getTaskId() {
return taskId_;
}
public OutputStream getSOut() {
return sOut_;
}
/*------------------------------------------------------------------------
* run
* exec
* */
public boolean run() {
return respond(this.sOut_, true, new String("OK"));
@Override
public boolean exec() {
boolean respStatus;
String respMessage;
boolean result = this.sExecManager_.cancelTask(this.taskId_);
if (result) {
respStatus = true;
respMessage = new String("OK");
} else {
respStatus = false;
respMessage = new String("Task id " + this.taskId_
+ "is not found");
}
return respond(this.sOut_, respStatus, respMessage);
}
}
/* ============================== END OF FILE =============================== */

View File

@ -20,21 +20,19 @@ package org.openstack.storlet.daemon;
import java.io.OutputStream;
import java.io.IOException;
import java.util.HashMap;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.Level;
import org.openstack.storlet.common.*;
import org.openstack.storlet.daemon.STaskFactory;
import org.openstack.storlet.daemon.SExecutionManager;
import org.openstack.storlet.sbus.*;
import java.util.concurrent.*;
/*----------------------------------------------------------------------------
* SDaemon
*
*
* This class acts as a language binding and management layer for
* user's Storlet logic implementation(~s?)
* */
@ -43,10 +41,8 @@ public class SDaemon {
private static Logger logger_;
private static SBus sbus_;
private static STaskFactory storletTaskFactory_;
private static ExecutorService threadPool_;
private static String strStorletName_;
private static HashMap<String, Future> taskIdToTask_;
private static int nDefaultTimeoutToWaitBeforeShutdown_ = 3;
private static SExecutionManager sExecManager_;
private static boolean initLog(final String strClassName,
final String strLogLevel) {
@ -80,25 +76,26 @@ public class SDaemon {
/*------------------------------------------------------------------------
* main
*
*
* Entry point.
* args[0] - storlet class name
* args[1] - path to SBus
* args[2] - log level
* args[3] - thread pool size
*
* args[4] - container id
*
* Invocation from CLI example:
* java -Djava.library.path=. ...
*
*
* when packed in a .jar with the native .so use:
* java
* -Djava.library.path=.
* -Djava.class.path=.:./storletdaemon.jar
* org.openstack.storlet.daemon.StorletDaemon
* <args>
*
*
* where <args> can be: storlet.test.TestStorlet /tmp/aaa FINE 5
*
*
* */
public static void main(String[] args) throws Exception {
initialize(args);
@ -108,7 +105,7 @@ public class SDaemon {
/*------------------------------------------------------------------------
* initialize
*
*
* Initialize the resources
* */
private static void initialize(String[] args) throws Exception {
@ -135,14 +132,14 @@ public class SDaemon {
logger_.error(strStorletName_ + ": Failed to create SBus");
return;
}
logger_.trace("Initialising thread pool with " + nPoolSize + " threads");
threadPool_ = Executors.newFixedThreadPool(nPoolSize);
taskIdToTask_ = new HashMap<String, Future>();
sExecManager_ = new SExecutionManager(strStorletName_, logger_, nPoolSize);
sExecManager_.initialize();
}
/*------------------------------------------------------------------------
* mainLoop
*
*
* The main loop - listen, receive, execute till the HALT command.
* */
private static void mainLoop() throws Exception {
@ -179,7 +176,7 @@ public class SDaemon {
/*------------------------------------------------------------------------
* processDatagram
*
*
* Analyze the request datagram. Invoke the relevant storlet
* or do some other job ( halt, description, or maybe something
* else in the future ).
@ -190,7 +187,7 @@ public class SDaemon {
try {
logger_.trace(strStorletName_ + ": Calling createStorletTask with "
+ dtg.toString());
sTask = storletTaskFactory_.createStorletTask(dtg);
sTask = storletTaskFactory_.createStorletTask(dtg, sExecManager_);
} catch (StorletException e) {
logger_.trace(strStorletName_ + ": Failed to init task "
+ e.toString());
@ -201,80 +198,20 @@ public class SDaemon {
logger_.error(strStorletName_
+ ": Unknown command received Quitting");
bStatus = false;
} else if (sTask instanceof SHaltTask) {
logger_.trace(strStorletName_ + ": Got Halt Command");
bStatus = false;
} else if (sTask instanceof SExecutionTask) {
logger_.trace(strStorletName_ + ": Got Invoke command");
Future futureTask = threadPool_.submit((SExecutionTask) sTask);
String taskId = futureTask.toString().split("@")[1];
((SExecutionTask) sTask).setTaskIdToTask(taskIdToTask_);
((SExecutionTask) sTask).setTaskId(taskId);
logger_.trace(strStorletName_ + ": task id is " + taskId);
synchronized (taskIdToTask_) {
taskIdToTask_.put(taskId, futureTask);
}
OutputStream taskIdOut = ((SExecutionTask) sTask).getTaskIdOut();
try {
taskIdOut.write(taskId.getBytes());
} catch (IOException e) {
logger_.trace(strStorletName_ + ": problem returning taskId "
+ taskId + ": " + e.toString());
bStatus = false;
} finally {
try{
taskIdOut.close();
} catch (IOException e) {
}
}
} else if (sTask instanceof SDescriptorTask) {
logger_.trace(strStorletName_ + ": Got Descriptor command");
((SDescriptorTask) sTask).run();
} else if (sTask instanceof SPingTask) {
logger_.trace(strStorletName_ + ": Got Ping command");
bStatus = ((SPingTask) sTask).run();
} else if (sTask instanceof SCancelTask) {
String taskId = ((SCancelTask) sTask).getTaskId();
logger_.trace(strStorletName_ + ": Got Cancel command for taskId "
+ taskId);
if (taskIdToTask_.get(taskId) == null) {
bStatus = false;
logger_.trace(strStorletName_ + ": COULD NOT FIND taskId "
+ taskId);
try {
((SCancelTask) sTask).getSOut().write(
(new String("BAD")).getBytes());
} catch (IOException e) {
}
} else {
logger_.trace(strStorletName_ + ": good. found taskId "
+ taskId);
(taskIdToTask_.get(taskId)).cancel(true);
taskIdToTask_.remove(taskId);
}
bStatus = ((SCancelTask) sTask).run();
} else {
bStatus = sTask.exec();
}
return bStatus;
}
/*------------------------------------------------------------------------
* exit
*
*
* Release the resources and quit
* */
private static void exit() {
logger_.info(strStorletName_ + ": Daemon for storlet "
+ strStorletName_ + " is going down...shutting down threadpool");
try {
threadPool_.awaitTermination(nDefaultTimeoutToWaitBeforeShutdown_,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
threadPool_.shutdown();
logger_.info(strStorletName_ + ": threadpool down");
logger_.info(strStorletName_ + ": Daemon for storlet " + strStorletName_ +
" is going down...");
sExecManager_.terminate();
}
}

View File

@ -41,13 +41,15 @@ public class SDescriptorTask extends SAbstractTask {
}
/*------------------------------------------------------------------------
* run
* exec
* */
public void run() {
logger.trace("StorletDescriptorTask: " + "run going to extract key "
@Override
public boolean exec() {
boolean bStatus = true;
logger.trace("StorletDescriptorTask: " + "exec going to extract key "
+ strKey_);
ObjectRequestEntry entry = requestsTable_.Get(strKey_);
logger.trace("StorletDescriptorTask: " + "run got entry "
logger.trace("StorletDescriptorTask: " + "exec got entry "
+ entry.toString());
try {
logger.trace("StorletDescriptorTask: "
@ -57,7 +59,9 @@ public class SDescriptorTask extends SAbstractTask {
+ "run obj stream is in the table ");
} catch (InterruptedException e) {
logger.error("InterruptedException while putting obj stream");
bStatus = false;
}
return bStatus;
}
}
/* ============================== END OF FILE =============================== */

View File

@ -0,0 +1,102 @@
/*
* Copyright (c) 2015, 2016 OpenStack Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openstack.storlet.daemon;
import java.io.OutputStream;
import java.io.IOException;
import java.util.HashMap;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.Level;
import java.util.concurrent.*;
/*----------------------------------------------------------------------------
* SExecutionManager
*
* This class manages tread workers to execute storlet application
* */
public class SExecutionManager {
private Logger logger_;
private ExecutorService threadPool_;
private String strStorletName_;
private int nPoolSize;
private HashMap<String, Future> taskIdToTask_;
private static int nDefaultTimeoutToWaitBeforeShutdown_ = 3;
public SExecutionManager(final String strStorletName,
final Logger logger, final int nPoolSize) {
this.strStorletName_ = strStorletName;
this.logger_ = logger;
this.nPoolSize = nPoolSize;
}
public void initialize() {
this.logger_.trace("Initialising thread pool with "
+ nPoolSize + " threads");
this.threadPool_ = Executors.newFixedThreadPool(nPoolSize);
this.taskIdToTask_ = new HashMap<String, Future>();
}
public void terminate() {
this.logger_.info(this.strStorletName_ + ": Shutting down threadpool");
try {
this.threadPool_.awaitTermination(
nDefaultTimeoutToWaitBeforeShutdown_,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.threadPool_.shutdown();
this.logger_.info(this.strStorletName_ + ": threadpool down");
}
public String submitTask(final SExecutionTask sTask) {
Future futureTask = threadPool_.submit(sTask);
String taskId = futureTask.toString().split("@")[1];
synchronized (this.taskIdToTask_) {
this.taskIdToTask_.put(taskId, futureTask);
}
return taskId;
}
public boolean cancelTask(final String taskId) {
boolean bStatus = true;
if (this.taskIdToTask_.get(taskId) == null) {
this.logger_.trace(this.strStorletName_ + ": " + taskId + " is not found");
bStatus = false;
} else {
this.logger_.trace(this.strStorletName_ + ": cancelling " + taskId);
(this.taskIdToTask_.get(taskId)).cancel(true);
synchronized (this.taskIdToTask_) {
this.taskIdToTask_.remove(taskId);
}
}
return bStatus;
}
public void cleanupTask(final String taskId) {
synchronized (this.taskIdToTask_) {
this.taskIdToTask_.remove(taskId);
}
}
}

View File

@ -20,14 +20,13 @@ package org.openstack.storlet.daemon;
import org.slf4j.Logger;
import org.openstack.storlet.common.*;
import org.openstack.storlet.daemon.SExecutionManager;
import java.util.HashMap;
import java.util.ArrayList;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Future;
/*----------------------------------------------------------------------------
* SExecutionTask
*
@ -42,13 +41,14 @@ public class SExecutionTask extends SAbstractTask implements Runnable {
private HashMap<String, String> executionParams_ = null;
private OutputStream taskIdOut_ = null;
private String taskId_ = null;
private HashMap<String, Future> taskIdToTask_ = null;
private SExecutionManager sExecManager_ = null;
public SExecutionTask(IStorlet storlet,
ArrayList<StorletInputStream> instreams,
ArrayList<StorletOutputStream> outstreams, OutputStream taskIdOut,
HashMap<String, String> executionParams,
StorletLogger storletLogger, Logger logger) {
StorletLogger storletLogger, Logger logger,
SExecutionManager sExecManager) {
super(logger);
this.storlet_ = storlet;
this.inStreams_ = instreams;
@ -56,67 +56,73 @@ public class SExecutionTask extends SAbstractTask implements Runnable {
this.executionParams_ = executionParams;
this.storletLogger_ = storletLogger;
this.taskIdOut_ = taskIdOut;
this.sExecManager_ = sExecManager;
}
public ArrayList<StorletInputStream> getInStreams() {
return inStreams_;
return this.inStreams_;
}
public ArrayList<StorletOutputStream> getOutStreams() {
return outStreams_;
return this.outStreams_;
}
public HashMap<String, String> getExecutionParams() {
return executionParams_;
}
public OutputStream getTaskIdOut() {
return taskIdOut_;
}
public void setTaskId(String taskId) {
taskId_ = taskId;
}
public void setTaskIdToTask(HashMap<String, Future> taskIdToTask) {
taskIdToTask_ = taskIdToTask;
return this.executionParams_;
}
private void closeStorletInputStreams(){
for(StorletInputStream stream : inStreams_){
for(StorletInputStream stream : this.inStreams_){
stream.close();
}
}
private void closeStorletOutputStreams(){
for(StorletOutputStream stream: outStreams_){
for(StorletOutputStream stream: this.outStreams_){
stream.close();
}
}
private void closeStorletStreams(){
closeStorletInputStreams();
closeStorletOutputStreams();
this.closeStorletInputStreams();
this.closeStorletOutputStreams();
}
@Override
public boolean exec() {
boolean bStatus = true;
this.taskId_ = this.sExecManager_.submitTask((SExecutionTask) this);
try {
this.taskIdOut_.write(this.taskId_.getBytes());
} catch (IOException e) {
e.printStackTrace();
bStatus = false;
} finally {
try{
this.taskIdOut_.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return bStatus;
}
@Override
public void run() {
try {
storletLogger_.emitLog("About to invoke storlet");
storlet_.invoke(inStreams_, outStreams_, executionParams_,
this.storletLogger_.emitLog("About to invoke storlet");
this.storlet_.invoke(inStreams_, outStreams_, executionParams_,
storletLogger_);
storletLogger_.emitLog("Storlet invocation done");
synchronized (taskIdToTask_) {
taskIdToTask_.remove(taskId_);
}
this.storletLogger_.emitLog("Storlet invocation done");
this.sExecManager_.cleanupTask(this.taskId_);
} catch (StorletException e) {
storletLogger_.emitLog(e.getMessage());
this.storletLogger_.emitLog(e.getMessage());
} finally {
storletLogger_.close();
this.storletLogger_.close();
// We make sure all streams are closed
closeStorletStreams();
this.closeStorletStreams();
}
}
}

View File

@ -40,12 +40,14 @@ public class SHaltTask extends SAbstractTask {
}
/*------------------------------------------------------------------------
* run
* exec
*
* The actual response on "ping" command.
* The actual response on "halt" command.
* */
public boolean run() {
return respond(this.sOut_, true, new String("OK"));
@Override
public boolean exec() {
respond(this.sOut_, true, new String("OK"));
return false;
}
}
/* ============================== END OF FILE =============================== */

View File

@ -40,11 +40,12 @@ public class SPingTask extends SAbstractTask {
}
/*------------------------------------------------------------------------
* run
* exec
*
* The actual response on "ping" command.
* */
public boolean run() {
@Override
public boolean exec() {
return respond(this.sOut_, true, new String("OK"));
}
}

View File

@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.openstack.storlet.common.*;
import org.openstack.storlet.daemon.SExecutionTask;
import org.openstack.storlet.daemon.SExecutionManager;
import org.openstack.storlet.sbus.ServerSBusInDatagram;
/*----------------------------------------------------------------------------
@ -47,7 +48,8 @@ public class STaskFactory {
this.requestsTable_ = new ObjectRequestsTable();
}
public SAbstractTask createStorletTask(ServerSBusInDatagram dtg)
public SAbstractTask createStorletTask(
ServerSBusInDatagram dtg, SExecutionManager sExecManager)
throws StorletException {
SAbstractTask ResObj = null;
String command = dtg.getCommand();
@ -59,18 +61,19 @@ public class STaskFactory {
} else if (command.equals("SBUS_CMD_EXECUTE")) {
this.logger_.trace("createStorletTask: "
+ "received EXECUTE command");
ResObj = createExecutionTask(dtg);
ResObj = createExecutionTask(dtg, sExecManager);
} else if (command.equals("SBUS_CMD_DESCRIPTOR")) {
this.logger_.trace("createStorletTask: "
+ "received Descriptor command");
ResObj = createDescriptorTask(dtg);
} else if (command.equals("SBUS_CMD_PING")) {
this.logger_.trace("createStorletTask: " + "received Ping command");
this.logger_.trace("createStorletTask: "
+ "received Ping command");
ResObj = createPingTask(dtg);
} else if (command.equals("SBUS_CMD_CANCEL")) {
this.logger_.trace("createStorletTask: "
+ "received Cancel command");
ResObj = createCancelTask(dtg);
ResObj = createCancelTask(dtg, sExecManager);
} else {
this.logger_.error("createStorletTask: " + command
+ " is not supported");
@ -78,7 +81,8 @@ public class STaskFactory {
return ResObj;
}
private SExecutionTask createExecutionTask(ServerSBusInDatagram dtg)
private SExecutionTask createExecutionTask(
ServerSBusInDatagram dtg, SExecutionManager sExecManager)
throws StorletException {
ArrayList<StorletInputStream> inStreams = new ArrayList<StorletInputStream>();
ArrayList<StorletOutputStream> outStreams = new ArrayList<StorletOutputStream>();
@ -148,7 +152,7 @@ public class STaskFactory {
+ " is of unknown type " + strFDtype);
}
return new SExecutionTask(storlet_, inStreams, outStreams, taskIdOut,
dtg.getExecParams(), storletLogger, logger_);
dtg.getExecParams(), storletLogger, logger_, sExecManager);
}
private SDescriptorTask createDescriptorTask(ServerSBusInDatagram dtg) {
@ -202,7 +206,8 @@ public class STaskFactory {
return ResObj;
}
private SCancelTask createCancelTask(ServerSBusInDatagram dtg) {
private SCancelTask createCancelTask(
ServerSBusInDatagram dtg, SExecutionManager sExecManager) {
SCancelTask ResObj = null;
String taskId = dtg.getTaskId();
boolean bStatus = true;
@ -233,7 +238,7 @@ public class STaskFactory {
// parse descriptor stuff
this.logger_.trace("createCancelTask: "
+ "Returning StorletCancelTask");
ResObj = new SCancelTask(sOut, logger_, taskId);
ResObj = new SCancelTask(sOut, logger_, sExecManager, taskId);
}
return ResObj;
}