- Incorporating last-minute changes of Morphemic: Extended TranslationContext with 'topLevelMetricNames' and 'loadAnnotatedDestinationToMetricContextNameMap' fields and methods.
- Moved M2MC field and methods to EMS-4-MOR project.
- Added null-checks in several getter methods (whose fields can be initialized to 'null' by Gson).
- Added 'printExtraInfo()' method (for printing extra TC info).

- Added MetasolverPlugin interface used during generation of Metasolver config. in ControlServiceCoordinator, and updated 'ControlServiceCoordinator.configureMetaSolver()' method to invoke MetasolverPlugin methods.
This commit is contained in:
ipatini 2023-09-20 20:26:14 +03:00
parent b79fb76e44
commit f24ac859d3
4 changed files with 107 additions and 41 deletions

View File

@ -17,6 +17,7 @@ import gr.iccs.imu.ems.brokercep.BrokerCepService;
import gr.iccs.imu.ems.brokercep.BrokerCepStatementSubscriber;
import gr.iccs.imu.ems.brokercep.event.EventMap;
import gr.iccs.imu.ems.control.collector.netdata.ServerNetdataCollector;
import gr.iccs.imu.ems.control.plugin.MetasolverPlugin;
import gr.iccs.imu.ems.control.plugin.PostTranslationPlugin;
import gr.iccs.imu.ems.control.plugin.TranslationContextPlugin;
import gr.iccs.imu.ems.control.properties.ControlServiceProperties;
@ -78,6 +79,8 @@ public class ControlServiceCoordinator implements InitializingBean {
private final List<TranslationContextPlugin> translationContextPlugins;
private final TranslationContextPrinter translationContextPrinter;
private final List<MetasolverPlugin> metasolverPlugins;
private final List<MetricVariableValuesService> mvvServiceImplementations;
private MetricVariableValuesService mvvService; // Will be populated in 'afterPropertiesSet()'
@ -115,6 +118,7 @@ public class ControlServiceCoordinator implements InitializingBean {
log.debug("ControlServiceCoordinator.afterPropertiesSet(): Post-translation plugins: {}", postTranslationPlugins);
log.debug("ControlServiceCoordinator.afterPropertiesSet(): TranslationContext plugins: {}", translationContextPlugins);
log.debug("ControlServiceCoordinator.afterPropertiesSet(): MetaSolver plugins: {}", metasolverPlugins);
}
private void initMvvService() {
@ -421,6 +425,7 @@ public class ControlServiceCoordinator implements InitializingBean {
// Translate application model into a TranslationContext object
log.info("ControlServiceCoordinator.translateAppModelAndStore(): Model translation: model-id={}", appModelId);
_TC = translator.translate(appModelId);
_TC.populateTopLevelMetricNames();
log.debug("ControlServiceCoordinator.translateAppModelAndStore(): Model translation: RESULTS: {}", _TC);
// Run post-translation plugins
@ -674,11 +679,16 @@ public class ControlServiceCoordinator implements InitializingBean {
log.debug("ControlServiceCoordinator.configureMetaSolver(): MetaSolver configuration: scaling-topics: {}", scalingTopics);
// Get top-level metric topics from _TC
Set<String> metricTopics = _TC.getTopLevelMetricNames(true).stream()
Set<String> topLevelMetrics = _TC.getTopLevelMetricNames(true);
log.debug("ControlServiceCoordinator.configureMetaSolver(): Top-Level metrics: {}", topLevelMetrics);
Set<String> metricTopics = topLevelMetrics.stream()
.filter(m -> !scalingTopics.contains(m))
.collect(Collectors.toSet());
log.debug("ControlServiceCoordinator.configureMetaSolver(): MetaSolver configuration: metric-topics: {}", metricTopics);
// Let Metasolver plugins modify topics sets
metasolverPlugins.forEach(p -> p.topicsCollected(_TC, scalingTopics, metricTopics));
// Prepare subscription configurations
String upperwareBrokerUrl = brokerCep != null ? brokerCep.getBrokerCepProperties().getBrokerUrlForClients() : null;
boolean usesAuthentication = brokerCep.getBrokerCepProperties().isAuthenticationEnabled();
@ -694,15 +704,18 @@ public class ControlServiceCoordinator implements InitializingBean {
}
List<Map<String, String>> subscriptionConfigs = new ArrayList<>();
for (String t : scalingTopics)
subscriptionConfigs.add(_prepareSubscriptionConfig(upperwareBrokerUrl, username, password, certificate, t, "", "SCALE"));
subscriptionConfigs.add(_prepareSubscriptionConfig(_TC, upperwareBrokerUrl, username, password, certificate, t, "", "SCALE"));
for (String t : metricTopics)
subscriptionConfigs.add(_prepareSubscriptionConfig(upperwareBrokerUrl, username, password, certificate, t, "", "MVV"));
subscriptionConfigs.add(_prepareSubscriptionConfig(_TC, upperwareBrokerUrl, username, password, certificate, t, "", "MVV"));
log.debug("ControlServiceCoordinator.configureMetaSolver(): MetaSolver subscriptions configuration: {}", subscriptionConfigs);
// Retrieve MVV to Current-Config MVV map
Map<String, String> mvvMap = _TC.getMvvCP();
log.debug("ControlServiceCoordinator.configureMetaSolver(): MetaSolver MVV configuration: {}", mvvMap);
// Let Metasolver plugins modify MVV map
metasolverPlugins.forEach(p -> p.mvvsCollected(_TC, mvvMap));
// Prepare MetaSolver configuration
Map<String,Object> msConfig = new HashMap<>();
msConfig.put("subscriptions", subscriptionConfigs);
@ -756,7 +769,7 @@ public class ControlServiceCoordinator implements InitializingBean {
return modelId;
}
protected Map<String, String> _prepareSubscriptionConfig(String url, String username, String password, String certificate, String topic, String clientId, String type) {
protected Map<String, String> _prepareSubscriptionConfig(TranslationContext _TC, String url, String username, String password, String certificate, String topic, String clientId, String type) {
Map<String, String> map = new HashMap<>();
map.put("url", url);
map.put("username", username);
@ -765,6 +778,10 @@ public class ControlServiceCoordinator implements InitializingBean {
map.put("topic", topic);
map.put("client-id", clientId);
map.put("type", type);
// Let Metasolver plugins modify subscription
metasolverPlugins.forEach(p -> p.prepareSubscription(_TC, map));
return map;
}

View File

@ -0,0 +1,27 @@
/*
* Copyright (C) 2017-2023 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package gr.iccs.imu.ems.control.plugin;
import gr.iccs.imu.ems.translate.TranslationContext;
import gr.iccs.imu.ems.util.Plugin;
import java.util.Map;
import java.util.Set;
/**
* Executed during Metasolver configuration generation
*/
public interface MetasolverPlugin extends Plugin {
default void topicsCollected(TranslationContext translationContext, Set<String> scalingTopics, Set<String> metricTopics) { }
default void prepareSubscription(TranslationContext translationContext, Map<String, String> subscriptionConfigMap) { }
default void mvvsCollected(TranslationContext translationContext, Map<String, String> mvvMap) { }
}

View File

@ -21,6 +21,7 @@ import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import java.io.Serializable;
import java.util.*;
@ -69,11 +70,6 @@ public class TranslationContext implements Serializable {
// Grouping-to-Topics map
private final Map<String, Set<String>> G2T = new HashMap<>();
// Metric-to-Metric Context map
@Getter
@JsonIgnore
private final transient Map<Metric, Set<MetricContext>> M2MC = new HashMap<>();
// Composite Metric Variables set
@Getter
@JsonIgnore
@ -114,10 +110,11 @@ public class TranslationContext implements Serializable {
private final Set<IfThenConstraint> ifThenConstraints = new LinkedHashSet<>();
// Load-annotated Metric
protected final Map<String, String> loadAnnotatedDestinationToMetricContextNameMap = new LinkedHashMap<>();
protected final Set<String> loadAnnotatedMetricsSet = new LinkedHashSet<>();
// Top-Level metric names
protected Set<String> topLevelMetricNames = new LinkedHashSet<>();
protected final Set<String> topLevelMetricNames = new LinkedHashSet<>();
// Export files
@Getter @Setter
@ -190,6 +187,7 @@ public class TranslationContext implements Serializable {
this.metricConstraints.addAll( cloneSet(_TC.metricConstraints) );
this.logicalConstraints.addAll( cloneSet(_TC.logicalConstraints) );
this.ifThenConstraints.addAll( cloneSet(_TC.ifThenConstraints) );
this.loadAnnotatedDestinationToMetricContextNameMap.putAll(_TC.loadAnnotatedDestinationToMetricContextNameMap);
this.loadAnnotatedMetricsSet.addAll(_TC.loadAnnotatedMetricsSet);
this.topLevelMetricNames.addAll(_TC.topLevelMetricNames);
this.exportFiles.addAll(_TC.exportFiles);
@ -279,48 +277,43 @@ public class TranslationContext implements Serializable {
return newGroupingsMap;
}
public MetricContext getMetricContextForMetric(Metric m) {
if (M2MC==null) return null;
Set<MetricContext> set = M2MC.get(m);
return set == null ? null : set.iterator().next();
}
public Set<MetricConstraint> getMetricConstraints() {
return new HashSet<>(metricConstraints);
return metricConstraints!=null ? new HashSet<>(metricConstraints) : new HashSet<>();
}
public Set<LogicalConstraint> getLogicalConstraints() {
return new HashSet<>(logicalConstraints);
return logicalConstraints!=null ? new HashSet<>(logicalConstraints) : new HashSet<>();
}
public HashSet<MetricVariable> getCompositeMetricVariables() {
return new HashSet<>(CMVar_1);
return CMVar_1!=null ? new HashSet<>(CMVar_1) : new HashSet<>();
}
public HashSet<String> getCompositeMetricVariableNames() {
return new HashSet<>(CMVar);
return CMVar!=null ? new HashSet<>(CMVar) : new HashSet<>();
}
public HashSet<MetricVariable> getRawMetricVariables() {
return new HashSet<>(RMVar_1);
return RMVar_1!=null ? new HashSet<>(RMVar_1) : new HashSet<>();
}
public HashSet<String> getRawMetricVariableNames() {
return new HashSet<>(RMVar);
return RMVar!=null ? new HashSet<>(RMVar) : new HashSet<>();
}
public boolean isMVV(String name) {
if (MVV==null) return false;
for (String mvv : MVV)
if (mvv.equals(name)) return true;
return false;
}
public Set<String> getMVV() {
return new HashSet<>(MVV);
return MVV!=null ? new HashSet<>(MVV) : new HashSet<>();
}
public Map<String,String> getMvvCP() {
return new HashMap<>(MvvCP);
return MvvCP!=null ? new HashMap<>(MvvCP) : new HashMap<>();
}
// ====================================================================================================================================================
@ -396,14 +389,6 @@ public class TranslationContext implements Serializable {
rules.forEach(rule -> addGroupingRulePair(grouping, topic, rule));
}
public void addMetricMetricContextPair(Metric m, MetricContext mc) {
_addPair(M2MC, m, mc);
}
public void addMetricMetricContextPairs(Metric m, List<MetricContext> mcs) {
_addPair(M2MC, m, mcs);
}
public void addCompositeMetricVariable(MetricVariable mv) {
CMVar.add(mv.getName());
CMVar_1.add(mv);
@ -672,6 +657,23 @@ public class TranslationContext implements Serializable {
// ====================================================================================================================================================
// Load-Metrics-related helper methods
public void addLoadAnnotatedDestinationNameToMetricContextName(@NonNull String metricContextName, @NonNull String destinationName) {
loadAnnotatedDestinationToMetricContextNameMap.put(destinationName, metricContextName);
}
public void addLoadAnnotatedDestinationNameToMetricContextName(@NonNull Map<String,String> map) {
loadAnnotatedDestinationToMetricContextNameMap.putAll(map);
}
public Map<String,String> getLoadAnnotatedDestinationNameToMetricContextNameMap() {
return loadAnnotatedDestinationToMetricContextNameMap!=null
? new HashMap<>(loadAnnotatedDestinationToMetricContextNameMap) : new HashMap<>();
}
public String getLoadAnnotatedDestinationMetricContextName(@NonNull String key) {
return getLoadAnnotatedDestinationNameToMetricContextNameMap().get(key);
}
public void addLoadAnnotatedMetric(@NonNull String metricName) {
loadAnnotatedMetricsSet.add(metricName);
}
@ -681,7 +683,7 @@ public class TranslationContext implements Serializable {
}
public Set<String> getLoadAnnotatedMetricsSet() {
return new HashSet<>(loadAnnotatedMetricsSet);
return loadAnnotatedMetricsSet!=null ? new HashSet<>(loadAnnotatedMetricsSet) : new HashSet<>();
}
// ====================================================================================================================================================
@ -692,11 +694,19 @@ public class TranslationContext implements Serializable {
}
public void populateTopLevelMetricNames() {
Set<String> set = DAG.getTopLevelNodes().stream()
if (getDAG()==null) {
log.warn("TranslationContext.populateTopLevelMetricNames(): DAG is NULL");
return;
}
log.trace("TranslationContext.populateTopLevelMetricNames(): DAG is *NOT* NULL");
Set<String> set = getDAG().getTopLevelNodes().stream()
.map(DAGNode::getElementName)
.collect(Collectors.toSet());
topLevelMetricNames.clear();
topLevelMetricNames.addAll(set);
log.trace("TranslationContext.populateTopLevelMetricNames(): set: {}", set);
synchronized (topLevelMetricNames) {
topLevelMetricNames.clear();
topLevelMetricNames.addAll(set);
}
}
public Set<String> getTopLevelMetricNames(boolean forcePopulate) {
@ -719,6 +729,9 @@ public class TranslationContext implements Serializable {
return clazz.cast(result);
}
public void printExtraInfo(Logger log) {
}
// ====================================================================================================================================================
/*public void prepareForSerialization() {

View File

@ -100,8 +100,6 @@ public class TranslationContextPrinter {
log.info("*********************************************************");
log.info("Topics-Connections map:\n{}", _TC.getTopicConnections());
log.info("*********************************************************");
log.info("Metric-to-Metric Context map:\n{}", map2string(_TC.getM2MC()));
log.info("*********************************************************");
log.info("MVV set:\n{}", _TC.getMVV());
log.info("*********************************************************");
log.info("MVV_CP map:\n{}", _TC.getMvvCP());
@ -114,9 +112,16 @@ public class TranslationContextPrinter {
log.info("*********************************************************");
log.info("Metric Constraints:\n{}", _TC.getMetricConstraints());
log.info("*********************************************************");
log.info("Load-Annotated Destination-to-Metric Context names map:\n{}",
_TC.getLoadAnnotatedDestinationNameToMetricContextNameMap());
log.info("*********************************************************");
log.info("Load-Annotated Metrics:\n{}", _TC.getLoadAnnotatedMetricsSet());
log.info("*********************************************************");
log.info("Top-Level Metric names:\n{}", _TC.getTopLevelMetricNames());
log.info("*********************************************************");
_TC.printExtraInfo(log);
log.info("*********************************************************");
log.info("Additional Results:\n{}", _TC.getAdditionalResults());
log.info("*********************************************************");
@ -124,7 +129,11 @@ public class TranslationContextPrinter {
log.info("*********************************************************");
}
public String prettifyG2R(Map<String, Map<String, Set<String>>> map, String startIdent) {
public static void separator() {
log.info("*********************************************************");
}
public static String prettifyG2R(Map<String, Map<String, Set<String>>> map, String startIdent) {
StringBuilder sb = new StringBuilder();
String ident2 = startIdent+" ";
String ident3 = startIdent+" ";
@ -147,7 +156,7 @@ public class TranslationContextPrinter {
return sb.toString();
}
protected Map<String,List<String>> map2string(Map map) {
public static Map<String,List<String>> map2string(Map map) {
if (map==null) return null;
Map<String,List<String>> newMap = new HashMap<>();
for (Object key : map.keySet()) {
@ -172,7 +181,7 @@ public class TranslationContextPrinter {
return newMap;
}
protected Collection<String> getFunctionNames(Collection<FunctionDefinition> col) {
public static Collection<String> getFunctionNames(Collection<FunctionDefinition> col) {
return col.stream()
.map(FunctionDefinition::getName)
.collect(Collectors.toList());