Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ t.*
Chart.lock
yarn.lock
package-lock.json
datavines-server/src/main/resources/static
datavines-server/src/main/resources/static
design/
14 changes: 0 additions & 14 deletions datavines-dist/src/main/assembly/datavines-bin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,7 @@
<useProjectArtifact>true</useProjectArtifact>
<includes>
<include>io.datavines:datavines-common</include>
<include>io.datavines:datavines-spi</include>
<include>io.datavines:datavines-engine-spark-api</include>
<include>io.datavines:datavines-engine-spark-connector-jdbc</include>
<include>io.datavines:datavines-engine-spark-connector-mongodb</include>
<include>io.datavines:datavines-engine-core</include>
<include>io.datavines:datavines-engine-common</include>
<include>io.datavines:datavines-engine-spark-transform-sql</include>
<include>io.datavines:datavines-engine-api</include>
<include>com.mysql:mysql-connector-j</include>
<include>org.apache.httpcomponents:httpclient</include>
<include>org.apache.httpcomponents:httpcore</include>
Expand All @@ -127,14 +120,7 @@
<useProjectArtifact>true</useProjectArtifact>
<includes>
<include>io.datavines:datavines-common</include>
<include>io.datavines:datavines-spi</include>
<include>io.datavines:datavines-engine-flink-api</include>
<include>io.datavines:datavines-engine-flink-connector-jdbc</include>
<!-- <include>io.datavines:datavines-engine-flink-connector-mongodb</include>-->
<include>io.datavines:datavines-engine-core</include>
<include>io.datavines:datavines-engine-common</include>
<include>io.datavines:datavines-engine-flink-transform-sql</include>
<include>io.datavines:datavines-engine-api</include>
<include>com.mysql:mysql-connector-j</include>
<include>org.apache.httpcomponents:httpclient</include>
<include>org.apache.httpcomponents:httpcore</include>
Expand Down
31 changes: 0 additions & 31 deletions datavines-engine/datavines-engine-api/pom.xml

This file was deleted.

38 changes: 0 additions & 38 deletions datavines-engine/datavines-engine-common/.gitignore

This file was deleted.

38 changes: 0 additions & 38 deletions datavines-engine/datavines-engine-common/pom.xml

This file was deleted.

10 changes: 2 additions & 8 deletions datavines-engine/datavines-engine-config/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,8 @@

<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-engine-api</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-engine-common</artifactId>
<artifactId>datavines-engine-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>
11 changes: 1 addition & 10 deletions datavines-engine/datavines-engine-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,4 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>datavines-engine-core</artifactId>

<dependencies>
<dependency>
<groupId>io.datavines</groupId>
<artifactId>datavines-engine-api</artifactId>
</dependency>

</dependencies>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,35 @@
*/
package io.datavines.engine.core;

import io.datavines.common.config.DataVinesJobConfig;
import io.datavines.common.entity.ProcessResult;
import io.datavines.common.enums.ExecutionStatus;

import org.apache.commons.lang3.exception.ExceptionUtils;

import java.util.List;
import io.datavines.common.config.CheckResult;
import io.datavines.common.config.ConfigRuntimeException;
import io.datavines.engine.api.component.Component;
import io.datavines.engine.api.env.Execution;
import io.datavines.engine.api.env.RuntimeEnvironment;
import io.datavines.engine.core.config.ConfigParser;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public abstract class BaseDataVinesBootstrap {

private static final Logger logger = LoggerFactory.getLogger(BaseDataVinesBootstrap.class);

private Execution execution;

protected abstract RuntimeEnvironment createRuntimeEnvironment(DataVinesJobConfig config) throws Exception;

protected abstract List<Component> createSources(DataVinesJobConfig config) throws Exception;

protected abstract List<Component> createTransforms(DataVinesJobConfig config) throws Exception;

protected abstract List<Component> createSinks(DataVinesJobConfig config) throws Exception;

public ProcessResult execute(String[] args) {
if (args.length == 1) {
try {
Expand Down Expand Up @@ -65,12 +73,14 @@ public void stop() {

private void parseConfigAndExecute(String configFile) throws Exception {
ConfigParser configParser = new ConfigParser(configFile);
List<Component> sources = configParser.getSourcePlugins();
List<Component> transforms = configParser.getTransformPlugins();
List<Component> sinks = configParser.getSinkPlugins();
execution = configParser.getRuntimeEnvironment().getExecution();
DataVinesJobConfig jobConfig = configParser.getConfig();
RuntimeEnvironment runtimeEnvironment = createRuntimeEnvironment(jobConfig);
List<Component> sources = createSources(jobConfig);
List<Component> transforms = createTransforms(jobConfig);
List<Component> sinks = createSinks(jobConfig);
execution = runtimeEnvironment.getExecution();
checkConfig(sources, transforms, sinks);
prepare(configParser.getRuntimeEnvironment(), sources, transforms, sinks);
prepare(runtimeEnvironment, sources, transforms, sinks);
if (execution == null) {
throw new Exception("can not create execution , please check the config");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,13 @@
*/
package io.datavines.engine.core.config;

import io.datavines.common.config.Config;
import io.datavines.common.config.ConfigRuntimeException;
import io.datavines.common.config.DataVinesJobConfig;
import io.datavines.common.config.EnvConfig;
import io.datavines.engine.api.component.Component;
import io.datavines.engine.api.env.RuntimeEnvironment;
import io.datavines.engine.core.utils.JsonUtils;
import io.datavines.spi.PluginDiscovery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

import static io.datavines.engine.api.EngineConstants.PLUGIN_TYPE;
import static io.datavines.engine.api.EngineConstants.TYPE;

public class ConfigParser {

private static final Logger logger = LoggerFactory.getLogger(ConfigParser.class);
Expand All @@ -43,13 +33,10 @@ public class ConfigParser {

private final EnvConfig envConfig;

private final RuntimeEnvironment env;

public ConfigParser(String configFile){
this.configFile = configFile;
this.config = load();
this.envConfig = config.getEnvConfig();
this.env = createRuntimeEnvironment();
}

private DataVinesJobConfig load() {
Expand All @@ -67,60 +54,11 @@ private DataVinesJobConfig load() {
return config;
}

private RuntimeEnvironment createRuntimeEnvironment() {
RuntimeEnvironment env = PluginDiscovery.getMultiKeyPluginDiscovery(RuntimeEnvironment.class, RuntimeEnvironment::getPluginNames)

.getNewPlugin(envConfig.getEngine());
Config config = new Config(envConfig.getConfig());
config.put(TYPE, envConfig.getType());
env.setConfig(config);
env.prepare();
return env;
}

public RuntimeEnvironment getRuntimeEnvironment() {
return env;
}

public List<Component> getSourcePlugins() {
List<Component> sourcePluginList = new ArrayList<>();
config.getSourceParameters().forEach(sourceConfig -> {
String pluginName = String.format("%s-%s-%s-source", envConfig.getEngine(), envConfig.getType(), sourceConfig.getPlugin());
Component component = PluginDiscovery.getMultiKeyPluginDiscovery(Component.class, Component::getPluginNames)

.getNewPlugin(pluginName);
sourceConfig.getConfig().put(PLUGIN_TYPE, sourceConfig.getType());
component.setConfig(new Config(sourceConfig.getConfig()));
sourcePluginList.add(component);
});
return sourcePluginList;
}

public List<Component> getSinkPlugins() {
List<Component> sinkPluginList = new ArrayList<>();
config.getSinkParameters().forEach(sinkConfig -> {
String pluginName = String.format("%s-%s-%s-sink", envConfig.getEngine(), envConfig.getType(), sinkConfig.getPlugin());
Component component = PluginDiscovery.getMultiKeyPluginDiscovery(Component.class, Component::getPluginNames)

.getNewPlugin(pluginName);
sinkConfig.getConfig().put(PLUGIN_TYPE, sinkConfig.getType());
component.setConfig(new Config(sinkConfig.getConfig()));
sinkPluginList.add(component);
});
return sinkPluginList;
public DataVinesJobConfig getConfig() {
return config;
}

public List<Component> getTransformPlugins() {
List<Component> transformPluginList = new ArrayList<>();
config.getTransformParameters().forEach(transformConfig -> {
String pluginName = String.format("%s-%s-%s-transform", envConfig.getEngine(), envConfig.getType(), transformConfig.getPlugin());
Component component = PluginDiscovery.getMultiKeyPluginDiscovery(Component.class, Component::getPluginNames)

.getNewPlugin(pluginName);
transformConfig.getConfig().put(PLUGIN_TYPE, transformConfig.getType());
component.setConfig(new Config(transformConfig.getConfig()));
transformPluginList.add(component);
});
return transformPluginList;
public EnvConfig getEnvConfig() {
return envConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.core.enums;

public enum ConnectorType {

JDBC("jdbc"),
MYSQL("mysql"),
FILE("file"),
MONGODB("mongodb");

private final String code;

ConnectorType(String code) {
this.code = code;
}

public static ConnectorType of(String code) {
for (ConnectorType connectorType : values()) {
if (connectorType.code.equalsIgnoreCase(code)) {
return connectorType;
}
}
throw new IllegalArgumentException("Unknown connector type: " + code);
}
}
Loading
Loading