AI-based CFN Traffic Control and Computer Force Scheduling

Change-Id: I16cd7730c1e0732253ac52f51010f6b813295aa7
This commit is contained in:
Weisen Pan 2023-11-03 00:08:42 -07:00
parent 496188f902
commit a877aed45f
241 changed files with 811937 additions and 0 deletions

BIN
ai_computing_force_scheduling/.DS_Store vendored Normal file

Binary file not shown.

View File

@ -0,0 +1,3 @@
# Default ignored files
/shelf/
/workspace.xml

View File

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (code_sent)" project-jdk-type="Python SDK" />
</project>

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/code_sent.iml" filepath="$PROJECT_DIR$/.idea/code_sent.iml" />
</modules>
</component>
</project>

View File

@ -0,0 +1,51 @@
# AI-based Computer Force Network Traffic Control and Computer Force Matching
# Description:
Computer Force Network (CFN) integrates distributed and ubiquitous computing capabilities across different geographic locations. Its resources come from a variety of computing devices, such as cloud computing nodes, edge computing nodes, end devices, and network devices. Computing tasks in a CFN environment are large in volume and diverse in type, including data analysis, AI inference, graphics rendering, and other workloads. Traditional traffic control strategies may not handle this diversity and scale effectively, which can lead to wasted computing resources, delayed computing tasks, and degraded service quality. To address these problems, AI-based traffic control and computing force matching trains AI models with deep learning algorithms on large volumes of network traffic data, device state data, and task demand data. The resulting model not only learns the patterns of network traffic and computing tasks, but also predicts future traffic changes, task demands, and device computing capacity, and adjusts the traffic control and computing force matching strategies in real time based on this information.
With the help of AI, operators can manage traffic and computing power more effectively, reduce network congestion, improve the utilization of computing resources, reduce the latency of computing tasks, and improve service quality. For example, when a large number of data analysis tasks is predicted to arrive, the AI system can adjust network configurations in advance and prioritize allocating computing resources to these tasks. When the capacity of the computing devices is predicted to be insufficient for the upcoming tasks, the AI system can adjust the traffic control policy in advance and redirect some tasks to other devices to prevent congestion.
AI-based Computer Force Network traffic control and computing force matching bring significant performance improvements to large-scale CFNs, enabling operators to better manage computing resources and meet the demands of diverse computing tasks.
# Implementation:
- Data collection: The dataset used is extracted and aggregated from cluster-trace-v2018.
- Data pre-processing: Pre-process the collected data, including data cleaning, format conversion, and feature extraction.
- Model selection and training: Select suitable AI models for training according to the characteristics and needs of the CFN. The training goal is for the AI models to learn how to do workflow performance prediction.
- Model testing and optimization: Test the trained AI models in a simulated or real environment, and adjust and optimize the models according to the test results (see the baseline evaluation sketch below).
- Model deployment: Deploy the optimized AI model to the CFN.
- Real-time adjustment: Dynamically adjust and optimize the model according to the real-time network status and task demand data collected after deployment.
- Model update: Regularly update and optimize the model according to network operation and model performance.
- Continuous monitoring and adjustment: After deployment, continuously monitor the network state and task execution, adjust the AI model as required, and periodically retrain the model to cope with changes in the network environment.
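
A minimal baseline-evaluation sketch, assembled from the training script shipped in this commit (the dataset path, machine count, and job count below are placeholders):

```python
from core.machine import MachineConfig
from csimpy_exp.DAG_exp.algorithm.heuristics.first_fit import FirstFitAlgorithm
from csimpy_exp.DAG_exp.utils.csv_reader import CSVReader
from csimpy_exp.DAG_exp.adapter.episode import Episode
from csimpy_exp.AUX.tools import average_completion, average_slowdown

# Five identical machines and the first ten jobs of the trace
machine_configs = [MachineConfig(2, 1, 1) for _ in range(5)]
jobs_configs = CSVReader('./datasets/alibaba_clusterdata_v2017/job_DAG.csv').generate(0, 10)

episode = Episode(machine_configs, jobs_configs, FirstFitAlgorithm(), event_file=None)
episode.run()
print(episode.env.now, average_completion(episode), average_slowdown(episode))
```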
# Usage:
```
python run_CFN_schedule --model_name=path_of_model
```
# Requirements:
- PyTorch & PyTorch Geometric
- Nvidia CUDA
- matplotlib
- numpy
- seaborn
- six
- tzdata
- zipp
- packaging
- pandas
- Pillow
- pyparsing
- python-dateutil
- pytz
- PyYAML
- contourpy
- cycler
- fonttools
- importlib-resources
- kiwisolver
- simpy
- scikit-learn
- tqdm
- TensorFlow 1.x (for the reinforcement learning experiments)

Binary file not shown.

View File

@ -0,0 +1,12 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from abc import ABC, abstractmethod
class Algorithm(ABC):
    """Abstract scheduling algorithm: a callable that, given the cluster and
    the current clock, returns a (machine, task) pair to schedule, or
    (None, None) when nothing can be scheduled."""

    @abstractmethod
    def __call__(self, cluster, clock):
        pass
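
A concrete algorithm implements `__call__` and returns a (machine, task) pair, or (None, None) when nothing fits; the Scheduler defined later in this commit calls it repeatedly at each time step until it returns (None, None). A minimal illustrative subclass (not part of this module, mirroring the first-fit heuristic defined elsewhere in this commit):

class FirstFitExample(Algorithm):
    """Illustrative only: pick the first machine/task pair where the machine can hold the task."""
    def __call__(self, cluster, clock):
        for machine in cluster.machines:
            for task in cluster.ready_tasks_which_has_waiting_instance:
                if machine.accommodate(task):
                    return machine, task
        return None, None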

View File

@ -0,0 +1,52 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from core.job import Job
class Broker:
"""Handles job submissions to a simulation cluster."""
job_cls = Job
def __init__(self, env, job_configs):
"""
Initializes the broker.
Args:
- env: The simulation environment.
- job_configs (list): A list of configurations for the jobs.
"""
self.env = env
self.simulation = None
self.cluster = None
self.job_configs = job_configs
self.destroyed = False
def attach(self, simulation):
"""
Attaches the broker to a given simulation.
Args:
- simulation: The simulation to attach to.
"""
self.simulation = simulation
self.cluster = simulation.cluster
def run(self):
"""
Executes the job submissions based on their configurations.
"""
for job_config in self.job_configs:
# Ensure the job's submit time is in the future
assert job_config.submit_time >= self.env.now
# Wait until the job's submit time
yield self.env.timeout(job_config.submit_time - self.env.now)
# Create and add the job to the cluster
job = self.job_cls(self.env, job_config)
self.cluster.add_job(job)
self.destroyed = True

View File

@ -0,0 +1,104 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from core.machine import Machine
class Cluster:
"""Represents a cluster of machines, handling jobs and their tasks."""
def __init__(self):
self.machines = []
self.jobs = []
def _filtered_jobs(self, condition):
return [job for job in self.jobs if condition(job)]
def _tasks_from_jobs(self, task_property):
return [task for job in self.jobs for task in getattr(job, task_property)]
@property
def unfinished_jobs(self):
return self._filtered_jobs(lambda job: not job.finished)
@property
def finished_jobs(self):
return self._filtered_jobs(lambda job: job.finished)
@property
def unfinished_tasks(self):
return self._tasks_from_jobs('unfinished_tasks')
@property
def ready_unfinished_tasks(self):
return self._tasks_from_jobs('ready_unfinished_tasks')
@property
def tasks_which_has_waiting_instance(self):
        return self._tasks_from_jobs('tasks_which_have_waiting_instance')  # Job exposes this under 'have'
@property
def ready_tasks_which_has_waiting_instance(self):
        return self._tasks_from_jobs('ready_tasks_which_have_waiting_instance')  # Job exposes this under 'have'
@property
def finished_tasks(self):
return self._tasks_from_jobs('finished_tasks')
@property
def running_task_instances(self):
return [task_instance for machine in self.machines for task_instance in machine.running_task_instances]
def add_machines(self, machine_configs):
for machine_config in machine_configs:
machine = Machine(machine_config)
self.machines.append(machine)
machine.attach(self)
def add_job(self, job):
self.jobs.append(job)
    def _resource_sum(self, resource):
        # Helper, not a property: it takes the resource name as an argument.
        return sum(getattr(machine, resource) for machine in self.machines)
@property
def cpu(self):
return self._resource_sum('cpu')
@property
def memory(self):
return self._resource_sum('memory')
@property
def disk(self):
return self._resource_sum('disk')
@property
def cpu_capacity(self):
return self._resource_sum('cpu_capacity')
@property
def memory_capacity(self):
return self._resource_sum('memory_capacity')
@property
def disk_capacity(self):
return self._resource_sum('disk_capacity')
@property
def state(self):
return {
'arrived_jobs': len(self.jobs),
'unfinished_jobs': len(self.unfinished_jobs),
'finished_jobs': len(self.finished_jobs),
'unfinished_tasks': len(self.unfinished_tasks),
'finished_tasks': len(self.finished_tasks),
'running_task_instances': len(self.running_task_instances),
'machine_states': [machine.state for machine in self.machines],
'cpu': self.cpu / self.cpu_capacity,
'memory': self.memory / self.memory_capacity,
'disk': self.disk / self.disk_capacity,
}

View File

@ -0,0 +1,30 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
class TaskInstanceConfig:
def __init__(self, task_config):
self.cpu = task_config.cpu
self.memory = task_config.memory
self.disk = task_config.disk
self.duration = task_config.duration
class TaskConfig:
def __init__(self, task_index, instances_number, cpu, memory, disk, duration, parent_indices=None):
self.task_index = task_index
self.instances_number = instances_number
self.cpu = cpu
self.memory = memory
self.disk = disk
self.duration = duration
self.parent_indices = parent_indices
class JobConfig:
def __init__(self, idx, submit_time, task_configs):
self.submit_time = submit_time
self.task_configs = task_configs
self.id = idx

View File

@ -0,0 +1,256 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from core.config import *
class Task(object):
def __init__(self, env, job, task_config):
self.env = env
self.job = job
self.task_index = task_config.task_index
self.task_config = task_config
self._ready = False
self._parents = None
self.task_instances = []
task_instance_config = TaskInstanceConfig(task_config)
for task_instance_index in range(int(self.task_config.instances_number)):
self.task_instances.append(TaskInstance(self.env, self, task_instance_index, task_instance_config))
self.next_instance_pointer = 0
@property
def id(self):
return str(self.job.id) + '-' + str(self.task_index)
@property
def parents(self):
if self._parents is None:
if self.task_config.parent_indices is None:
raise ValueError("Task_config's parent_indices should not be None.")
self._parents = []
for parent_index in self.task_config.parent_indices:
self._parents.append(self.job.tasks_map[parent_index])
return self._parents
@property
def ready(self):
if not self._ready:
for p in self.parents:
if not p.finished:
return False
self._ready = True
return self._ready
@property
def running_task_instances(self):
ls = []
for task_instance in self.task_instances:
if task_instance.started and not task_instance.finished:
ls.append(task_instance)
return ls
@property
def finished_task_instances(self):
ls = []
for task_instance in self.task_instances:
if task_instance.finished:
ls.append(task_instance)
return ls
    # The most heavily used operation: dispatch the next waiting task instance to the given machine.
def start_task_instance(self, machine):
self.task_instances[self.next_instance_pointer].schedule(machine)
self.next_instance_pointer += 1
@property
def started(self):
for task_instance in self.task_instances:
if task_instance.started:
return True
return False
@property
def waiting_task_instances_number(self):
return self.task_config.instances_number - self.next_instance_pointer
@property
def has_waiting_task_instances(self):
return self.task_config.instances_number > self.next_instance_pointer
@property
def finished(self):
"""
A task is finished only if it has no waiting task instances and no running task instances.
:return: bool
"""
if self.has_waiting_task_instances:
return False
if len(self.running_task_instances) != 0:
return False
return True
@property
def started_timestamp(self):
t = None
for task_instance in self.task_instances:
if task_instance.started_timestamp is not None:
if (t is None) or (t > task_instance.started_timestamp):
t = task_instance.started_timestamp
return t
@property
def finished_timestamp(self):
if not self.finished:
return None
t = None
for task_instance in self.task_instances:
if (t is None) or (t < task_instance.finished_timestamp):
t = task_instance.finished_timestamp
return t
class Job(object):
task_cls = Task
def __init__(self, env, job_config):
self.env = env
self.job_config = job_config
self.id = job_config.id
self.tasks_map = {}
for task_config in job_config.task_configs:
task_index = task_config.task_index
self.tasks_map[task_index] = Job.task_cls(env, self, task_config)
@property
def tasks(self):
return self.tasks_map.values()
@property
def unfinished_tasks(self):
ls = []
for task in self.tasks:
if not task.finished:
ls.append(task)
return ls
@property
def ready_unfinished_tasks(self):
ls = []
for task in self.tasks:
if not task.finished and task.ready:
ls.append(task)
return ls
@property
def tasks_which_have_waiting_instance(self):
ls = []
for task in self.tasks:
if task.has_waiting_task_instances:
ls.append(task)
return ls
@property
def ready_tasks_which_have_waiting_instance(self):
ls = []
for task in self.tasks:
if task.has_waiting_task_instances and task.ready:
ls.append(task)
return ls
@property
def running_tasks(self):
ls = []
for task in self.tasks:
if task.started and not task.finished:
ls.append(task)
return ls
@property
def finished_tasks(self):
ls = []
for task in self.tasks:
if task.finished:
ls.append(task)
return ls
@property
def started(self):
for task in self.tasks:
if task.started:
return True
return False
@property
def finished(self):
for task in self.tasks:
if not task.finished:
return False
return True
@property
def started_timestamp(self):
t = None
for task in self.tasks:
if task.started_timestamp is not None:
if (t is None) or (t > task.started_timestamp):
t = task.started_timestamp
return t
@property
def finished_timestamp(self):
if not self.finished:
return None
t = None
for task in self.tasks:
if (t is None) or (t < task.finished_timestamp):
t = task.finished_timestamp
return t
class TaskInstance(object):
def __init__(self, env, task, task_instance_index, task_instance_config):
self.env = env
self.task = task
self.task_instance_index = task_instance_index
self.config = task_instance_config
self.cpu = task_instance_config.cpu
self.memory = task_instance_config.memory
self.disk = task_instance_config.disk
self.duration = task_instance_config.duration
self.machine = None
self.process = None
self.new = True
self.started = False
self.finished = False
self.started_timestamp = None
self.finished_timestamp = None
@property
def id(self):
return str(self.task.id) + '-' + str(self.task_instance_index)
def do_work(self):
# self.cluster.waiting_tasks.remove(self)
# self.cluster.running_tasks.append(self)
# self.machine.run(self)
yield self.env.timeout(self.duration)
self.finished = True
self.finished_timestamp = self.env.now
self.machine.stop_task_instance(self)
def schedule(self, machine):
self.started = True
self.started_timestamp = self.env.now
self.machine = machine
self.machine.run_task_instance(self)
self.process = self.env.process(self.do_work())

View File

@ -0,0 +1,100 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from enum import Enum
class MachineConfig:
idx = 0
def __init__(self, cpu_capacity, memory_capacity, disk_capacity, cpu=None, memory=None, disk=None):
self.cpu_capacity = cpu_capacity
self.memory_capacity = memory_capacity
self.disk_capacity = disk_capacity
self.cpu = cpu or cpu_capacity
self.memory = memory or memory_capacity
self.disk = disk or disk_capacity
self.id = MachineConfig.idx
MachineConfig.idx += 1
class MachineDoor(Enum):
TASK_IN = 0
TASK_OUT = 1
NULL = 3
class Machine:
def __init__(self, machine_config: MachineConfig):
self.id = machine_config.id
self.cpu_capacity = machine_config.cpu_capacity
self.memory_capacity = machine_config.memory_capacity
self.disk_capacity = machine_config.disk_capacity
self.cpu = machine_config.cpu
self.memory = machine_config.memory
self.disk = machine_config.disk
self.cluster = None
self.task_instances = []
self.machine_door = MachineDoor.NULL
def run_task_instance(self, task_instance):
self.cpu -= task_instance.cpu
self.memory -= task_instance.memory
self.disk -= task_instance.disk
self.task_instances.append(task_instance)
self.machine_door = MachineDoor.TASK_IN
def stop_task_instance(self, task_instance):
self.cpu += task_instance.cpu
self.memory += task_instance.memory
self.disk += task_instance.disk
self.machine_door = MachineDoor.TASK_OUT
@property
def running_task_instances(self):
return [task_instance for task_instance in self.task_instances if task_instance.started and not task_instance.finished]
@property
def finished_task_instances(self):
return [task_instance for task_instance in self.task_instances if task_instance.finished]
def attach(self, cluster):
self.cluster = cluster
def accommodate(self, task):
return all([
self.cpu >= task.task_config.cpu,
self.memory >= task.task_config.memory,
self.disk >= task.task_config.disk
])
@property
def feature(self):
return [self.cpu, self.memory, self.disk]
@property
def capacity(self):
return [self.cpu_capacity, self.memory_capacity, self.disk_capacity]
@property
def state(self):
return {
'id': self.id,
'cpu_capacity': self.cpu_capacity,
'memory_capacity': self.memory_capacity,
'disk_capacity': self.disk_capacity,
'cpu': self.cpu / self.cpu_capacity,
'memory': self.memory / self.memory_capacity,
'disk': self.disk / self.disk_capacity,
'running_task_instances': len(self.running_task_instances),
'finished_task_instances': len(self.finished_task_instances)
}
def __eq__(self, other):
return isinstance(other, Machine) and other.id == self.id

View File

@ -0,0 +1,36 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
# monitor implements the Monitor class, which samples and records the cluster
# state at every simulation time step and writes the trace to a JSON event file.
import json
class Monitor(object):
def __init__(self, simulation):
self.simulation = simulation
self.env = simulation.env
self.event_file = simulation.event_file
self.events = []
def run(self):
while not self.simulation.finished:
state = {
'timestamp': self.env.now,
'cluster_state': self.simulation.cluster.state
}
self.events.append(state)
yield self.env.timeout(1)
state = {
'timestamp': self.env.now,
'cluster_state': self.simulation.cluster.state
}
self.events.append(state)
self.write_to_file()
def write_to_file(self):
with open(self.event_file, 'w') as f:
json.dump(self.events, f, indent=4)
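
Each recorded event is a snapshot of the cluster state at one time step; an entry in the resulting JSON file looks roughly like the following (illustrative values; the keys mirror Cluster.state and Machine.state):

example_event = {
    'timestamp': 12,
    'cluster_state': {
        'arrived_jobs': 3,
        'unfinished_jobs': 1,
        'finished_jobs': 2,
        'unfinished_tasks': 4,
        'finished_tasks': 10,
        'running_task_instances': 2,
        'machine_states': [{'id': 0, 'cpu_capacity': 2, 'memory_capacity': 1,
                            'disk_capacity': 1, 'cpu': 0.5, 'memory': 0.5,
                            'disk': 1.0, 'running_task_instances': 2,
                            'finished_task_instances': 7}],
        'cpu': 0.5,      # remaining / total, aggregated over machines
        'memory': 0.5,
        'disk': 1.0,
    },
}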

View File

@ -0,0 +1,31 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
class Scheduler(object):
def __init__(self, env, algorithm):
self.env = env
self.algorithm = algorithm
self.simulation = None
self.cluster = None
self.destroyed = False
def attach(self, simulation):
self.simulation = simulation
self.cluster = simulation.cluster
def make_decision(self):
while True:
machine, task = self.algorithm(self.cluster, self.env.now)
if machine is None or task is None:
break
else:
task.start_task_instance(machine)
def run(self):
while not self.simulation.finished:
self.make_decision()
yield self.env.timeout(1)
self.destroyed = True

View File

@ -0,0 +1,33 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from core.monitor import Monitor
class Simulation(object):
def __init__(self, env, cluster, task_broker, scheduler, event_file):
self.env = env
self.cluster = cluster
self.task_broker = task_broker
self.scheduler = scheduler
self.event_file = event_file
if event_file is not None:
self.monitor = Monitor(self)
self.task_broker.attach(self)
self.scheduler.attach(self)
def run(self):
# Starting monitor process before task_broker process
# and scheduler process is necessary for log records integrity.
if self.event_file is not None:
self.env.process(self.monitor.run())
self.env.process(self.task_broker.run())
self.env.process(self.scheduler.run())
@property
def finished(self):
return self.task_broker.destroyed \
and len(self.cluster.unfinished_jobs) == 0

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,34 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
# The Episode class wires a cluster, a task broker, and a scheduler together to run one episodic simulation experiment
import simpy
from core.cluster import Cluster
from core.scheduler import Scheduler
from core.broker import Broker
from core.simulation import Simulation
class Episode:
    # Broker class used to submit jobs; experiment adapters may override this
    # class attribute with a specialised Broker.
    broker_cls = Broker

    def __init__(self, machine_configs, task_configs, algorithm, event_file):
        # Create a simulation environment
        self.env = simpy.Environment()
        # Create a cluster and add machines based on machine_configs
        cluster = Cluster()
        cluster.add_machines(machine_configs)
        # Create a task broker with the provided task_configs
        task_broker = self.broker_cls(self.env, task_configs)
# Create a scheduler with the specified algorithm
scheduler = Scheduler(self.env, algorithm)
# Create a simulation instance with the environment, cluster, task broker, scheduler, and event_file
self.simulation = Simulation(self.env, cluster, task_broker, scheduler, event_file)
def run(self):
# Run the simulation and the environment
self.simulation.run()
self.env.run()

View File

@ -0,0 +1,104 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
# multiprocessing_run runs one episode and collects its results (used for
# multi-process training); average_slowdown and average_completion extract
# evaluation statistics from an Episode object.
import time
import numpy as np
import tensorflow as tf
def average_metric(exp, metric_func):
"""
Calculate the average of a given metric for all tasks.
Args:
- exp: The simulation experiment.
- metric_func: Function to compute the desired metric.
Returns:
- float: Average of the metric across tasks.
"""
total = sum(metric_func(task) for job in exp.simulation.cluster.jobs for task in job.tasks)
number_of_tasks = sum(1 for job in exp.simulation.cluster.jobs for _ in job.tasks)
return total / number_of_tasks
def completion_metric(task):
"""
Compute the completion metric for a task.
Args:
- task: The task object.
Returns:
- float: Completion metric for the task.
"""
return task.finished_timestamp - task.started_timestamp
def slowdown_metric(task):
"""
Compute the slowdown metric for a task.
Args:
- task: The task object.
Returns:
- float: Slowdown metric for the task.
"""
return (task.finished_timestamp - task.started_timestamp) / task.task_config.duration
def average_completion(exp):
"""
Compute the average completion time for all tasks.
Args:
- exp: The simulation experiment.
Returns:
- float: Average completion time.
"""
return average_metric(exp, completion_metric)
def average_slowdown(exp):
"""
Compute the average slowdown for all tasks.
Args:
- exp: The simulation experiment.
Returns:
- float: Average slowdown.
"""
return average_metric(exp, slowdown_metric)
def multiprocessing_run(episode, trajectories, makespans, avg_completions, avg_slowdowns):
"""
Run an episode in a multiprocessing environment and gather results.
Args:
- episode: The simulation episode.
- trajectories: List to collect trajectories.
- makespans: List to collect makespans.
- avg_completions: List to collect average completions.
- avg_slowdowns: List to collect average slowdowns.
"""
# Set random seeds
np.random.seed(int(time.time()))
    tf.random.set_random_seed(int(time.time()))  # TensorFlow expects an integer seed
# Execute the episode
episode.run()
# Gather and append results to respective lists
trajectories.append(episode.simulation.scheduler.algorithm.current_trajectory)
makespans.append(episode.simulation.env.now)
avg_completions.append(average_completion(episode))
avg_slowdowns.append(average_slowdown(episode))

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,9 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from core.broker import Broker
from .job import Job
Broker.job_cls = Job

View File

@ -0,0 +1,9 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from csimpy_exp.AUX.episode import Episode
from .broker import Broker
# Set the broker class for Episode
Episode.broker_cls = Broker

View File

@ -0,0 +1,18 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from core import job as job_module
from csimpy_exp.DAG_exp.utils.feature_synthesize import task_features
# Define a feature property for the Task class
def task_feature(self):
self._features = self.job.features[self.task_index]
return self._features
setattr(job_module.Task, 'feature', property(task_feature))
class Job(job_module.Job):
def __init__(self, env, job_config):
super().__init__(env, job_config)
self.features = task_features(self)

View File

@ -0,0 +1,55 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import tensorflow as tf
class Node:
def __init__(self, observation, action, reward, clock):
self.observation = observation
self.action = action
self.reward = reward
self.clock = clock
class RLAlgorithm:
def __init__(self, agent, reward_giver, features_normalize_func, features_extract_func):
self.agent = agent
self.reward_giver = reward_giver
self.features_normalize_func = features_normalize_func
self.features_extract_func = features_extract_func
self.current_trajectory = []
def extract_features(self, valid_machine_task_pairs):
"""Extract features from valid machine-task pairs."""
return [
[machine.cpu, machine.memory] + self.features_extract_func(task)
for machine, task in valid_machine_task_pairs
]
def filter_valid_pairs(self, machines, tasks):
"""Return all valid machine-task pairs."""
return [
(machine, task)
for machine in machines
for task in tasks
if machine.accommodate(task)
]
def __call__(self, cluster, clock):
machines = cluster.machines
tasks = cluster.ready_tasks_which_has_waiting_instance
all_candidates = self.filter_valid_pairs(machines, tasks)
if not all_candidates:
self.current_trajectory.append(Node(None, None, self.reward_giver.get_reward(), clock))
return None, None
normalized_features = self.features_normalize_func(self.extract_features(all_candidates))
features_tensor = tf.convert_to_tensor(normalized_features, dtype=tf.float32)
logits = self.agent.brain(features_tensor)
pair_index = tf.squeeze(tf.random.categorical(logits, num_samples=1), axis=1).numpy()[0]
self.current_trajectory.append(Node(normalized_features, pair_index, 0, clock))
return all_candidates[pair_index]

View File

@ -0,0 +1,57 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import time
import copy
import numpy as np
import tensorflow as tf
class Agent:
def __init__(self, name, brain, gamma, reward_to_go, nn_baseline, normalize_advantages, model_save_path=None,
summary_path=None):
"""Initialize the agent."""
self.gamma = gamma
self.reward_to_go = reward_to_go
self.baseline = nn_baseline
self.normalize_advantages = normalize_advantages
self.optimizer = tf.train.AdamOptimizer(learning_rate=0.001)
self.global_step = tf.train.get_or_create_global_step()
self.summary_path = summary_path or './tensorboard/%s--%s' % (name, time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime()))
self.summary_writer = tf.contrib.summary.create_file_writer(self.summary_path)
self.brain = brain
self.checkpoint = tf.train.Checkpoint(brain=self.brain)
self.model_save_path = model_save_path
def restore(self, model_path):
"""Restore the model from the given path."""
self.checkpoint.restore(model_path)
def save(self):
"""Save the model to the specified path."""
self.checkpoint.save(self.model_save_path)
def _sum_of_rewards(self, rewards_n):
"""Compute Monte Carlo estimation of the Q function."""
q_s = [self._calculate_q(re) for re in rewards_n]
if not self.reward_to_go:
q_s = [[q[0]] * len(q) for q in q_s]
return q_s
    def _calculate_q(self, re):
        """Reward-to-go: Q_j is the discounted sum of rewards from step j onward,
        kept in trajectory order so it stays aligned with observations and actions."""
        return [sum(self.gamma ** i * reward for i, reward in enumerate(re[j:])) for j in range(len(re))]
def _compute_advantage(self, q_n):
"""Computes advantages by possibly subtracting a baseline from the estimated Q values."""
if not self.baseline:
return copy.deepcopy(q_n)
padded_q_n = [q + [0] * (max(len(q) for q in q_n) - len(q)) for q in q_n]
        adv_n = np.array(padded_q_n) - np.mean(padded_q_n, axis=0)
        return [list(adv[:len(q_n[i])]) for i, adv in enumerate(adv_n)]

View File

@ -0,0 +1,66 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import tensorflow as tf
class BaseBrain(tf.keras.Model):
"""Base Brain class containing shared functionalities."""
def forward_pass(self, state, layers):
"""Pass the state through the given layers."""
for layer in layers:
state = layer(state)
return tf.expand_dims(tf.squeeze(state, axis=-1), axis=0)
def call(self, state):
"""Placeholder call method, to be implemented by subclasses."""
raise NotImplementedError
class BrainBig(BaseBrain):
name = 'BrainBig'
def __init__(self, state_size):
super().__init__()
self.layers_sequence = [
tf.keras.layers.Dense(3, input_shape=(None, state_size), activation=tf.tanh),
tf.keras.layers.Dense(9, activation=tf.tanh),
tf.keras.layers.Dense(9, activation=tf.tanh),
tf.keras.layers.Dense(18, activation=tf.tanh),
tf.keras.layers.Dense(9, activation=tf.tanh),
tf.keras.layers.Dense(1)
]
def call(self, state):
return self.forward_pass(state, self.layers_sequence)
class Brain(BaseBrain):
name = 'Brain'
def __init__(self, state_size):
super().__init__()
self.layers_sequence = [
tf.keras.layers.Dense(3, input_shape=(None, state_size), activation=tf.tanh),
tf.keras.layers.Dense(9, activation=tf.tanh),
tf.keras.layers.Dense(18, activation=tf.tanh),
tf.keras.layers.Dense(9, activation=tf.tanh),
tf.keras.layers.Dense(1)
]
def call(self, state):
return self.forward_pass(state, self.layers_sequence)
class BrainSmall(BaseBrain):
name = 'BrainSmall'
def __init__(self, state_size):
super().__init__()
self.layers_sequence = [
tf.keras.layers.Dense(3, input_shape=(None, state_size), activation=tf.tanh),
tf.keras.layers.Dense(9, activation=tf.tanh),
tf.keras.layers.Dense(6, activation=tf.tanh),
tf.keras.layers.Dense(1)
]
def call(self, state):
return self.forward_pass(state, self.layers_sequence)

View File

@ -0,0 +1,75 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
# The strategy pattern is used to provide different reward functions for the
# deep reinforcement learning-based job scheduling models, one per optimization goal:
# MakespanRewardGiver          - reward for optimizing the makespan
# AverageSlowDownRewardGiver   - reward for optimizing the average slowdown
# AverageCompletionRewardGiver - reward for optimizing the average completion time
from abc import ABC, abstractmethod
class RewardGiver(ABC):
"""Abstract base class for a reward giver."""
def __init__(self):
self.simulation = None
def attach(self, simulation):
"""Attach the reward giver to a simulation."""
self.simulation = simulation
@abstractmethod
def get_reward(self):
"""Retrieve the reward. Ensure simulation is attached before fetching."""
if not self.simulation:
raise ValueError('Reward giver must be attached to a simulation before fetching reward.')
class MakespanRewardGiver(RewardGiver):
"""Reward giver that returns a constant reward per timestamp."""
name = 'Makespan'
def __init__(self, reward_per_timestamp):
super().__init__()
self.reward_per_timestamp = reward_per_timestamp
def get_reward(self):
"""Retrieve the reward per timestamp."""
super().get_reward()
return self.reward_per_timestamp
class AverageSlowDownRewardGiver(RewardGiver):
"""Reward giver based on average slowdown."""
name = 'AS'
def get_reward(self):
"""Compute reward based on slowdown of unfinished tasks."""
super().get_reward()
cluster = self.simulation.cluster
unfinished_tasks = cluster.unfinished_tasks
reward = sum((-1 / task.task_config.duration) for task in unfinished_tasks)
return reward
class AverageCompletionRewardGiver(RewardGiver):
"""Reward giver based on average completion."""
name = 'AC'
def get_reward(self):
"""Compute reward based on number of unfinished tasks."""
super().get_reward()
cluster = self.simulation.cluster
unfinished_task_count = len(cluster.unfinished_tasks)
return -unfinished_task_count

View File

@ -0,0 +1,20 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from core.algorithm import Algorithm
class DRF(Algorithm):
def __call__(self, cluster, clock):
machines = cluster.machines
unfinished_tasks = cluster.ready_unfinished_tasks
candidate_task = None
candidate_machine = None
for machine in machines:
for task in unfinished_tasks:
if machine.accommodate(task):
candidate_machine = machine
candidate_task = task
break
return candidate_machine, candidate_task

View File

@ -0,0 +1,20 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from core.algorithm import Algorithm
class FirstFitAlgorithm(Algorithm):
def __call__(self, cluster, clock):
machines = cluster.machines
tasks = cluster.ready_tasks_which_has_waiting_instance
candidate_task = None
candidate_machine = None
for machine in machines:
for task in tasks:
if machine.accommodate(task):
candidate_machine = machine
candidate_task = task
break
return candidate_machine, candidate_task

View File

@ -0,0 +1,22 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from csimpy_exp.DAG_exp.utils.feature_synthesize import weights_calculate
from core.algorithm import Algorithm # Updated import
class MaxWeightAlgorithm(Algorithm):
def __call__(self, cluster, clock):
machines = cluster.machines
tasks = weights_calculate(cluster.ready_tasks_which_has_waiting_instance)
candidate_task = None
candidate_machine = None
for machine in machines:
for task in tasks:
if machine.accommodate(task):
candidate_machine = machine
candidate_task = task
break
return candidate_machine, candidate_task

View File

@ -0,0 +1,128 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import time
import torch
import numpy as np
import torch.nn.functional as F
from datetime import timedelta
from sklearn import metrics
from tqdm import tqdm
from scheduler import WarmUpLR, downLR
def elapsed_time_since(start_time):
"""Compute time difference since the given start_time."""
elapsed = time.time() - start_time
return timedelta(seconds=int(round(elapsed)))
def evaluate(model, data_iter, device, test=False):
"""Evaluate the model on a given dataset."""
model.eval()
total_loss = 0
predictions, actuals = [], []
with torch.no_grad():
for data, labels in data_iter:
data, labels = data.float().to(device), labels.long().to(device)
outputs = model(data)
loss = F.cross_entropy(outputs, labels)
total_loss += loss.item()
predictions.extend(torch.max(outputs.data, 1)[1].tolist())
actuals.extend(labels.data.tolist())
accuracy = calculate_accuracy(actuals, predictions)
if test:
confusion = metrics.confusion_matrix(actuals, predictions)
return accuracy, total_loss / len(data_iter), confusion
return accuracy, total_loss / len(data_iter)
def calculate_accuracy(y_true, y_pred):
"""Calculate accuracy score."""
return metrics.accuracy_score(y_true, y_pred)
def train_and_evaluate(config, model, train_iter, dev_iter, test_iter):
"""Train the model and evaluate on the development and test sets."""
start_time = time.time()
model.train()
optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)
warmup_epochs = config.num_epochs // 2
scheduler = downLR(optimizer, (config.num_epochs - warmup_epochs) * len(train_iter))
warmup_scheduler = WarmUpLR(optimizer, warmup_epochs * len(train_iter))
dev_best_loss = float('inf')
dev_best_acc = test_best_acc = 0
total_batch = 0
    for epoch in range(config.num_epochs):
        model.train()  # evaluate() leaves the model in eval mode, so re-enable training each epoch
        train_loss = 0
        print(f'Epoch [{epoch + 1}/{config.num_epochs}]')
        predictions, actuals = [], []
for data, labels in tqdm(train_iter):
data, labels = data.to(config.device), labels.long().to(config.device)
outputs = model(data)
loss = F.cross_entropy(outputs, labels)
train_loss += loss.item()
optimizer.zero_grad()
loss.backward()
optimizer.step()
warmup_scheduler.step() if epoch < warmup_epochs else scheduler.step()
total_batch += 1
predictions.extend(torch.max(outputs.data, 1)[1].tolist())
actuals.extend(labels.data.tolist())
train_acc = calculate_accuracy(actuals, predictions)
dev_acc, dev_loss = evaluate(model, dev_iter, config.device)
test_acc, test_loss = evaluate(model, test_iter, config.device)
elapsed_time = elapsed_time_since(start_time)
dev_best_loss, dev_best_acc, test_best_acc = update_best_metrics(dev_loss, dev_best_loss, dev_acc, dev_best_acc, test_acc, test_best_acc)
print_metrics(total_batch, train_loss, len(train_iter), train_acc, dev_loss, dev_acc, test_loss, test_acc, elapsed_time, dev_best_loss, dev_best_acc, test_best_acc)
evaluate_test_set(config, model, test_iter)
def update_best_metrics(dev_loss, dev_best_loss, dev_acc, dev_best_acc, test_acc, test_best_acc):
"""Update best metrics if needed."""
if dev_loss < dev_best_loss:
dev_best_loss = dev_loss
if dev_acc > dev_best_acc:
dev_best_acc = dev_acc
test_best_acc = test_acc
return dev_best_loss, dev_best_acc, test_best_acc
def print_metrics(total_batch, train_loss, num_train_iter, train_acc, dev_loss, dev_acc, test_loss, test_acc, elapsed_time, dev_best_loss, dev_best_acc, test_best_acc):
"""Display the metrics."""
print((
f'Iter: {total_batch:6}, Train Loss: {train_loss/num_train_iter:.2f}, '
f'Train Acc: {train_acc:.2%}, Dev Loss: {dev_loss:.2f}, Dev Acc: {dev_acc:.2%}, '
f'Test Loss: {test_loss:.2f}, Test Acc: {test_acc:.2%}, Time: {elapsed_time} {"*" if dev_loss == dev_best_loss else ""}'
))
print(f'Best Dev Acc: {dev_best_acc:.2%}, Best Test Acc: {test_best_acc:.2%}')
def evaluate_test_set(config, model, test_iter):
"""Evaluate the model on the test set."""
start_time = time.time()
test_acc, test_loss, test_confusion = evaluate(model, test_iter, config.device, test=True)
print(f'Test Loss: {test_loss:.2f}, Test Acc: {test_acc:.2%}')
print(test_confusion)
print(f"Time elapsed: {elapsed_time_since(start_time)}")

View File

@ -0,0 +1,40 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import numpy as np
from core.algorithm import Algorithm
class Tetris(Algorithm):
@staticmethod
def calculate_alignment(valid_pairs):
"""Calculate the alignment score between machine and task features."""
machine_features = []
task_features = []
# Extract the features for machines and tasks
for machine, task in valid_pairs:
machine_features.append(machine.feature[:2])
task_features.append([task.task_config.cpu, task.task_config.memory])
# Compute the alignment score and return the index of the maximum
alignment_scores = np.sum(np.array(machine_features) * np.array(task_features), axis=1)
return np.argmax(alignment_scores)
def __call__(self, cluster, clock):
"""Determine the best machine-task pair based on the Tetris algorithm."""
machines = cluster.machines
tasks = cluster.ready_tasks_which_has_waiting_instance
valid_pairs = [(machine, task) for machine in machines for task in tasks if machine.accommodate(task)]
# If no valid pairs, return None for both machine and task
if not valid_pairs:
return None, None
# Get the best pair based on the alignment
best_pair_idx = Tetris.calculate_alignment(valid_pairs)
best_machine, best_task = valid_pairs[best_pair_idx]
return best_machine, best_task
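
The alignment score above is the dot product of a machine's remaining (cpu, memory) with a task's (cpu, memory) demand, and the pair with the largest score is chosen. A quick illustrative check (numbers are made up):

# Illustrative only: a machine with free (cpu=2, memory=1) and a task demanding
# (cpu=1, memory=0.5) scores 2*1 + 1*0.5 = 2.5; a second task demanding
# (cpu=0.5, memory=1) scores 2*0.5 + 1*1 = 2.0, so the first pair is selected.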

View File

@ -0,0 +1,148 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import os
import time
import numpy as np
import tensorflow as tf
import sys
# Set TensorFlow log level to suppress warnings
os.environ["TF_CPP_MIN_LOG_LEVEL"] = '3'
# Add the parent directory to sys.path
sys.path.append('..')
from core.machine import MachineConfig
from csimpy_exp.DAG_exp.algorithm.heuristics.random_algorithm import RandomAlgorithm
from csimpy_exp.DAG_exp.algorithm.heuristics.tetris import Tetris
from csimpy_exp.DAG_exp.algorithm.heuristics.first_fit import FirstFitAlgorithm
from csimpy_exp.DAG_exp.algorithm.heuristics.max_weight import MaxWeightAlgorithm
from csimpy_exp.DAG_exp.algorithm.MCJS.CMDRL import RLAlgorithm
from csimpy_exp.DAG_exp.algorithm.MCJS.agent import Agent
from csimpy_exp.DAG_exp.algorithm.MCJS.brain import BrainSmall
from csimpy_exp.DAG_exp.algorithm.MCJS.reward_giver import MakespanRewardGiver
from csimpy_exp.DAG_exp.utils.csv_reader import CSVReader
from csimpy_exp.DAG_exp.utils.feature_functions import features_extract_func_ac, features_normalize_func_ac
from csimpy_exp.AUX.tools import average_completion, average_slowdown
from csimpy_exp.DAG_exp.adapter.episode import Episode
# Set environment variable to disable GPU visibility
os.environ['CUDA_VISIBLE_DEVICES'] = ''
# Seed for reproducibility
np.random.seed(41)
tf.random.set_random_seed(41)
# ************************ Parameters Setting Start ************************
machines_number = 1
jobs_len = 1
n_iter = 30
jobs_csv = './datasets/alibaba_clusterdata_v2017/job_DAG.csv'
brain = BrainSmall(14)
reward_giver = MakespanRewardGiver(-1)
features_extract_func = features_extract_func_ac
features_normalize_func = features_normalize_func_ac
name = '%s-%s-m%d' % (reward_giver.name, brain.name, machines_number)
model_dir = './agents/%s' % name
# ************************ Parameters Setting End ************************
# Create model directory if it doesn't exist
if not os.path.isdir(model_dir):
os.makedirs(model_dir)
# Initialize the RL agent
agent = Agent(name, brain, 1, reward_to_go=True, nn_baseline=True, normalize_advantages=True,
model_save_path='%s/model.ckpt' % model_dir)
# Define machine configurations and load job configurations
machine_configs = [MachineConfig(2, 1, 1) for i in range(machines_number)]
csv_reader = CSVReader(jobs_csv)
jobs_configs = csv_reader.generate(0, jobs_len)
# Run the RandomAlgorithm and print results
tic = time.time()
algorithm = RandomAlgorithm()
episode = Episode(machine_configs, jobs_configs, algorithm, None)
episode.run()
print('RandomAlgorithm')
print(episode.env.now, time.time() - tic, average_completion(episode), average_slowdown(episode))
# Run the Tetris algorithm and print results
tic = time.time()
algorithm = Tetris()
episode = Episode(machine_configs, jobs_configs, algorithm, None)
episode.run()
print('Tetris')
print(episode.env.now, time.time() - tic, average_completion(episode), average_slowdown(episode))
# Run the FirstFitAlgorithm and print results
tic = time.time()
algorithm = FirstFitAlgorithm()
episode = Episode(machine_configs, jobs_configs, algorithm, None)
episode.run()
print('FirstFitAlgorithm')
print(episode.env.now, time.time() - tic, average_completion(episode), average_slowdown(episode))
# Run the MaxWeightAlgorithm and print results
tic = time.time()
algorithm = MaxWeightAlgorithm()
episode = Episode(machine_configs, jobs_configs, algorithm, None)
episode.run()
print('MaxWeightAlgorithm')
print(episode.env.now, time.time() - tic, average_completion(episode), average_slowdown(episode))
# Perform iterations for RL training
for itr in range(n_iter):
print("********** Iteration %i ************" % itr)
all_observations = []
all_actions = []
all_rewards = []
makespans = []
average_completions = []
average_slowdowns = []
trajectories = []
tic = time.time()
for i in range(12):
algorithm = RLAlgorithm(agent, reward_giver, features_extract_func=features_extract_func,
features_normalize_func=features_normalize_func)
episode = Episode(machine_configs, jobs_configs, algorithm, None)
algorithm.reward_giver.attach(episode.simulation)
episode.run()
trajectories.append(episode.simulation.scheduler.algorithm.current_trajectory)
makespans.append(episode.simulation.env.now)
average_completions.append(average_completion(episode))
average_slowdowns.append(average_slowdown(episode))
agent.log('makespan', np.mean(makespans), agent.global_step)
agent.log('average_completions', np.mean(average_completions), agent.global_step)
agent.log('average_slowdowns', np.mean(average_slowdowns), agent.global_step)
toc = time.time()
print(np.mean(makespans), (toc - tic) / 12, np.mean(average_completions), np.mean(average_slowdowns))
for trajectory in trajectories:
observations = []
actions = []
rewards = []
for node in trajectory:
observations.append(node.observation)
actions.append(node.action)
rewards.append(node.reward)
all_observations.append(observations)
all_actions.append(actions)
all_rewards.append(rewards)
all_q_s, all_advantages = agent.estimate_return(all_rewards)
agent.update_parameters(all_observations, all_actions, all_advantages)
# Save the trained agent
agent.save()

Binary file not shown.

View File

@ -0,0 +1,64 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import pandas as pd
import numpy as np
from operator import attrgetter
from core.job import JobConfig, TaskConfig
from csimpy_exp.DAG_exp.utils.feature_synthesize import father_task_indices
class CSVReader:
def __init__(self, filename):
self.filename = filename
self._load_data()
def _load_data(self):
df = pd.read_csv(self.filename).astype({"job_id": int, "instances_num": int})
job_task_map = {}
job_submit_time_map = {}
for _, series in df.iterrows():
job_id = series['job_id']
task_id, parent_indices = father_task_indices(series['task_id'], series['task_type'])
task_config = TaskConfig(task_id, series['instances_num'], series['cpu'], series['memory'],
series['disk'], series['duration'], parent_indices)
job_task_map.setdefault(job_id, []).append(task_config)
job_submit_time_map[job_id] = series['submit_time']
self.job_configs = sorted(
(JobConfig(job_id, job_submit_time_map[job_id], tasks) for job_id, tasks in job_task_map.items()),
key=attrgetter('submit_time')
)
def generate(self, offset, number):
selected_jobs = self.job_configs[offset: offset + number]
if selected_jobs:
submit_time_base = selected_jobs[0].submit_time
for job_config in selected_jobs:
job_config.submit_time -= submit_time_base
self._print_statistics(selected_jobs)
return selected_jobs
def _print_statistics(self, job_configs):
tasks = [task for job in job_configs for task in job.task_configs]
metrics = {
"instances_number": "Task instances number",
"cpu": "Task instances cpu",
"memory": "Task instances memory",
"duration": "Task instances duration"
}
print(f"Jobs number: {len(job_configs)}")
print(f"Tasks number: {len(tasks)}")
for key, label in metrics.items():
values = [getattr(task, key) * task.instances_number for task in tasks]
print(f"{label} mean: {np.mean(values)}")
print(f"{label} std: {np.std(values)}")

View File

@ -0,0 +1,47 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import numpy as np
# Base features and normalization values
BASE_FEATURES = [0, 0, 0, 0, 0, 0, 1.167, 1.167, 1.5, 1.833, 1.833]
BASE_NORMALIZE = [2, 1, 1, 1, 100, 1, 0.897, 0.897, 0.957, 1.572, 1.572]
# Additional features and normalization values for AC (Auto-Scaling)
AC_EXTENSION = [0, 0, 0]
AC_NORMALIZE_EXTENSION = [1, 1, 1]
def features_extract_func(task):
"""
Extracts basic features from a task.
"""
return [
task.task_config.cpu, task.task_config.memory, task.task_config.duration,
task.waiting_task_instances_number,
task.feature['first_layer_task'], task.feature['first_layer_instance'],
task.feature['layers_task'], task.feature['child_task_numbers'],
task.feature['child_instance_numbers']
]
def features_extract_func_ac(task):
"""
Extracts features for Auto-Scaling (AC) from a task.
"""
base_features = features_extract_func(task)
ac_features = [task.task_config.instances_number, len(task.running_task_instances), len(task.finished_task_instances)]
return base_features + ac_features
def features_normalize_func(x):
"""
Normalizes features using base values.
"""
return (np.array(x) - np.array(BASE_FEATURES)) / np.array(BASE_NORMALIZE)
def features_normalize_func_ac(x):
"""
Normalizes features for Auto-Scaling (AC) using extended base values.
"""
extended_features = BASE_FEATURES + AC_EXTENSION
extended_normalize = BASE_NORMALIZE + AC_NORMALIZE_EXTENSION
return (np.array(x) - np.array(extended_features)) / np.array(extended_normalize)

View File

@ -0,0 +1,113 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
# Extract parent node information
def father_task_indices(task_id, task_type):
    parent_indices = []
    # Independent tasks (ids containing 'task_') have no parents; keep the raw id.
    if task_id.find('task_') != -1:
        task_index = task_type + '_' + task_id
        return task_index, parent_indices
num_list = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
start_index = -1
for i, char_s in enumerate(task_id):
if (char_s in num_list) and (start_index == -1):
start_index = i
if (char_s not in num_list) and (start_index != -1):
parent_index = task_type + '_' + task_id[start_index: i]
parent_indices.append(parent_index)
start_index = -1
if start_index != -1:
parent_index = task_type + '_' + task_id[start_index:]
parent_indices.append(parent_index)
task_index = parent_indices[0]
parent_indices = parent_indices[1:]
return task_index, parent_indices
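# Illustrative example (an assumption about the trace's DAG-style task names,
# not taken from this file): a task_id such as 'M3_1_2' with task_type 'm'
# would be parsed into task_index 'm_3' with parent_indices ['m_1', 'm_2'];
# the first number names the task itself and the remaining numbers its parents.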
# Extract child node features
def task_features(job):
child_indices = {}
parent_indices = {}
tasks = job.tasks_map.values()
for task in tasks:
task_index = task.task_config.task_index
task_parent_indices = task.task_config.parent_indices
parent_indices[task_index] = task_parent_indices
child_indices[task_index] = []
for parent_index in task_parent_indices:
child_index = child_indices.setdefault(parent_index, [])
child_index.append(task_index)
descendant_indices = {}
descendant_index = []
for task_index, child_index in child_indices.items():
descendant_index = child_index[:]
for i in child_index:
descendant_index += child_indices[i]
descendant_index = list(set(descendant_index))
descendant_indices.update({task_index: descendant_index})
task_features = {}
queue = []
for task_index in child_indices.keys():
child_index = child_indices[task_index]
task_feature = task_features.setdefault(task_index, {})
task_feature['first_layer_task'] = len(child_index)
task_feature['first_layer_instance'] = 0
for child in child_index:
task_feature['first_layer_instance'] += job.tasks_map[child].task_config.instances_number
task_feature['layers_task'] = 0
task_feature['child_task_numbers'] = len(descendant_indices[task_index])
task_feature['child_instance_numbers'] = 0
for descendant_index in descendant_indices[task_index]:
task_feature['child_instance_numbers'] += job.tasks_map[descendant_index].task_config.instances_number
# print(parent_indices)
# print(child_indices)
# print(descendant_indices)
for task_index, child_index in child_indices.items():
if not child_index:
queue.append(task_index)
while queue:
child_node = queue.pop()
# print('************')
# print(child_node)
parent_nodes = parent_indices[child_node]
queue += parent_nodes
for parent_node in parent_nodes:
parent_feature = task_features[parent_node]
child_feature = task_features[child_node]
parent_feature['layers_task'] = child_feature['layers_task'] + 1 if parent_feature[
'layers_task'] == 0 else max(
parent_feature['layers_task'], child_feature['layers_task'] + 1)
return task_features
# Calculate weights
def weights_calculate(tasks):
weight_tasks = {}
for task in tasks:
feature = task.feature
weight = feature['first_layer_task'] + feature['first_layer_instance'] + feature['layers_task'] + feature[
'child_task_numbers'] + feature['child_instance_numbers']
task_list = weight_tasks.setdefault(weight, [])
task_list.append(task)
sorted_weights = sorted(weight_tasks.keys(), reverse=True)
sorted_tasks = []
for weight in sorted_weights:
sorted_tasks.extend(weight_tasks[weight])
return sorted_tasks

Binary file not shown.

View File

@ -0,0 +1,55 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
# CMDRL: data center job scheduling algorithm based on deep reinforcement learning
import tensorflow as tf
import numpy as np
# Enabling eager execution for TensorFlow
tf.enable_eager_execution()
class Node:
def __init__(self, observation, action, reward, clock):
self.observation = observation
self.action = action
self.reward = reward
self.clock = clock
class RLAlgorithm:
def __init__(self, agent, reward_giver, features_normalize_func, features_extract_func):
self.agent = agent
self.reward_giver = reward_giver
self.features_normalize_func = features_normalize_func
self.features_extract_func = features_extract_func
self.current_trajectory = []
def extract_features(self, valid_pairs):
return self.features_normalize_func(
[[machine.cpu, machine.memory] + self.features_extract_func(task) for machine, task in valid_pairs]
)
def _get_valid_pairs(self, machines, tasks):
return [(machine, task) for machine in machines for task in tasks if machine.accommodate(task)]
def __call__(self, cluster, clock):
machines = cluster.machines
tasks = cluster.tasks_which_has_waiting_instance
valid_pairs = self._get_valid_pairs(machines, tasks)
if not valid_pairs:
self.current_trajectory.append(Node(None, None, self.reward_giver.get_reward(), clock))
return None, None
features = self.extract_features(valid_pairs)
features_tensor = tf.convert_to_tensor(features, dtype=np.float32)
logits = self.agent.brain(features_tensor)
selected_pair_index = tf.squeeze(tf.multinomial(logits, num_samples=1), axis=1).numpy()[0]
self.current_trajectory.append(Node(features, selected_pair_index, 0, clock))
return valid_pairs[selected_pair_index]

View File

@ -0,0 +1,117 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
# Agent implementing the policy-gradient method of reinforcement learning
import time
import copy
import numpy as np
import tensorflow as tf
class Agent:
def __init__(self, name, brain, gamma, reward_to_go, nn_baseline, normalize_advantages, model_save_path=None,
summary_path=None):
"""Initialize the agent."""
self.gamma = gamma
self.reward_to_go = reward_to_go
self.baseline = nn_baseline
self.normalize_advantages = normalize_advantages
self.optimizer = tf.train.AdamOptimizer(learning_rate=0.001)
self.global_step = tf.train.get_or_create_global_step()
self.summary_path = summary_path or './tensorboard/%s--%s' % (name, time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime()))
self.summary_writer = tf.contrib.summary.create_file_writer(self.summary_path)
self.brain = brain
self.checkpoint = tf.train.Checkpoint(brain=self.brain)
self.model_save_path = model_save_path
def restore(self, model_path):
"""Restore the model from the given path."""
self.checkpoint.restore(model_path)
def save(self):
"""Save the model to the specified path."""
self.checkpoint.save(self.model_save_path)
def _sum_of_rewards(self, rewards_n):
"""Compute Monte Carlo estimation of the Q function."""
q_s = [self._calculate_q(re) for re in rewards_n]
if not self.reward_to_go:
q_s = [[q[0]] * len(q) for q in q_s]
return q_s
    def _calculate_q(self, re):
        """Reward-to-go: Q_j is the discounted sum of rewards from step j onward,
        kept in trajectory order so it stays aligned with observations and actions."""
        return [sum(self.gamma ** i * reward for i, reward in enumerate(re[j:])) for j in range(len(re))]
def _compute_advantage(self, q_n):
"""Computes advantages by possibly subtracting a baseline from the estimated Q values."""
if not self.baseline:
return copy.deepcopy(q_n)
padded_q_n = [q + [0] * (max(len(q) for q in q_n) - len(q)) for q in q_n]
adv_n = np.array(padded_q_n) - np.mean(padded_q_n, axis=0)
return [list(adv[:len(q_n[i])]) for i, adv in enumerate(adv_n)]
def estimate_return(self, rewards_n):
"""Estimate the returns over a set of trajectories."""
q_n = self._sum_of_rewards(rewards_n)
adv_n = self._compute_advantage(q_n)
if self.normalize_advantages:
flattened_adv = [advantage for advantages in adv_n for advantage in advantages]
mean, std = np.mean(flattened_adv), np.std(flattened_adv)
adv_n = [[(adv - mean) / (std + np.finfo(np.float32).eps) for adv in advantages] for advantages in adv_n]
return q_n, adv_n
def _loss(self, X, y, adv):
"""Calculate loss for the agent."""
logits = self.brain(X)
logprob = - tf.losses.sparse_softmax_cross_entropy(labels=y, logits=logits)
return logprob * adv
def update_parameters(self, all_observations, all_actions, all_advantages):
"""Update the parameters of the policy."""
loss_values, advantages__ = [], []
for observations, actions, advantages in zip(all_observations, all_actions, all_advantages):
self._update_by_trajectory(observations, actions, advantages, loss_values, advantages__)
self.log('loss', np.mean(loss_values), self.global_step)
self.log('adv', np.mean(advantages__), self.global_step)
def _update_by_trajectory(self, observations, actions, advantages, loss_values, advantages__):
"""Update parameters by trajectory."""
grads_by_trajectory = []
for i, (observation, action, advantage) in enumerate(zip(observations, actions, advantages)):
if observation is None or action is None:
continue
with tf.GradientTape() as t:
loss_value = - self._loss(observation, [action], advantage)
grads = t.gradient(loss_value, self.brain.variables)
grads_by_trajectory.append(grads)
loss_values.append(loss_value)
advantages__.append(advantage)
if (i + 1) % 1000 == 0:
self.optimize(grads_by_trajectory)
grads_by_trajectory = []
if grads_by_trajectory:
self.optimize(grads_by_trajectory)
def optimize(self, grads_by_trajectory):
"""Optimize the agent's parameters."""
average_grads = [np.mean([grad.numpy() for grad in grads_by_layer], axis=0) for grads_by_layer in zip(*grads_by_trajectory)]
self.optimizer.apply_gradients(zip(average_grads, self.brain.variables), self.global_step)
def log(self, name, loss_value, step):
"""Log a scalar value."""
with self.summary_writer.as_default(), tf.contrib.summary.always_record_summaries():
tf.contrib.summary.scalar(name, loss_value, step=step)

View File

@ -0,0 +1,74 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
# Neural network structure implemented by brain TensorFlow
import tensorflow as tf
class BaseBrain(tf.keras.Model):
def __init__(self, state_size, layer_configs):
super().__init__()
self.layers_list = self._build_layers(state_size, layer_configs)
def _build_layers(self, state_size, layer_configs):
layers = []
input_shape = (None, state_size)
for config in layer_configs:
layer_kwargs = {'activation': config.get('activation')}
# Only the first layer needs an explicit input shape;
# passing input_shape=None to Dense would raise an error.
if input_shape is not None:
layer_kwargs['input_shape'] = input_shape
layers.append(tf.keras.layers.Dense(config['units'], **layer_kwargs))
input_shape = None
return layers
def call(self, state):
for layer in self.layers_list:
state = layer(state)
return tf.expand_dims(tf.squeeze(state, axis=-1), axis=0)
class BrainBig(BaseBrain):
name = 'BrainBig'
def __init__(self, state_size):
layer_configs = [
{'units': 3, 'activation': tf.tanh},
{'units': 9, 'activation': tf.tanh},
{'units': 9, 'activation': tf.tanh},
{'units': 18, 'activation': tf.tanh},
{'units': 9, 'activation': tf.tanh},
{'units': 1}
]
super().__init__(state_size, layer_configs)
class Brain(BaseBrain):
name = 'Brain'
def __init__(self, state_size):
layer_configs = [
{'units': 3, 'activation': tf.tanh},
{'units': 9, 'activation': tf.tanh},
{'units': 18, 'activation': tf.tanh},
{'units': 9, 'activation': tf.tanh},
{'units': 1}
]
super().__init__(state_size, layer_configs)
class BrainSmall(BaseBrain):
name = 'BrainSmall'
def __init__(self, state_size):
layer_configs = [
{'units': 3, 'activation': tf.tanh},
{'units': 9, 'activation': tf.tanh},
{'units': 6, 'activation': tf.tanh},
{'units': 1}
]
super().__init__(state_size, layer_configs)

View File

@ -0,0 +1,69 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
# reward_giver reinforcement learning reward function
from abc import ABC, abstractmethod
class RewardGiver(ABC):
"""Abstract base class for a reward giver."""
def __init__(self):
self.simulation = None
def attach(self, simulation):
"""Attach the reward giver to a simulation."""
self.simulation = simulation
@abstractmethod
def get_reward(self):
"""Retrieve the reward. Ensure simulation is attached before fetching."""
if not self.simulation:
raise ValueError('Reward giver must be attached to a simulation before fetching reward.')
class MakespanRewardGiver(RewardGiver):
"""Reward giver that returns a constant reward per timestamp."""
name = 'Makespan'
def __init__(self, reward_per_timestamp):
super().__init__()
self.reward_per_timestamp = reward_per_timestamp
def get_reward(self):
"""Retrieve the reward per timestamp."""
super().get_reward()
return self.reward_per_timestamp
class AverageSlowDownRewardGiver(RewardGiver):
"""Reward giver based on average slowdown."""
name = 'AS'
def get_reward(self):
"""Compute reward based on slowdown of unfinished tasks."""
super().get_reward()
cluster = self.simulation.cluster
unfinished_tasks = cluster.unfinished_tasks
reward = sum((-1 / task.task_config.duration) for task in unfinished_tasks)
return reward
class AverageCompletionRewardGiver(RewardGiver):
"""Reward giver based on average completion."""
name = 'AC'
def get_reward(self):
"""Compute reward based on number of unfinished tasks."""
super().get_reward()
cluster = self.simulation.cluster
unfinished_task_count = len(cluster.unfinished_tasks)
return -unfinished_task_count

View File

@ -0,0 +1,20 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from core.algorithm import Algorithm
class DRF(Algorithm):
def __call__(self, cluster, clock):
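# Note: this scan simply returns a feasible (machine, task) pair;
# it does not compute dominant resource shares as a full DRF policy would.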
machines = cluster.machines
unfinished_tasks = cluster.unfinished_tasks
candidate_task = None
candidate_machine = None
for machine in machines:
for task in unfinished_tasks:
if machine.accommodate(task):
candidate_machine = machine
candidate_task = task
break
return candidate_machine, candidate_task

View File

@ -0,0 +1,21 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from core.algorithm import Algorithm # Corrected a typo in "algorithm"
class FirstFitAlgorithm(Algorithm):
def __call__(self, cluster, clock):
machines = cluster.machines
tasks = cluster.tasks_which_has_waiting_instance
candidate_task = None
candidate_machine = None
for machine in machines:
for task in tasks:
if machine.accommodate(task):
candidate_machine = machine
candidate_task = task
break
# Stop at the first feasible (machine, task) pair rather than
# continuing to scan the remaining machines
if candidate_task is not None:
break
return candidate_machine, candidate_task

View File

@ -0,0 +1,40 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import numpy as np
from core.algorithm import Algorithm
class RandomAlgorithm(Algorithm):
def __init__(self, threshold=0.8):
self.threshold = threshold
def __call__(self, cluster, clock):
machines = cluster.machines
tasks = cluster.tasks_with_waiting_instances
selected_task = None
selected_machine = None
all_candidates = []
# Iterate through machines and tasks to find suitable pairings
for machine in machines:
for task in tasks:
if machine.can_accommodate(task):
all_candidates.append((machine, task))
# Randomly decide whether to select this pair
if np.random.rand() > self.threshold:
selected_machine = machine
selected_task = task
break
# If no suitable pairings found, return None, None
if len(all_candidates) == 0:
return None, None
# If a candidate task was selected, return it; otherwise, select one randomly
if selected_task is None:
pair_index = np.random.randint(0, len(all_candidates))
return all_candidates[pair_index]
else:
return selected_machine, selected_task

View File

@ -0,0 +1,40 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import numpy as np
from core.algorithm import Algorithm
class Tetris(Algorithm):
@staticmethod
def calculate_alignment(valid_pairs):
"""Calculate the alignment score between machine and task features."""
machine_features = []
task_features = []
# Extract the features for machines and tasks
for machine, task in valid_pairs:
machine_features.append(machine.feature[:2])
task_features.append([task.task_config.cpu, task.task_config.memory])
# Compute the alignment score and return the index of the maximum
alignment_scores = np.sum(np.array(machine_features) * np.array(task_features), axis=1)
return np.argmax(alignment_scores)
def __call__(self, cluster, clock):
"""Determine the best machine-task pair using the Tetris algorithm."""
machines = cluster.machines
tasks = cluster.ready_tasks_with_waiting_instance
valid_pairs = [(machine, task) for machine in machines for task in tasks if machine.accommodate(task)]
# If there are no valid pairs, return None for both machine and task
if not valid_pairs:
return None, None
# Get the best pair based on the alignment
best_pair_idx = Tetris.calculate_alignment(valid_pairs)
best_machine, best_task = valid_pairs[best_pair_idx]
return best_machine, best_task

View File

@ -0,0 +1,117 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import os
import time
import numpy as np
import tensorflow as tf
from multiprocessing import Process, Manager
import sys
sys.path.append('..')
from core.machine import MachineConfig
from csimpy_exp.Non_DAG_exp.algorithm.random_algorithm import RandomAlgorithm
from csimpy_exp.Non_DAG_exp.algorithm.tetris import Tetris
from csimpy_exp.Non_DAG_exp.algorithm.first_fit import FirstFitAlgorithm
from csimpy_exp.Non_DAG_exp.algorithm.MCJS.CMDRL import RLAlgorithm
from csimpy_exp.Non_DAG_exp.algorithm.MCJS.agent import Agent
from csimpy_exp.Non_DAG_exp.algorithm.MCJS.brain import Brain
from csimpy_exp.Non_DAG_exp.algorithm.MCJS.reward_giver import AverageCompletionRewardGiver
from csimpy_exp.Non_DAG_exp.utils.csv_reader import CSVReader
from csimpy_exp.Non_DAG_exp.utils.feature_functions import features_extract_func_ac, features_normalize_func_ac
from csimpy_exp.Non_DAG_exp.utils.tools import multiprocessing_run, average_completion, average_slowdown
from csimpy_exp.Non_DAG_exp.utils.episode import Episode
os.environ['CUDA_VISIBLE_DEVICES'] = ''
np.random.seed(41)
tf.random.set_random_seed(41)
def setup_parameters():
machines_number = 5
jobs_len = 10
n_iter = 100
n_episode = 12
jobs_csv = '../jobs_files/jobs.csv'
brain_instance = Brain(9)
reward_instance = AverageCompletionRewardGiver()
extract_func = features_extract_func_ac
normalize_func = features_normalize_func_ac
agent_name = '%s-%s-m%d' % (reward_instance.name, brain_instance.name, machines_number)
agent_dir = './agents/%s' % agent_name
if not os.path.isdir(agent_dir):
os.makedirs(agent_dir)
agent_instance = Agent(agent_name, brain_instance, 1, reward_to_go=True, nn_baseline=True,
normalize_advantages=True, model_save_path='%s/model.ckpt' % agent_dir)
machine_configs_list = [MachineConfig(64, 1, 1) for _ in range(machines_number)]
csv_instance = CSVReader(jobs_csv)
jobs_configs_list = csv_instance.generate(0, jobs_len)
return agent_instance, machine_configs_list, jobs_configs_list, n_iter, n_episode
def run_algorithm_and_print_stats(algorithm_class, machine_configs, jobs_configs):
tic = time.time()
algorithm_instance = algorithm_class()
episode_instance = Episode(machine_configs, jobs_configs, algorithm_instance, None)
episode_instance.run()
print(episode_instance.env.now, time.time() - tic, average_completion(episode_instance), average_slowdown(episode_instance))
def main():
agent, machine_configs, jobs_configs, n_iter, n_episode = setup_parameters()
# Test and print results for various algorithms
for algorithm in [RandomAlgorithm, FirstFitAlgorithm, Tetris]:
run_algorithm_and_print_stats(algorithm, machine_configs, jobs_configs)
for itr in range(n_iter):
tic = time.time()
print("********** Iteration %i ************" % itr)
processes = []
manager = Manager()
trajectories, makespans, average_completions, average_slowdowns = manager.list([]), manager.list([]), manager.list([]), manager.list([])
for _ in range(n_episode):
algorithm = RLAlgorithm(agent, AverageCompletionRewardGiver(), features_extract_func=features_extract_func_ac,
features_normalize_func=features_normalize_func_ac)
episode = Episode(machine_configs, jobs_configs, algorithm, None)
algorithm.reward_giver.attach(episode.simulation)
p = Process(target=multiprocessing_run,
args=(episode, trajectories, makespans, average_completions, average_slowdowns))
processes.append(p)
[p.start() for p in processes]
[p.join() for p in processes]
agent.log('makespan', np.mean(makespans), agent.global_step)
agent.log('average_completions', np.mean(average_completions), agent.global_step)
agent.log('average_slowdowns', np.mean(average_slowdowns), agent.global_step)
print(np.mean(makespans), time.time() - tic, np.mean(average_completions), np.mean(average_slowdowns))
observations, actions, rewards = [], [], []
for trajectory in trajectories:
obs, act, rew = [], [], []
for node in trajectory:
obs.append(node.observation)
act.append(node.action)
rew.append(node.reward)
observations.append(obs)
actions.append(act)
rewards.append(rew)
q_s, advantages = agent.estimate_return(rewards)
agent.update_parameters(observations, actions, advantages)
agent.save()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,130 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import os
import time
import numpy as np
import tensorflow as tf
from multiprocessing import Process, Manager
import sys
sys.path.append('..')
from core.machine import MachineConfig
from csimpy_exp.Non_DAG_exp.algorithm.random_algorithm import RandomAlgorithm
from csimpy_exp.Non_DAG_exp.algorithm.tetris import Tetris
from csimpy_exp.Non_DAG_exp.algorithm.first_fit import FirstFitAlgorithm
from csimpy_exp.Non_DAG_exp.algorithm.MCJS.CMDRL import RLAlgorithm
from csimpy_exp.Non_DAG_exp.algorithm.MCJS.agent import Agent
from csimpy_exp.Non_DAG_exp.algorithm.MCJS.brain import Brain
from csimpy_exp.Non_DAG_exp.algorithm.MCJS.reward_giver import MakespanRewardGiver
from csimpy_exp.Non_DAG_exp.utils.csv_reader import CSVReader
from csimpy_exp.Non_DAG_exp.utils.feature_functions import features_extract_func, features_normalize_func
from csimpy_exp.Non_DAG_exp.utils.tools import multiprocessing_run, average_completion, average_slowdown
from csimpy_exp.Non_DAG_exp.utils.episode import Episode
os.environ['CUDA_VISIBLE_DEVICES'] = ''
np.random.seed(41)
tf.random.set_random_seed(41)
# ************************ Parameters Setting Start ************************
machines_number = 5
jobs_len = 10
n_iter = 100
n_episode = 12
jobs_csv = '../jobs_files/jobs.csv'
brain = Brain(6)
reward_giver = MakespanRewardGiver(-1)
features_extract_func = features_extract_func
features_normalize_func = features_normalize_func
name = '%s-%s-m%d' % (reward_giver.name, brain.name, machines_number)
model_dir = './agents/%s' % name
# ************************ Parameters Setting End ************************
if not os.path.isdir(model_dir):
os.makedirs(model_dir)
agent = Agent(name, brain, 1, reward_to_go=True, nn_baseline=True, normalize_advantages=True,
model_save_path='%s/model.ckpt' % model_dir)
machine_configs = [MachineConfig(64, 1, 1) for i in range(machines_number)]
csv_reader = CSVReader(jobs_csv)
jobs_configs = csv_reader.generate(0, jobs_len)
tic = time.time()
algorithm = RandomAlgorithm()
episode = Episode(machine_configs, jobs_configs, algorithm, None)
episode.run()
print(episode.env.now, time.time() - tic, average_completion(episode), average_slowdown(episode))
tic = time.time()
algorithm = FirstFitAlgorithm()
episode = Episode(machine_configs, jobs_configs, algorithm, None)
episode.run()
print(episode.env.now, time.time() - tic, average_completion(episode), average_slowdown(episode))
tic = time.time()
algorithm = Tetris()
episode = Episode(machine_configs, jobs_configs, algorithm, None)
episode.run()
print(episode.env.now, time.time() - tic, average_completion(episode), average_slowdown(episode))
for itr in range(n_iter):
tic = time.time()
print("********** Iteration %i ************" % itr)
processes = []
manager = Manager()
trajectories = manager.list([])
makespans = manager.list([])
average_completions = manager.list([])
average_slowdowns = manager.list([])
for i in range(n_episode):
algorithm = RLAlgorithm(agent, reward_giver, features_extract_func=features_extract_func,
features_normalize_func=features_normalize_func)
episode = Episode(machine_configs, jobs_configs, algorithm, None)
algorithm.reward_giver.attach(episode.simulation)
p = Process(target=multiprocessing_run,
args=(episode, trajectories, makespans, average_completions, average_slowdowns))
processes.append(p)
for p in processes:
p.start()
for p in processes:
p.join()
agent.log('makespan', np.mean(makespans), agent.global_step)
agent.log('average_completions', np.mean(average_completions), agent.global_step)
agent.log('average_slowdowns', np.mean(average_slowdowns), agent.global_step)
toc = time.time()
print(np.mean(makespans), toc - tic, np.mean(average_completions), np.mean(average_slowdowns))
all_observations = []
all_actions = []
all_rewards = []
for trajectory in trajectories:
observations = []
actions = []
rewards = []
for node in trajectory:
observations.append(node.observation)
actions.append(node.action)
rewards.append(node.reward)
all_observations.append(observations)
all_actions.append(actions)
all_rewards.append(rewards)
all_q_s, all_advantages = agent.estimate_return(all_rewards)
agent.update_parameters(all_observations, all_actions, all_advantages)
agent.save()

View File

@ -0,0 +1,130 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import os
import time
import numpy as np
import tensorflow as tf
from multiprocessing import Process, Manager
import sys
sys.path.append('..')
from core.machine import MachineConfig
from csimpy_exp.Non_DAG_exp.algorithm.random_algorithm import RandomAlgorithm
from csimpy_exp.Non_DAG_exp.algorithm.tetris import Tetris
from csimpy_exp.Non_DAG_exp.algorithm.first_fit import FirstFitAlgorithm
from csimpy_exp.Non_DAG_exp.algorithm.MCJS.CMDRL import RLAlgorithm
from csimpy_exp.Non_DAG_exp.algorithm.MCJS.agent import Agent
from csimpy_exp.Non_DAG_exp.algorithm.MCJS.brain import Brain
from csimpy_exp.Non_DAG_exp.algorithm.MCJS.reward_giver import MakespanRewardGiver
from csimpy_exp.Non_DAG_exp.utils.csv_reader import CSVReader
from csimpy_exp.Non_DAG_exp.utils.feature_functions import features_extract_func, features_normalize_func
from csimpy_exp.Non_DAG_exp.utils.tools import multiprocessing_run, average_completion, average_slowdown
from csimpy_exp.Non_DAG_exp.utils.episode import Episode
os.environ['CUDA_VISIBLE_DEVICES'] = ''
np.random.seed(41)
tf.random.set_random_seed(41)
# ************************ Parameters Setting Start ************************
machines_number = 5
jobs_len = 10
n_iter = 100
n_episode = 12
jobs_csv = './datasets/alibaba_clusterdata_v2017/jobs_Non_DAG.csv'
brain = Brain(6)
reward_giver = MakespanRewardGiver(-1)
features_extract_func = features_extract_func
features_normalize_func = features_normalize_func
name = '%s-%s-m%d' % (reward_giver.name, brain.name, machines_number)
model_dir = './agents/%s' % name
# ************************ Parameters Setting End ************************
if not os.path.isdir(model_dir):
os.makedirs(model_dir)
agent = Agent(name, brain, 1, reward_to_go=True, nn_baseline=True, normalize_advantages=True,
model_save_path='%s/model.ckpt' % model_dir)
machine_configs = [MachineConfig(64, 1, 1) for i in range(machines_number)]
csv_reader = CSVReader(jobs_csv)
jobs_configs = csv_reader.generate(0, jobs_len)
tic = time.time()
algorithm = RandomAlgorithm()
episode = Episode(machine_configs, jobs_configs, algorithm, None)
episode.run()
print(episode.env.now, time.time() - tic, average_completion(episode), average_slowdown(episode))
tic = time.time()
algorithm = FirstFitAlgorithm()
episode = Episode(machine_configs, jobs_configs, algorithm, None)
episode.run()
print(episode.env.now, time.time() - tic, average_completion(episode), average_slowdown(episode))
tic = time.time()
algorithm = Tetris()
episode = Episode(machine_configs, jobs_configs, algorithm, None)
episode.run()
print(episode.env.now, time.time() - tic, average_completion(episode), average_slowdown(episode))
for itr in range(n_iter):
tic = time.time()
print("********** Iteration %i ************" % itr)
processes = []
manager = Manager()
trajectories = manager.list([])
makespans = manager.list([])
average_completions = manager.list([])
average_slowdowns = manager.list([])
for i in range(n_episode):
algorithm = RLAlgorithm(agent, reward_giver, features_extract_func=features_extract_func,
features_normalize_func=features_normalize_func)
episode = Episode(machine_configs, jobs_configs, algorithm, None)
algorithm.reward_giver.attach(episode.simulation)
p = Process(target=multiprocessing_run,
args=(episode, trajectories, makespans, average_completions, average_slowdowns))
processes.append(p)
for p in processes:
p.start()
for p in processes:
p.join()
agent.log('makespan', np.mean(makespans), agent.global_step)
agent.log('average_completions', np.mean(average_completions), agent.global_step)
agent.log('average_slowdowns', np.mean(average_slowdowns), agent.global_step)
toc = time.time()
print(np.mean(makespans), toc - tic, np.mean(average_completions), np.mean(average_slowdowns))
all_observations = []
all_actions = []
all_rewards = []
for trajectory in trajectories:
observations = []
actions = []
rewards = []
for node in trajectory:
observations.append(node.observation)
actions.append(node.action)
rewards.append(node.reward)
all_observations.append(observations)
all_actions.append(actions)
all_rewards.append(rewards)
all_q_s, all_advantages = agent.estimate_return(all_rewards)
agent.update_parameters(all_observations, all_actions, all_advantages)
agent.save()

View File

@ -0,0 +1,58 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
from operator import attrgetter
import pandas as pd
import numpy as np
from typing import List
from core.job import JobConfig, TaskConfig
class CSVReader:
def __init__(self, filename: str):
self.filename = filename
self.job_configs = self._load_and_process_csv()
def _load_and_process_csv(self) -> List[JobConfig]:
df = pd.read_csv(self.filename, dtype={'task_id': str, 'job_id': int, 'instances_num': int})
job_task_map = {}
job_submit_time_map = {}
for _, row in df.iterrows():
task = TaskConfig(row.task_id, row.instances_num, row.cpu, row.memory, row.disk, row.duration)
job_task_map.setdefault(row.job_id, []).append(task)
job_submit_time_map[row.job_id] = row.submit_time
# Pair each job's tasks with its submit time and sort jobs by submit time
return sorted(
(JobConfig(job_id, job_submit_time_map[job_id], tasks) for job_id, tasks in job_task_map.items()),
key=attrgetter('submit_time')
)
def _get_statistics(self, job_configs: List[JobConfig]):
tasks = [task for job in job_configs for task in job.task_configs]
task_metrics = {
'number': [task.instances_number for task in tasks],
'duration': [task.duration for task in tasks for _ in range(task.instances_number)],
'cpu': [task.cpu for task in tasks for _ in range(task.instances_number)],
'memory': [task.memory for task in tasks for _ in range(task.instances_number)]
}
print(f"Jobs number: {len(job_configs)}")
print(f"Tasks number: {len(tasks)}")
for metric, values in task_metrics.items():
print(f"Task instances {metric} mean: {np.mean(values)}")
print(f"Task instances {metric} std: {np.std(values)}")
def generate(self, offset: int, number: int) -> List[JobConfig]:
subset = self.job_configs[offset: offset + number]
if subset:
submit_time_base = subset[0].submit_time
for job_config in subset:
job_config.submit_time -= submit_time_base
self._get_statistics(subset)
return subset

View File

@ -0,0 +1,25 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import simpy
from core.cluster import Cluster
from core.scheduler import Scheduler
from core.broker import Broker
from core.simulation import Simulation
class Episode:
def __init__(self, machine_configs, task_configs, algorithm, event_file):
self.env = simpy.Environment()
cluster = Cluster()
cluster.add_machines(machine_configs)
task_broker = Broker(self.env, task_configs)
scheduler = Scheduler(self.env, algorithm)
self.simulation = Simulation(self.env, cluster, task_broker, scheduler, event_file)
def run(self):
self.simulation.run()
self.env.run()

View File

@ -0,0 +1,32 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import numpy as np
def features_extract_func(task):
return [
task.task_config.cpu,
task.task_config.memory,
task.task_config.duration,
task.waiting_task_instances_number
]
def features_extract_func_ac(task):
return features_extract_func(task) + [
task.task_config.instances_number,
len(task.running_task_instances),
len(task.finished_task_instances)
]
def features_normalize_func(x):
mean = [0, 0, 0.65, 0.009, 74.0, 80.3]
std_dev = [64, 1, 0.23, 0.005, 108.0, 643.5]
y = (np.array(x) - np.array(mean)) / np.array(std_dev)
return y
def features_normalize_func_ac(x):
mean = [0, 0, 0.65, 0.009, 74.0, 80.3, 80.3, 80.3, 80.3]
std_dev = [64, 1, 0.23, 0.005, 108.0, 643.5, 643.5, 643.5, 643.5]
y = (np.array(x) - np.array(mean)) / np.array(std_dev)
return y

View File

@ -0,0 +1,100 @@
"""
Author: Weisen Pan
Date: 2023-10-24
"""
import time
import numpy as np
import tensorflow as tf
def average_metric(exp, metric_func):
"""
Calculate the average of a given metric for all tasks.
Args:
- exp: The simulation experiment.
- metric_func: Function to compute the desired metric.
Returns:
- float: Average of the metric across tasks.
"""
total = sum(metric_func(task) for job in exp.simulation.cluster.jobs for task in job.tasks)
number_of_tasks = sum(1 for job in exp.simulation.cluster.jobs for _ in job.tasks)
return total / number_of_tasks
def completion_metric(task):
"""
Compute the completion metric for a task.
Args:
- task: The task object.
Returns:
- float: Completion metric for the task.
"""
return task.finished_timestamp - task.started_timestamp
def slowdown_metric(task):
"""
Compute the slowdown metric for a task.
Args:
- task: The task object.
Returns:
- float: Slowdown metric for the task.
"""
return (task.finished_timestamp - task.started_timestamp) / task.task_config.duration
def average_completion(exp):
"""
Compute the average completion time for all tasks.
Args:
- exp: The simulation experiment.
Returns:
- float: Average completion time.
"""
return average_metric(exp, completion_metric)
def average_slowdown(exp):
"""
Compute the average slowdown for all tasks.
Args:
- exp: The simulation experiment.
Returns:
- float: Average slowdown.
"""
return average_metric(exp, slowdown_metric)
def multiprocessing_run(episode, trajectories, makespans, avg_completions, avg_slowdowns):
"""
Run an episode in a multiprocessing environment and gather results.
Args:
- episode: The simulation episode.
- trajectories: List to collect trajectories.
- makespans: List to collect makespans.
- avg_completions: List to collect average completions.
- avg_slowdowns: List to collect average slowdowns.
"""
# Set random seeds
np.random.seed(int(time.time()))
tf.random.set_random_seed(int(time.time()))
# Execute the episode
episode.run()
# Gather and append results to respective lists
trajectories.append(episode.simulation.scheduler.algorithm.current_trajectory)
makespans.append(episode.simulation.env.now)
avg_completions.append(average_completion(episode))
avg_slowdowns.append(average_slowdown(episode))

Binary file not shown.

View File

@ -0,0 +1,2 @@
Alibaba Cluster Trace Program
https://github.com/alibaba/clusterdata

View File

@ -0,0 +1,2 @@
# ignore generated yaml directories
openb_pod_list*/

View File

@ -0,0 +1,82 @@
# Traces for [Kubernetes Scheduler Simulator](https://github.com/hkust-adsl/kubernetes-scheduler-simulator)
## Basics
This repo contains trace data for the [Kubernetes Scheduler Simulator](https://github.com/hkust-adsl/kubernetes-scheduler-simulator), which evaluates different scheduling policies in GPU-sharing clusters.
It includes the Fragmentation Gradient Descent (FGD) policy proposed in the [USENIX ATC 2023](https://www.usenix.org/conference/atc23) paper "[Beware of Fragmentation: Scheduling GPU-Sharing Workloads with Fragmentation Gradient Descent](https://www.usenix.org/conference/atc23/presentation/weng)", along with other baseline policies (e.g., Best-fit, Dot-product, GPU Packing, GPU Clustering, Random-fit).
## 🗄️ Traces
Key data is in the csv folder, while yaml files can be generated from this data.
### [openb_node_list_all_node.csv](./csv/openb_node_list_all_node.csv)
This file contains 1523 nodes of a heterogeneous GPU cluster in production, listing their CPU, main memory, GPU specifications and GPU types.
[openb_node_list_gpu_node.csv](./csv/openb_node_list_gpu_node.csv) is a subset excluding non-GPU nodes. [openb_node_list_gpu_node.yaml](./node_yaml/openb_node_list_gpu_node.yaml) contains the same data in YAML format.
Here's a sample output:
| | sn | cpu_milli | memory_mib | gpu | model |
|---:|:----------------|------------:|-------------:|------:|:--------|
| 0 | openb-node-0227 | 32000 | 262144 | 0 | nan |
| 1 | openb-node-0228 | 128000 | 786432 | 8 | G3 |
| 2 | openb-node-0229 | 96000 | 786432 | 8 | V100M32 |
- `cpu_milli`: Number of CPUs (in milli)
- `memory_mib`: Main memory (in MiB)
- `gpu`: Number of GPUs
- `model`: GPU type. G1, G2, G3 are undisclosed internal GPU codes.
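As a quick sanity check (this snippet is not part of the repo's tooling), the node list can be loaded with pandas and summarized per GPU model; it assumes it is run from the trace root so the relative `csv/` path resolves:
```python
import pandas as pd

# Load the heterogeneous node list (columns: sn, cpu_milli, memory_mib, gpu, model)
nodes = pd.read_csv("csv/openb_node_list_all_node.csv")

# Node count and total GPUs per GPU model; non-GPU nodes have gpu == 0 and model == NaN
summary = nodes.groupby("model", dropna=False).agg(nodes=("sn", "count"), gpus=("gpu", "sum"))
print(summary.sort_values("gpus", ascending=False))
```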
### [openb_pod_list_default.csv](./csv/openb_pod_list_default.csv)
This file contains 8152 tasks submitted to the GPU cluster, listing their resource specifications, QoS, phase and creation/deletion/scheduled times.
The other openb_pod_list_*.csv files (excluding the gpuspec ones) are sampled from the default one, emphasizing certain types of workloads (e.g., CPU-only tasks, GPU-sharing tasks, multi-GPU tasks).
Trace files with `gpuspec` augment tasks with GPU type requirements. About 33% of GPU tasks in our production cluster have GPU type constraints (see [openb_pod_list_gpuspec33.csv](./csv/openb_pod_list_gpuspec33.csv) and Sec. 6.5 in the "[Beware of Fragmentation](https://www.usenix.org/conference/atc23/presentation/weng)" paper).
Here's a sample output:
| | name | cpu_milli | memory_mib | num_gpu | gpu_milli | gpu_spec | qos | pod_phase | creation_time | deletion_time | scheduled_time |
|---:|:---------------|------------:|-------------:|----------:|------------:|:----------------|:----------|:------------|----------------:|----------------:|-----------------:|
| 0 | openb-pod-0017 | 88000 | 327680 | 8 | 1000 | nan | Burstable | Succeeded | 9437497 | 10769854 | 9437497 |
| 1 | openb-pod-0022 | 4000 | 15258 | 1 | 220 | nan | BE | Running | 9679175 | 9973826 | 9679175 |
| 2 | openb-pod-0035 | 16000 | 32768 | 1 | 1000 | V100M16\|V100M32 | LS | Running | 9967058 | 9968575 | 9967063 |
- `cpu_milli`: Number of CPUs requested (in milli)
- `memory_mib`: Main memory requested (in MiB)
- `num_gpu`: Number of GPUs requested (integers from 0 to 8)
- `gpu_milli`: GPU share requested by GPU-sharing workloads (i.e., `num_gpu==1`), in milli (1000 == 1 full GPU).
- `gpu_spec`: Required GPU types. For example, `nan` means no GPU type constraint, while `V100M16|V100M32` means the task can run on an [NVIDIA V100](https://www.nvidia.com/en-us/data-center/v100/) with either 16GB or 32GB of VRAM.
- `qos`: [Quality of Service](https://kubernetes.io/docs/concepts/workloads/pods/pod-qos/) (e.g., Burstable, Best Effort (BE), Latency Sensitive (LS))
- `pod_phase`: Succeeded, Running, Pending, Failed
- `creation_time`: Timestamp of creation (in seconds)
- `deletion_time`: Timestamp of deletion (in seconds)
- `scheduled_time`: Timestamp of being scheduled (in seconds)
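For a rough feel of the workload mix (again a standalone sketch, not part of the simulator), the pod list can be sliced by GPU demand using the columns above:
```python
import pandas as pd

pods = pd.read_csv("csv/openb_pod_list_default.csv")

total = len(pods)
cpu_only = int((pods["num_gpu"] == 0).sum())
# GPU-sharing tasks request exactly one GPU with a fractional (< 1000 milli) share
gpu_sharing = int(((pods["num_gpu"] == 1) & (pods["gpu_milli"] < 1000)).sum())
multi_gpu = int((pods["num_gpu"] > 1).sum())
print(f"total={total} cpu_only={cpu_only} gpu_sharing={gpu_sharing} multi_gpu={multi_gpu}")
```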
## 🛠 Usage
Generate the YAML files needed for the simulation experiment based on the original CSV files.
```bash
$ bash prepare_input.sh
```
For cluster scheduling simulation on Kubernetes, refer to the Kubernetes Scheduler Simulator repo: https://github.com/hkust-adsl/kubernetes-scheduler-simulator.
## 📝 Paper
Please cite our paper if it is helpful to your research.
```
@inproceedings{FGD2023,
title = {Beware of Fragmentation: Scheduling GPU-Sharing Workloads with Fragmentation Gradient Descent},
author = {Qizhen Weng and Lingyun Yang and Yinghao Yu and Wei Wang and Xiaochuan Tang and Guodong Yang and Liping Zhang},
booktitle = {2023 {USENIX} Annual Technical Conference},
year = {2023},
series = {{USENIX} {ATC} '23},
url = {https://www.usenix.org/conference/atc23/presentation/weng},
publisher = {{USENIX} Association}
}
```

View File

@ -0,0 +1,159 @@
import sys
import yaml
import pandas as pd
from pathlib import Path
USAGE_PROMPT="""Usage:
python3 pod_csv_to_yaml.py data/csv/openb_pod_list_gpuspec10.csv
"""
OUTPUT_DIR_DEFAULT="data/new_output"
MILLI = 1000
DATA_CREATION_TIME = "creation_time"
DATA_DELETION_TIME = "deletion_time"
ResourceName = "alibabacloud.com/gpu-milli" # GPU milli, i.e., 1000 == 1 GPU, for pod only, node is 1000 by default
CountName = "alibabacloud.com/gpu-count" # GPU number request (or allocatable), for pod and node
DeviceIndex = "alibabacloud.com/gpu-index" # Exists when the pod is assigned/predefined to a GPU device
ModelName = "alibabacloud.com/gpu-card-model" # GPU card model, for pod and node
AssumeTime = "alibabacloud.com/assume-time" # To retrieve the scheduling latency
CreationTime = "alibabacloud.com/creation-time" # creation timestamp
DeletionTime = "alibabacloud.com/deletion-time" # deletion timestamp
PodNsNameSep = "/"
DevIdSep = "-"
def generate_pod_yaml(workload_name='paib-pod-10',
workload_namespace='paib-gpu',
container_name='main',
container_image='tensorflow:latest',
container_requests={'cpu': '6000m'},
container_limits={'cpu': '6000m'},
node_selector_node_ip="",
annotations={},
labels={}):
pod_template = """
apiVersion: v1
kind: Pod
metadata:
name: single-pod
spec:
containers:
- name: php-redis
image: gcr.io/google-samples/gb-frontend:v4
imagePullPolicy: Always
resources:
requests:
cpu: 100m
limits:
cpu: 100m
restartPolicy: "OnFailure"
dnsPolicy: "Default"
"""
workload_yaml = yaml.safe_load(pod_template)
workload_yaml['metadata']['name'] = workload_name
workload_yaml['metadata']['namespace'] = workload_namespace
workload_yaml['spec']['containers'][0]['name'] = container_name
workload_yaml['spec']['containers'][0]['image'] = container_image
workload_yaml['spec']['containers'][0]['resources']['requests'] = container_requests
workload_yaml['spec']['containers'][0]['resources']['limits'] = container_limits
if len(node_selector_node_ip) > 0:
if 'nodeSelector' not in workload_yaml['spec']:
workload_yaml['spec']['nodeSelector'] = {}
workload_yaml['spec']['nodeSelector']['node-ip'] = node_selector_node_ip
elif 'nodeSelector' in workload_yaml['spec']:
if 'node-ip' in workload_yaml["spec"]["nodeSelector"]:
del workload_yaml['spec']['nodeSelector']['node-ip']
for k, v in annotations.items():
if 'annotations' not in workload_yaml['metadata']:
workload_yaml['metadata']['annotations'] = {}
if v is not None:
workload_yaml['metadata']['annotations'][k] = v # e.g., {"alibabacloud.com/gpu-index":"2-3-4"}
for k, v in labels.items():
# Pod labels belong under metadata.labels rather than directly under metadata
workload_yaml['metadata'].setdefault('labels', {})[k] = v
return workload_yaml
def output_pod(dfp, outfile='pod.yaml', node_select=False):
num_pod = len(dfp)
for index, row in dfp.iterrows():
if 'name' in row:
workload_name = row['name']
elif 'job_id' in row:
workload_name = f"job-{row['job_id']:04}" # float is not allowed
else:
exit("neither name nor job_id in row")
container_requests = {}
if 'cpu_milli' in row:
container_requests['cpu'] = "%dm" % (row['cpu_milli'])
elif 'cpu' in row:
container_requests['cpu'] = "%dm" % (row['cpu'] * MILLI)
elif 'num_cpu' in row:
container_requests['num_cpu'] = "%dm" % (row['num_cpu'] * MILLI)
else:
exit("neither cpu_milli nor cpu in row")
if 'memory_mib' in row:
container_requests['memory'] = "%dMi" % row['memory_mib']
container_limits = container_requests.copy()
host_node_ip = row['ip'] if node_select else ""
annotations = {}
if int(row['num_gpu']) != 0:
if node_select:
annotations[DeviceIndex] = row['gpu_index'] if type(row['gpu_index']) == str else ""
if 'gpu_milli' not in row:
annotations[ResourceName] = "1000"
else:
# Clamp the requested GPU share to [0, 1000] milli and store it as a string
gpu_milli = int(row['gpu_milli'])
annotations[ResourceName] = "%d" % min(max(gpu_milli, 0), 1000)
annotations[CountName] = "%d" % (int(row['num_gpu']))
if 'gpu_spec' in row:
# Drop empty segments and re-join the remaining GPU type constraints
gpu_req_out = "|".join(x for x in row['gpu_spec'].split('|') if len(x) > 0)
if len(gpu_req_out) > 0:
annotations[ModelName] = gpu_req_out
# annotations[CreationTime] = "%s" % row[DATA_CREATION_TIME] if DATA_CREATION_TIME in row else None
# annotations[DeletionTime] = "%s" % row[DATA_DELETION_TIME] if DATA_DELETION_TIME in row else None
pod_yaml = generate_pod_yaml(workload_name=workload_name, container_requests=container_requests,
container_limits=container_limits, node_selector_node_ip=host_node_ip,
annotations=annotations)
if index == 0:
with open(outfile, 'w') as file:
yaml.dump(pod_yaml, file)
else:
with open(outfile, 'a') as file:
file.writelines(['\n---\n\n'])
yaml.dump(pod_yaml, file)
if __name__ == '__main__':
if len(sys.argv) < 2:
exit(USAGE_PROMPT)
pod_csv_file = Path(sys.argv[1])
if not pod_csv_file.exists():
exit(f"CSV File: {pod_csv_file} does not exist")
dfp = pd.read_csv(pod_csv_file, dtype={'gpu_index': str})
if 'gpu_spec' in dfp:
dfp.gpu_spec = dfp.gpu_spec.fillna('')
output_dir = pod_csv_file.stem # e.g., "openb_pod_list_default.csv" -> "openb_pod_list_default"
if len(output_dir) <= 0:
output_dir_path = Path(OUTPUT_DIR_DEFAULT)
else:
output_dir_path = Path(output_dir)
output_dir_path.mkdir(exist_ok=True)
pod_yaml_file = output_dir_path / (pod_csv_file.stem + '.yaml') # .csv to .yaml
output_pod(dfp, pod_yaml_file, node_select=False)
print("OUTPUT: %s (len: %d)" % (pod_yaml_file, len(dfp)))

View File

@ -0,0 +1,14 @@
#!/bin/bash
# Based on the trace files in csv/, generate 23 trace folders (in parallel) that will be used as input folders for the simulator
for file in csv/openb_pod_list*; do
echo $file
# Create trace folders with the name of csv trace file (without .csv)
filename="${file##*/}"
OUTDIR="${filename%.*}"
mkdir -p $OUTDIR
# Copy node yaml (generated by openb_node_list_gpu_node.csv) to the trace folder
cp node_yaml/openb_node_list_gpu_node.yaml $OUTDIR/
# Transform csv file to yaml file and put it under trace folder
python3 pod_csv_to_yaml.py $file &
done
wait && date

View File

@ -0,0 +1,26 @@
**Below is for cluster trace released in 2017. There will be no more updates for this file.**
# Overview
The trace data, ClusterData201708, contains information about a production cluster over a 12-hour period (see note below), covering about 1.3k machines that run both online services and batch jobs.
*Note on the 12-hour period: although the data for servers and batch jobs spans about 24 hours, the data for containers is refined to 12 hours. We will release another version in the near future.*
# Trace data
The format of trace data is described in the [schema description](trace_201708.md), and defined in the specification file [schema.csv](schema.csv) in the repository.
# Downloading the trace
The data is stored in Alibaba Cloud Object Storage Service. You do not need to have an Alibaba account or sign up for Object Storage Service to download the data.
Users can run the following command to fetch data.
>wget -c --retry-connrefused --tries=0 --timeout=50 http://aliopentrace.oss-cn-beijing.aliyuncs.com/v2017Traces/alibaba-trace-2017.tar.gz
Included with the trace is a [SHA256SUM](SHA256SUM) file, which can be used to verify the integrity of a download with the sha256sum command from GNU coreutils, using a command like
```
sha256sum --check SHA256SUM
```

View File

@ -0,0 +1 @@
02c1fbc412fccc03b748c05ec18fc30fd93c0bc383eea47ac21146c4aa2235df trace_201708.tgz

View File

@ -0,0 +1,7 @@
,task_id,instances_num,task_type,job_id,status,start_time,end_time,cpu,memory,duration,disk,submit_time
0,1,1,A,1,Terminated,157297,157325,1.0,0,100,0,0
1,2_1,1,A,1,Terminated,157329,157376,1.0,0,100,0,0
2,3_1,1,A,1,Terminated,157322,157328,1.0,0,100,0,0
3,4_1,1,A,1,Terminated,157331,157376,1.0,0,100,0,0
4,5_3_4,1,A,1,Terminated,157213,157295,1.0,0,100,0,0
5,6_2_5,1,A,1,Terminated,157376,157381,1.0,0,100,0,0

Some files were not shown because too many files have changed in this diff