From 0127658ea9d9ebf1d0a469c9f526353f9c4f5dc9 Mon Sep 17 00:00:00 2001
From: xxzuo <1293378490@qq.com>
Date: Wed, 15 Apr 2026 00:45:09 +0800
Subject: [PATCH 1/3] [Feature][Server] support postgresql registry
---
README.md | 8 +
.../src/main/assembly/datavines-bin.xml | 1 +
.../datavines-registry-postgresql/pom.xml | 40 +
.../registry/plugin/PostgresqlMutex.java | 209 ++++
.../registry/plugin/PostgresqlRegistry.java | 113 +++
.../plugin/PostgresqlServerStateManager.java | 267 ++++++
.../registry/plugin/RegistryLock.java | 35 +
.../io.datavines.registry.api.Registry | 1 +
.../datavines-registry-plugins/pom.xml | 3 +-
datavines-server/pom.xml | 54 +-
.../server/api/config/MybatisPlusConfig.java | 20 +-
.../CatalogMetaDataFetchExecutorImpl.java | 25 +-
.../CatalogEntityMetricJobRelMapper.xml | 6 +-
.../resources/mapper/DataSourceMapper.xml | 6 +-
.../mapper/ErrorDataStorageMapper.xml | 8 +-
.../resources/mapper/JobExecutionMapper.xml | 10 +-
.../src/main/resources/mapper/JobMapper.xml | 14 +-
.../main/resources/mapper/SlaJobMapper.xml | 4 +-
scripts/sql/datavines-postgresql.sql | 892 ++++++++++++++++++
19 files changed, 1662 insertions(+), 54 deletions(-)
create mode 100644 datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/pom.xml
create mode 100644 datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlMutex.java
create mode 100644 datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlRegistry.java
create mode 100644 datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlServerStateManager.java
create mode 100644 datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/RegistryLock.java
create mode 100644 datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/resources/META-INF/services/io.datavines.registry.api.Registry
create mode 100644 scripts/sql/datavines-postgresql.sql
diff --git a/README.md b/README.md
index 61b49152f..894b17b3c 100644
--- a/README.md
+++ b/README.md
@@ -32,6 +32,14 @@ Need: Maven 3.6.1 and later
```sh
$ mvn clean package -Prelease -DskipTests
```
+
+## Metadata Database
+
+- The default `datavines-server` configuration uses `PostgreSQL`
+- Initialize a PostgreSQL metadata database with `scripts/sql/datavines-postgresql.sql`
+- If you want to run with `MySQL`, switch to the `mysql` Spring profile and initialize with `scripts/sql/datavines-mysql.sql`
+- The release package now contains both scripts in `${DATAVINES_HOME}/scripts/`
+
## Features
### Data Catalog
diff --git a/datavines-dist/src/main/assembly/datavines-bin.xml b/datavines-dist/src/main/assembly/datavines-bin.xml
index 0f9e8e6f6..47a392086 100644
--- a/datavines-dist/src/main/assembly/datavines-bin.xml
+++ b/datavines-dist/src/main/assembly/datavines-bin.xml
@@ -80,6 +80,7 @@
${basedir}/../scripts/sql
datavines-mysql.sql
+ datavines-postgresql.sql
./scripts
diff --git a/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/pom.xml b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/pom.xml
new file mode 100644
index 000000000..cf888af81
--- /dev/null
+++ b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/pom.xml
@@ -0,0 +1,40 @@
+
+
+
+
+
+ datavines-registry-plugins
+ io.datavines
+ 1.0.0-SNAPSHOT
+
+ 4.0.0
+
+ datavines-registry-postgresql
+
+
+
+ com.zaxxer
+ HikariCP
+
+
+
+
diff --git a/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlMutex.java b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlMutex.java
new file mode 100644
index 000000000..3dee45892
--- /dev/null
+++ b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlMutex.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.datavines.registry.plugin;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.datavines.common.utils.ConnectionUtils;
+import io.datavines.common.utils.NetUtils;
+import io.datavines.common.utils.ThreadUtils;
+import io.datavines.registry.api.ServerInfo;
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class PostgresqlMutex {
+
+ public static final long LOCK_ACQUIRE_INTERVAL = 1000;
+
+ private final long expireTimeWindow = 600;
+
+ private Connection connection;
+
+ private final Properties properties;
+
+ private final ServerInfo serverInfo;
+
+ private final ConcurrentHashMap lockHoldMap;
+
+ public PostgresqlMutex(Connection connection, Properties properties) throws SQLException {
+ this.connection = connection;
+ this.properties = properties;
+ this.serverInfo = new ServerInfo(NetUtils.getHost(), Integer.valueOf((String) properties.get("server.port")));
+ this.lockHoldMap = new ConcurrentHashMap<>();
+ ScheduledExecutorService lockTermUpdateThreadPool = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("RegistryLockRefreshThread").setDaemon(true).build());
+
+ lockTermUpdateThreadPool.scheduleWithFixedDelay(
+ new LockTermRefreshTask(lockHoldMap),
+ 2,
+ 2,
+ TimeUnit.SECONDS);
+
+ clearExpireLock();
+ }
+
+ public boolean acquire(String lockKey, long time) {
+ RegistryLock lock = lockHoldMap.computeIfAbsent(lockKey, key -> {
+ RegistryLock registryLock = null;
+ int count = 1;
+ if (time > 0) {
+ count = Math.max(1, (int) (time * 1000 / LOCK_ACQUIRE_INTERVAL));
+ }
+ while (count > 0) {
+ try {
+ registryLock = executeInsert(key);
+ log.debug("Acquire the lock success, {}", key);
+ count = 0;
+ } catch (SQLException e) {
+ log.error("Acquire the lock error, {}, try again!", e.getLocalizedMessage());
+ try {
+ clearExpireLock();
+ } catch (SQLException ex) {
+ log.error("clear expire lock error : ", ex);
+ }
+ ThreadUtils.sleep(LOCK_ACQUIRE_INTERVAL);
+ count--;
+ }
+ }
+
+ return registryLock;
+ });
+
+ return lock != null;
+ }
+
+ public boolean release(String lockKey) throws SQLException {
+ RegistryLock registryLock = lockHoldMap.get(lockKey);
+ if (registryLock != null) {
+ try {
+ executeDelete(lockKey);
+ lockHoldMap.remove(lockKey);
+ } catch (SQLException e) {
+ log.error(String.format("Release lock: %s error", lockKey), e);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public void close() throws SQLException {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+
+ private RegistryLock executeInsert(String key) throws SQLException {
+ checkConnection();
+ Timestamp updateTime = new Timestamp(System.currentTimeMillis());
+ PreparedStatement preparedStatement = connection.prepareStatement(
+ "insert into dv_registry_lock (lock_key,lock_owner,update_time) values (?,?,?)");
+ preparedStatement.setString(1, key);
+ preparedStatement.setString(2, this.serverInfo.getAddr());
+ preparedStatement.setTimestamp(3, updateTime);
+ preparedStatement.executeUpdate();
+ preparedStatement.close();
+ return new RegistryLock(key, this.serverInfo.getAddr(), updateTime);
+ }
+
+ private RegistryLock executeUpdate(String key) throws SQLException {
+ checkConnection();
+ Timestamp updateTime = new Timestamp(System.currentTimeMillis());
+ PreparedStatement preparedStatement = connection.prepareStatement(
+ "update dv_registry_lock set update_time = ? where lock_key = ?");
+ preparedStatement.setTimestamp(1, updateTime);
+ preparedStatement.setString(2, key);
+ preparedStatement.executeUpdate();
+ preparedStatement.close();
+ return new RegistryLock(key, this.serverInfo.getAddr(), updateTime);
+ }
+
+ private void executeDelete(String key) throws SQLException {
+ checkConnection();
+ PreparedStatement preparedStatement = connection.prepareStatement(
+ "delete from dv_registry_lock where lock_key = ?");
+ preparedStatement.setString(1, key);
+ if (preparedStatement.executeUpdate() > 0) {
+ lockHoldMap.remove(key);
+ }
+ preparedStatement.close();
+ }
+
+ private void clearExpireLock() throws SQLException {
+ checkConnection();
+ PreparedStatement preparedStatement = connection.prepareStatement(
+ "delete from dv_registry_lock where update_time < ?");
+ preparedStatement.setTimestamp(1, new Timestamp(System.currentTimeMillis() - expireTimeWindow));
+ preparedStatement.executeUpdate();
+ preparedStatement.close();
+ lockHoldMap.values().removeIf((v -> v.getUpdateTime().getTime() < (System.currentTimeMillis() - expireTimeWindow)));
+ }
+
+ private void checkConnection() throws SQLException {
+ if (connection == null || connection.isClosed()) {
+ connection = ConnectionUtils.getConnection(properties);
+ }
+ }
+
+ @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+ class LockTermRefreshTask implements Runnable {
+
+ private final Map lockHoldMap;
+
+ @Override
+ public void run() {
+ try {
+ if (lockHoldMap.isEmpty()) {
+ return;
+ }
+
+ List lockKeys = new ArrayList<>();
+ for (RegistryLock lock : lockHoldMap.values()) {
+ if (lock != null) {
+ lockKeys.add(lock.getLockKey());
+ }
+ }
+ lockKeys.forEach(lockKey -> {
+ try {
+ RegistryLock registryLock = executeUpdate(lockKey);
+ lockHoldMap.put(lockKey, registryLock);
+ } catch (SQLException e) {
+ log.warn("Update the lock: {} term failed.", lockKey);
+ }
+ });
+
+ clearExpireLock();
+ } catch (Exception e) {
+ log.error("Update lock term error", e);
+ }
+ }
+ }
+}
diff --git a/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlRegistry.java b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlRegistry.java
new file mode 100644
index 000000000..d214f3306
--- /dev/null
+++ b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlRegistry.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.datavines.registry.plugin;
+
+import io.datavines.common.utils.ConnectionUtils;
+import io.datavines.registry.api.ConnectionListener;
+import io.datavines.registry.api.Registry;
+import io.datavines.registry.api.ServerInfo;
+import io.datavines.registry.api.SubscribeListener;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+
+@Slf4j
+public class PostgresqlRegistry implements Registry {
+
+ private PostgresqlMutex postgresqlMutex;
+
+ private PostgresqlServerStateManager postgresqlServerStateManager;
+
+ @Override
+ public void init(Properties properties) throws Exception {
+
+ Connection connection = ConnectionUtils.getConnection(properties);
+
+ if (connection == null) {
+ throw new Exception("can not create connection");
+ }
+
+ try {
+ postgresqlMutex = new PostgresqlMutex(connection, properties);
+ postgresqlServerStateManager = new PostgresqlServerStateManager(connection, properties);
+ } catch (SQLException exception) {
+ log.error("init postgresql mutex error: " + exception.getLocalizedMessage());
+ }
+ }
+
+ @Override
+ public boolean acquire(String key, long timeout) {
+ try {
+ return postgresqlMutex.acquire(key, timeout);
+ } catch (Exception e) {
+ log.warn("acquire lock error: ", e);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean release(String key) {
+ try {
+ return postgresqlMutex.release(key);
+ } catch (Exception e) {
+ log.warn("acquire lock error: ", e);
+ return false;
+ }
+ }
+
+ @Override
+ public void subscribe(String key, SubscribeListener subscribeListener) {
+ try {
+ postgresqlServerStateManager.registry(subscribeListener);
+ } catch (Exception e) {
+ log.warn("subscribe error: ", e);
+ }
+ }
+
+ @Override
+ public void unSubscribe(String key) {
+ try {
+ postgresqlServerStateManager.unRegistry();
+ } catch (Exception e) {
+ log.warn("unSubscribe error: ", e);
+ }
+ }
+
+ @Override
+ public void addConnectionListener(ConnectionListener connectionListener) {
+
+ }
+
+ @Override
+ public List getActiveServerList() {
+ return postgresqlServerStateManager.getActiveServerList();
+ }
+
+ @Override
+ public void close() throws SQLException {
+ postgresqlMutex.close();
+ postgresqlServerStateManager.close();
+ }
+
+ @Override
+ public String getPluginName() {
+ return "postgresql";
+ }
+}
diff --git a/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlServerStateManager.java b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlServerStateManager.java
new file mode 100644
index 000000000..842b591b1
--- /dev/null
+++ b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlServerStateManager.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.datavines.registry.plugin;
+
+import io.datavines.common.utils.ConnectionUtils;
+import io.datavines.common.utils.NetUtils;
+import io.datavines.common.utils.Stopper;
+import io.datavines.registry.api.Event;
+import io.datavines.registry.api.ServerInfo;
+import io.datavines.registry.api.SubscribeListener;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class PostgresqlServerStateManager {
+
+ private Connection connection;
+
+ private final ConcurrentHashMap liveServerMap = new ConcurrentHashMap<>();
+
+ private Set deadServers = new HashSet<>();
+
+ private SubscribeListener subscribeListener;
+
+ private final ServerInfo serverInfo;
+
+ private final Properties properties;
+
+ public PostgresqlServerStateManager(Connection connection, Properties properties) throws SQLException {
+ this.connection = connection;
+ this.properties = properties;
+ serverInfo = new ServerInfo(NetUtils.getHost(), Integer.valueOf((String) properties.get("server.port")),
+ new Timestamp(System.currentTimeMillis()), new Timestamp(System.currentTimeMillis()));
+ ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
+ executorService.scheduleAtFixedRate(new HeartBeater(), 2, 2, TimeUnit.SECONDS);
+ executorService.scheduleAtFixedRate(new ServerChecker(), 5, 10, TimeUnit.SECONDS);
+ }
+
+ public void registry(SubscribeListener subscribeListener) throws SQLException {
+
+ if (isExists(serverInfo)) {
+ executeUpdate(serverInfo);
+ } else {
+ executeInsert(serverInfo);
+ }
+
+ liveServerMap.putAll(fetchServers());
+ this.subscribeListener = subscribeListener;
+ refreshServer();
+ }
+
+ public void unRegistry() throws SQLException {
+ executeDelete(serverInfo);
+ }
+
+ public void refreshServer() throws SQLException {
+ ConcurrentHashMap newServers = fetchServers();
+ Set offlineServer = new HashSet<>();
+ if (newServers == null) {
+ return;
+ }
+ Set onlineServer = new HashSet<>();
+ newServers.forEach((k, v) -> {
+ long updateTime = v.getUpdateTime().getTime();
+ long now = System.currentTimeMillis();
+ if (now - updateTime > 20000) {
+ offlineServer.add(k);
+ } else {
+ onlineServer.add(k);
+ }
+ });
+
+ liveServerMap.forEach((k, v) -> {
+ if (newServers.get(k) == null) {
+ offlineServer.add(k);
+ }
+ });
+
+ offlineServer.forEach(x -> {
+ if (!deadServers.contains(x) && !x.equals(serverInfo.getAddr())) {
+ String[] values = x.split(":");
+ try {
+ executeDelete(new ServerInfo(values[0], Integer.valueOf(values[1])));
+ liveServerMap.remove(x);
+ } catch (SQLException e) {
+ log.error("delete server info error", e);
+ }
+ subscribeListener.notify(Event.builder().key(x).type(Event.Type.REMOVE).build());
+ }
+ });
+
+ deadServers.addAll(offlineServer);
+
+ deadServers = deadServers
+ .stream()
+ .filter(x -> !onlineServer.contains(x))
+ .collect(Collectors.toSet());
+
+ onlineServer.forEach(x -> {
+ if (liveServerMap.isEmpty()) {
+ return;
+ }
+ if (liveServerMap.get(x) == null && !x.equals(serverInfo.getAddr())) {
+ liveServerMap.put(x, newServers.get(x));
+ subscribeListener.notify(Event.builder().key(x).type(Event.Type.ADD).build());
+ }
+ });
+ }
+
+ private void executeInsert(ServerInfo serverInfo) throws SQLException {
+ checkConnection();
+ PreparedStatement preparedStatement = connection.prepareStatement(
+ "insert into dv_server (host, port) values (?, ?)");
+ preparedStatement.setString(1, serverInfo.getHost());
+ preparedStatement.setInt(2, serverInfo.getServerPort());
+ preparedStatement.executeUpdate();
+ preparedStatement.close();
+ }
+
+ private void executeUpdate(ServerInfo serverInfo) throws SQLException {
+ checkConnection();
+ PreparedStatement preparedStatement = connection.prepareStatement(
+ "update dv_server set update_time = ? where host = ? and port = ?");
+ preparedStatement.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
+ preparedStatement.setString(2, serverInfo.getHost());
+ preparedStatement.setInt(3, serverInfo.getServerPort());
+ preparedStatement.executeUpdate();
+ preparedStatement.close();
+ }
+
+ private void executeDelete(ServerInfo serverInfo) throws SQLException {
+ checkConnection();
+ PreparedStatement preparedStatement = connection.prepareStatement(
+ "delete from dv_server where host = ? and port = ?");
+ preparedStatement.setString(1, serverInfo.getHost());
+ preparedStatement.setInt(2, serverInfo.getServerPort());
+ preparedStatement.executeUpdate();
+ preparedStatement.close();
+ }
+
+ private boolean isExists(ServerInfo serverInfo) throws SQLException {
+ checkConnection();
+ PreparedStatement preparedStatement = connection.prepareStatement(
+ "select * from dv_server where host= ? and port= ?",
+ ResultSet.TYPE_SCROLL_INSENSITIVE,
+ ResultSet.CONCUR_READ_ONLY);
+ preparedStatement.setString(1, serverInfo.getHost());
+ preparedStatement.setInt(2, serverInfo.getServerPort());
+ ResultSet resultSet = preparedStatement.executeQuery();
+
+ if (resultSet == null) {
+ preparedStatement.close();
+ return false;
+ }
+ boolean result = resultSet.first();
+ resultSet.close();
+ preparedStatement.close();
+ return result;
+ }
+
+ private ConcurrentHashMap fetchServers() throws SQLException {
+ checkConnection();
+ PreparedStatement preparedStatement = connection.prepareStatement("select * from dv_server");
+ ResultSet resultSet = preparedStatement.executeQuery();
+
+ if (resultSet == null) {
+ preparedStatement.close();
+ return null;
+ }
+
+ ConcurrentHashMap map = new ConcurrentHashMap<>();
+ while (resultSet.next()) {
+ String host = resultSet.getString("host");
+ int port = resultSet.getInt("port");
+ Timestamp updateTime = resultSet.getTimestamp("update_time");
+ Timestamp createTime = resultSet.getTimestamp("create_time");
+ map.put(host + ":" + port, new ServerInfo(host, port, createTime, updateTime));
+ }
+ resultSet.close();
+ preparedStatement.close();
+ return map;
+ }
+
+ public List getActiveServerList() {
+ List activeServerList = new ArrayList<>();
+ liveServerMap.forEach((k, v) -> {
+ String[] values = k.split(":");
+ if (values.length == 2) {
+ activeServerList.add(v);
+ }
+ });
+ return activeServerList;
+ }
+
+ class HeartBeater implements Runnable {
+
+ @Override
+ public void run() {
+ if (Stopper.isRunning()) {
+ try {
+ if (isExists(serverInfo)) {
+ executeUpdate(serverInfo);
+ } else {
+ executeInsert(serverInfo);
+ }
+ } catch (SQLException e) {
+ log.error("heartbeat error", e);
+ }
+ }
+ }
+ }
+
+ class ServerChecker implements Runnable {
+
+ @Override
+ public void run() {
+ if (Stopper.isRunning()) {
+ try {
+ refreshServer();
+ } catch (SQLException e) {
+ log.error("server check error", e);
+ }
+ }
+ }
+ }
+
+ private void checkConnection() throws SQLException {
+ if (connection == null || connection.isClosed()) {
+ connection = ConnectionUtils.getConnection(properties);
+ }
+ }
+
+ public void close() throws SQLException {
+ if (connection != null && !connection.isClosed()) {
+ connection.close();
+ }
+ }
+}
diff --git a/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/RegistryLock.java b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/RegistryLock.java
new file mode 100644
index 000000000..dfb36d7c9
--- /dev/null
+++ b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/RegistryLock.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.datavines.registry.plugin;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.sql.Timestamp;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class RegistryLock {
+
+ private String lockKey;
+
+ private String lockOwner;
+
+ private Timestamp updateTime;
+}
diff --git a/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/resources/META-INF/services/io.datavines.registry.api.Registry b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/resources/META-INF/services/io.datavines.registry.api.Registry
new file mode 100644
index 000000000..b193e993c
--- /dev/null
+++ b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/resources/META-INF/services/io.datavines.registry.api.Registry
@@ -0,0 +1 @@
+io.datavines.registry.plugin.PostgresqlRegistry
diff --git a/datavines-registry/datavines-registry-plugins/pom.xml b/datavines-registry/datavines-registry-plugins/pom.xml
index 7d1069175..e9a4b3821 100644
--- a/datavines-registry/datavines-registry-plugins/pom.xml
+++ b/datavines-registry/datavines-registry-plugins/pom.xml
@@ -32,6 +32,7 @@
pom
datavines-registry-mysql
+ datavines-registry-postgresql
datavines-registry-zookeeper
@@ -49,4 +50,4 @@
-
\ No newline at end of file
+
diff --git a/datavines-server/pom.xml b/datavines-server/pom.xml
index b05280921..287cddbd0 100644
--- a/datavines-server/pom.xml
+++ b/datavines-server/pom.xml
@@ -1,21 +1,21 @@
${project.version}
-
- io.datavines
- datavines-engine-spark-executor
- ${project.version}
-
+
+ io.datavines
+ datavines-engine-spark-executor
+ ${project.version}
+
io.datavines
@@ -320,12 +320,12 @@
${project.version}
-
- io.datavines
- datavines-engine-local-executor
- ${project.version}
-
-
+
+ io.datavines
+ datavines-engine-local-executor
+ ${project.version}
+
+
slf4j-log4j12
org.slf4j
@@ -342,12 +342,12 @@
${project.version}
-
- io.datavines
- datavines-engine-flink-executor
- ${project.version}
-
-
+
+ io.datavines
+ datavines-engine-flink-executor
+ ${project.version}
+
+
org.apache.hadoop
hadoop-common
@@ -366,9 +366,9 @@
-
- io.datavines
- datavines-connector-all
+
+ io.datavines
+ datavines-connector-all
${project.version}
@@ -509,6 +509,12 @@
${project.version}
+
+ io.datavines
+ datavines-registry-postgresql
+ ${project.version}
+
+
io.datavines
datavines-registry-zookeeper
diff --git a/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java b/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java
index db5c999ca..3922590be 100644
--- a/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java
+++ b/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java
@@ -19,16 +19,32 @@
import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
+import com.zaxxer.hikari.HikariDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import javax.sql.DataSource;
+
@Configuration
public class MybatisPlusConfig {
@Bean
- public MybatisPlusInterceptor mybatisPlusInterceptor() {
+ public MybatisPlusInterceptor mybatisPlusInterceptor(DataSource dataSource) {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
- interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
+ interceptor.addInnerInterceptor(new PaginationInnerInterceptor(resolveDbType(dataSource)));
return interceptor;
}
+
+ private DbType resolveDbType(DataSource dataSource) {
+ if (dataSource instanceof HikariDataSource) {
+ HikariDataSource hikariDataSource = (HikariDataSource) dataSource;
+ String jdbcUrl = hikariDataSource.getJdbcUrl();
+ String driverClassName = hikariDataSource.getDriverClassName();
+ if ((jdbcUrl != null && jdbcUrl.contains("postgresql"))
+ || (driverClassName != null && driverClassName.contains("postgresql"))) {
+ return DbType.POSTGRE_SQL;
+ }
+ }
+ return DbType.MYSQL;
+ }
}
diff --git a/datavines-server/src/main/java/io/datavines/server/scheduler/metadata/task/CatalogMetaDataFetchExecutorImpl.java b/datavines-server/src/main/java/io/datavines/server/scheduler/metadata/task/CatalogMetaDataFetchExecutorImpl.java
index 606e1e44e..854ce2b88 100644
--- a/datavines-server/src/main/java/io/datavines/server/scheduler/metadata/task/CatalogMetaDataFetchExecutorImpl.java
+++ b/datavines-server/src/main/java/io/datavines/server/scheduler/metadata/task/CatalogMetaDataFetchExecutorImpl.java
@@ -80,7 +80,7 @@ public CatalogMetaDataFetchExecutorImpl(CommonTaskRequest request) {
this.dataSource = request.getDataSource();
this.connectorFactory = PluginDiscovery.getMultiKeyPluginDiscovery(ConnectorFactory.class, ConnectorFactory::getPluginNames)
-
+
.getOrCreatePlugin(dataSource.getType());
this.instanceService = SpringApplicationContext.getBean(CatalogEntityInstanceService.class);
@@ -181,8 +181,7 @@ private void executeFetchDataSource() throws SQLException {
if (CollectionUtils.isNotEmpty(createDatabaseEntityList)) {
for (String database : createDatabaseEntityList) {
DatabaseInfo databaseInfo = databaseInfoMap.get(database);
- if ("sys".equals(databaseInfo.getName()) || "information_schema".equals(databaseInfo.getName()) ||
- "performance_schema".equals(databaseInfo.getName()) || "mysql".equals(databaseInfo.getName())) {
+ if (isSystemDatabase(dataSource.getType(), databaseInfo.getName())) {
continue;
}
@@ -712,4 +711,24 @@ private boolean isTypeChange(String oldType, String newType) {
return StringUtils.isNotEmpty(newType) && StringUtils.isNotEmpty(oldType) && !oldType.equals(newType);
}
+
+ private boolean isSystemDatabase(String dataSourceType, String databaseName) {
+ if (StringUtils.isEmpty(databaseName)) {
+ return true;
+ }
+
+ String normalizedDataSourceType = StringUtils.isEmpty(dataSourceType) ? "" : dataSourceType.toLowerCase();
+ String normalizedDatabaseName = databaseName.toLowerCase();
+
+ if ("postgresql".equals(normalizedDataSourceType) || "postgres".equals(normalizedDataSourceType)) {
+ return "postgres".equals(normalizedDatabaseName)
+ || "template0".equals(normalizedDatabaseName)
+ || "template1".equals(normalizedDatabaseName);
+ }
+
+ return "sys".equals(normalizedDatabaseName)
+ || "information_schema".equals(normalizedDatabaseName)
+ || "performance_schema".equals(normalizedDatabaseName)
+ || "mysql".equals(normalizedDatabaseName);
+ }
}
diff --git a/datavines-server/src/main/resources/mapper/CatalogEntityMetricJobRelMapper.xml b/datavines-server/src/main/resources/mapper/CatalogEntityMetricJobRelMapper.xml
index 70e9a8bfe..61ebc17fe 100644
--- a/datavines-server/src/main/resources/mapper/CatalogEntityMetricJobRelMapper.xml
+++ b/datavines-server/src/main/resources/mapper/CatalogEntityMetricJobRelMapper.xml
@@ -25,7 +25,7 @@
@@ -33,8 +33,8 @@
-
\ No newline at end of file
+
diff --git a/datavines-server/src/main/resources/mapper/DataSourceMapper.xml b/datavines-server/src/main/resources/mapper/DataSourceMapper.xml
index ff6f5d358..a8e211b31 100644
--- a/datavines-server/src/main/resources/mapper/DataSourceMapper.xml
+++ b/datavines-server/src/main/resources/mapper/DataSourceMapper.xml
@@ -25,13 +25,13 @@
-
\ No newline at end of file
+
diff --git a/datavines-server/src/main/resources/mapper/ErrorDataStorageMapper.xml b/datavines-server/src/main/resources/mapper/ErrorDataStorageMapper.xml
index eecabf55d..50508bd42 100644
--- a/datavines-server/src/main/resources/mapper/ErrorDataStorageMapper.xml
+++ b/datavines-server/src/main/resources/mapper/ErrorDataStorageMapper.xml
@@ -37,17 +37,17 @@
-
- select p.status as `name`, count(1) as `value` from () p GROUP BY p.status
+ select p.status as name, count(1) as value from () p GROUP BY p.status
@@ -140,8 +140,8 @@
select p.job_id,
count(1) as total_count,
- sum(if(status = 6, 1, 0)) as fail_count,
- sum(if(status = 7, 1, 0)) as success_count,
+ sum(case when status = 6 then 1 else 0 end) as fail_count,
+ sum(case when status = 7 then 1 else 0 end) as success_count,
max(start_time) as last_job_execution_time,
min(start_time) as first_job_execution_time
from dv_job_execution p
diff --git a/datavines-server/src/main/resources/mapper/JobMapper.xml b/datavines-server/src/main/resources/mapper/JobMapper.xml
index f3ef9475b..3b2d61e86 100644
--- a/datavines-server/src/main/resources/mapper/JobMapper.xml
+++ b/datavines-server/src/main/resources/mapper/JobMapper.xml
@@ -25,11 +25,11 @@
- select p.id, p.name, p.schema_name ,p.table_name,p.column_name, p.type, u.username as updater, p.update_time, s.cron_expression from () p left join `dv_user` u on u.id = p.create_by
- left join `dv_job_schedule` s on p.id = s.job_id and s.status = 1
+ select p.id, p.name, p.schema_name ,p.table_name,p.column_name, p.type, u.username as updater, p.update_time, s.cron_expression from () p left join dv_user u on u.id = p.create_by
+ left join dv_job_schedule s on p.id = s.job_id and s.status = 1
- LOWER(p.`name`) LIKE CONCAT(CONCAT('%', LOWER(#{searchVal})), '%')
+ LOWER(p.name) LIKE CONCAT(CONCAT('%', LOWER(#{searchVal})), '%')
order by p.update_time desc
@@ -38,11 +38,11 @@
select p.id, p.name, p.schema_name ,p.table_name,p.column_name, p.type, u.username as updater, p.update_time, s.cron_expression
- from () p left join `dv_user` u on u.id = p.update_by
- left join `dv_job_schedule` s on p.id = s.job_id and s.status = 1
+ from () p left join dv_user u on u.id = p.update_by
+ left join dv_job_schedule s on p.id = s.job_id and s.status = 1
- AND LOWER(p.`name`) LIKE CONCAT(CONCAT('%', LOWER(#{searchVal})), '%')
+ AND LOWER(p.name) LIKE CONCAT(CONCAT('%', LOWER(#{searchVal})), '%')
AND LOWER(p.schema_name) LIKE CONCAT(CONCAT('%', LOWER(#{schemaSearch})), '%')
@@ -64,4 +64,4 @@
-
\ No newline at end of file
+
diff --git a/datavines-server/src/main/resources/mapper/SlaJobMapper.xml b/datavines-server/src/main/resources/mapper/SlaJobMapper.xml
index 3fb68c2a5..14687ec30 100644
--- a/datavines-server/src/main/resources/mapper/SlaJobMapper.xml
+++ b/datavines-server/src/main/resources/mapper/SlaJobMapper.xml
@@ -63,10 +63,10 @@
ON dsj.job_id = dj.id
- LOWER(dj.`name`) LIKE CONCAT(CONCAT('%', LOWER(#{searchVal})), '%')
+ LOWER(dj.name) LIKE CONCAT(CONCAT('%', LOWER(#{searchVal})), '%')
ORDER BY dsj.update_time DESC
-
\ No newline at end of file
+
diff --git a/scripts/sql/datavines-postgresql.sql b/scripts/sql/datavines-postgresql.sql
new file mode 100644
index 000000000..0db97cf67
--- /dev/null
+++ b/scripts/sql/datavines-postgresql.sql
@@ -0,0 +1,892 @@
+DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS CASCADE;
+DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS CASCADE;
+DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS CASCADE;
+DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS CASCADE;
+DROP TABLE IF EXISTS QRTZ_TRIGGERS CASCADE;
+DROP TABLE IF EXISTS QRTZ_JOB_DETAILS CASCADE;
+DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS CASCADE;
+DROP TABLE IF EXISTS QRTZ_CALENDARS CASCADE;
+DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS CASCADE;
+DROP TABLE IF EXISTS QRTZ_LOCKS CASCADE;
+DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE CASCADE;
+CREATE TABLE QRTZ_JOB_DETAILS (
+ SCHED_NAME varchar(120) NOT NULL,
+ JOB_NAME varchar(200) NOT NULL,
+ JOB_GROUP varchar(200) NOT NULL,
+ DESCRIPTION varchar(250) DEFAULT NULL,
+ JOB_CLASS_NAME varchar(250) NOT NULL,
+ IS_DURABLE varchar(1) NOT NULL,
+ IS_NONCONCURRENT varchar(1) NOT NULL,
+ IS_UPDATE_DATA varchar(1) NOT NULL,
+ REQUESTS_RECOVERY varchar(1) NOT NULL,
+ JOB_DATA bytea,
+ PRIMARY KEY (SCHED_NAME, JOB_NAME, JOB_GROUP)
+);
+CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS (SCHED_NAME, REQUESTS_RECOVERY);
+CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS (SCHED_NAME, JOB_GROUP);
+CREATE TABLE QRTZ_TRIGGERS (
+ SCHED_NAME varchar(120) NOT NULL,
+ TRIGGER_NAME varchar(200) NOT NULL,
+ TRIGGER_GROUP varchar(200) NOT NULL,
+ JOB_NAME varchar(200) NOT NULL,
+ JOB_GROUP varchar(200) NOT NULL,
+ DESCRIPTION varchar(250) DEFAULT NULL,
+ NEXT_FIRE_TIME bigint DEFAULT NULL,
+ PREV_FIRE_TIME bigint DEFAULT NULL,
+ PRIORITY integer DEFAULT NULL,
+ TRIGGER_STATE varchar(16) NOT NULL,
+ TRIGGER_TYPE varchar(8) NOT NULL,
+ START_TIME bigint NOT NULL,
+ END_TIME bigint DEFAULT NULL,
+ CALENDAR_NAME varchar(200) DEFAULT NULL,
+ MISFIRE_INSTR smallint DEFAULT NULL,
+ JOB_DATA bytea,
+ PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
+ CONSTRAINT QRTZ_TRIGGERS_ibfk_1
+ FOREIGN KEY (SCHED_NAME, JOB_NAME, JOB_GROUP)
+ REFERENCES QRTZ_JOB_DETAILS (SCHED_NAME, JOB_NAME, JOB_GROUP)
+);
+CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS (SCHED_NAME, JOB_NAME, JOB_GROUP);
+CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS (SCHED_NAME, JOB_GROUP);
+CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS (SCHED_NAME, CALENDAR_NAME);
+CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_GROUP);
+CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_STATE);
+CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, TRIGGER_STATE);
+CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_GROUP, TRIGGER_STATE);
+CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS (SCHED_NAME, NEXT_FIRE_TIME);
+CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_STATE, NEXT_FIRE_TIME);
+CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME);
+CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME, TRIGGER_STATE);
+CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME, TRIGGER_GROUP, TRIGGER_STATE);
+CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
+ SCHED_NAME varchar(120) NOT NULL,
+ TRIGGER_NAME varchar(200) NOT NULL,
+ TRIGGER_GROUP varchar(200) NOT NULL,
+ REPEAT_COUNT bigint NOT NULL,
+ REPEAT_INTERVAL bigint NOT NULL,
+ TIMES_TRIGGERED bigint NOT NULL,
+ PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
+ CONSTRAINT QRTZ_SIMPLE_TRIGGERS_ibfk_1
+ FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
+);
+CREATE TABLE QRTZ_CRON_TRIGGERS (
+ SCHED_NAME varchar(120) NOT NULL,
+ TRIGGER_NAME varchar(200) NOT NULL,
+ TRIGGER_GROUP varchar(200) NOT NULL,
+ CRON_EXPRESSION varchar(120) NOT NULL,
+ TIME_ZONE_ID varchar(80) DEFAULT NULL,
+ PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
+ CONSTRAINT QRTZ_CRON_TRIGGERS_ibfk_1
+ FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
+);
+CREATE TABLE QRTZ_SIMPROP_TRIGGERS (
+ SCHED_NAME varchar(120) NOT NULL,
+ TRIGGER_NAME varchar(200) NOT NULL,
+ TRIGGER_GROUP varchar(200) NOT NULL,
+ STR_PROP_1 varchar(512) DEFAULT NULL,
+ STR_PROP_2 varchar(512) DEFAULT NULL,
+ STR_PROP_3 varchar(512) DEFAULT NULL,
+ INT_PROP_1 integer DEFAULT NULL,
+ INT_PROP_2 integer DEFAULT NULL,
+ LONG_PROP_1 bigint DEFAULT NULL,
+ LONG_PROP_2 bigint DEFAULT NULL,
+ DEC_PROP_1 decimal(13,4) DEFAULT NULL,
+ DEC_PROP_2 decimal(13,4) DEFAULT NULL,
+ BOOL_PROP_1 varchar(1) DEFAULT NULL,
+ BOOL_PROP_2 varchar(1) DEFAULT NULL,
+ PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
+ CONSTRAINT QRTZ_SIMPROP_TRIGGERS_ibfk_1
+ FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
+);
+CREATE TABLE QRTZ_BLOB_TRIGGERS (
+ SCHED_NAME varchar(120) NOT NULL,
+ TRIGGER_NAME varchar(200) NOT NULL,
+ TRIGGER_GROUP varchar(200) NOT NULL,
+ BLOB_DATA bytea,
+ PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP),
+ CONSTRAINT QRTZ_BLOB_TRIGGERS_ibfk_1
+ FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP)
+);
+CREATE TABLE QRTZ_FIRED_TRIGGERS (
+ SCHED_NAME varchar(120) NOT NULL,
+ ENTRY_ID varchar(200) NOT NULL,
+ TRIGGER_NAME varchar(200) NOT NULL,
+ TRIGGER_GROUP varchar(200) NOT NULL,
+ INSTANCE_NAME varchar(200) NOT NULL,
+ FIRED_TIME bigint NOT NULL,
+ SCHED_TIME bigint NOT NULL,
+ PRIORITY integer NOT NULL,
+ STATE varchar(16) NOT NULL,
+ JOB_NAME varchar(200) DEFAULT NULL,
+ JOB_GROUP varchar(200) DEFAULT NULL,
+ IS_NONCONCURRENT varchar(1) DEFAULT NULL,
+ REQUESTS_RECOVERY varchar(1) DEFAULT NULL,
+ PRIMARY KEY (SCHED_NAME, ENTRY_ID)
+);
+CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, INSTANCE_NAME);
+CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, INSTANCE_NAME, REQUESTS_RECOVERY);
+CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, JOB_NAME, JOB_GROUP);
+CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, JOB_GROUP);
+CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP);
+CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, TRIGGER_GROUP);
+CREATE TABLE QRTZ_CALENDARS (
+ SCHED_NAME varchar(120) NOT NULL,
+ CALENDAR_NAME varchar(200) NOT NULL,
+ CALENDAR bytea NOT NULL,
+ PRIMARY KEY (SCHED_NAME, CALENDAR_NAME)
+);
+CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
+ SCHED_NAME varchar(120) NOT NULL,
+ TRIGGER_GROUP varchar(200) NOT NULL,
+ PRIMARY KEY (SCHED_NAME, TRIGGER_GROUP)
+);
+CREATE TABLE QRTZ_LOCKS (
+ SCHED_NAME varchar(120) NOT NULL,
+ LOCK_NAME varchar(40) NOT NULL,
+ PRIMARY KEY (SCHED_NAME, LOCK_NAME)
+);
+CREATE TABLE QRTZ_SCHEDULER_STATE (
+ SCHED_NAME varchar(120) NOT NULL,
+ INSTANCE_NAME varchar(200) NOT NULL,
+ LAST_CHECKIN_TIME bigint NOT NULL,
+ CHECKIN_INTERVAL bigint NOT NULL,
+ PRIMARY KEY (SCHED_NAME, INSTANCE_NAME)
+);
+-- ----------------------------
+-- Table structure for dv_actual_values
+-- ----------------------------
+DROP TABLE IF EXISTS dv_actual_values;
+CREATE TABLE dv_actual_values (
+ id bigserial,
+ job_execution_id bigint DEFAULT NULL,
+ metric_name varchar(255) DEFAULT NULL,
+ unique_code varchar(255) DEFAULT NULL,
+ actual_value decimal(20,4) DEFAULT NULL,
+ data_time timestamp DEFAULT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_common_task_command
+-- ----------------------------
+DROP TABLE IF EXISTS dv_common_task_command;
+CREATE TABLE dv_common_task_command (
+ id bigserial,
+ task_id bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_catalog_entity_definition
+-- ----------------------------
+DROP TABLE IF EXISTS dv_catalog_entity_definition;
+CREATE TABLE dv_catalog_entity_definition (
+ id bigserial,
+ uuid varchar(64) NOT NULL,
+ name varchar(255) NOT NULL,
+ description varchar(255) DEFAULT NULL,
+ properties text,
+ super_uuid varchar(64) NOT NULL DEFAULT '-1',
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ updated_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (uuid)
+);
+
+-- ----------------------------
+-- Table structure for dv_catalog_entity_instance
+-- ----------------------------
+DROP TABLE IF EXISTS dv_catalog_entity_instance;
+CREATE TABLE dv_catalog_entity_instance (
+ id bigserial,
+ uuid varchar(64) NOT NULL,
+ type varchar(127) NOT NULL,
+ datasource_id bigint NOT NULL,
+ fully_qualified_name varchar(255) NOT NULL,
+ display_name varchar(255) NOT NULL,
+ description varchar(1024) DEFAULT NULL,
+ properties text,
+ owner varchar(255) DEFAULT NULL,
+ version varchar(64) NOT NULL DEFAULT '1.0',
+ status varchar(255) DEFAULT 'active',
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ PRIMARY KEY (id),
+ UNIQUE (uuid),
+ UNIQUE (datasource_id,fully_qualified_name,status)
+);
+CREATE INDEX full_idx_display_name_description ON dv_catalog_entity_instance (display_name,description);
+
+-- ----------------------------
+-- Table structure for dv_catalog_entity_metric_job_rel
+-- ----------------------------
+DROP TABLE IF EXISTS dv_catalog_entity_metric_job_rel;
+CREATE TABLE dv_catalog_entity_metric_job_rel (
+ id bigserial,
+ entity_uuid varchar(64) NOT NULL,
+ metric_job_id bigint NOT NULL,
+ metric_job_type varchar(255) NOT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (entity_uuid,metric_job_id,metric_job_type)
+);
+
+-- ----------------------------
+-- Table structure for dv_catalog_entity_profile
+-- ----------------------------
+DROP TABLE IF EXISTS dv_catalog_entity_profile;
+CREATE TABLE dv_catalog_entity_profile (
+ id bigserial,
+ entity_uuid varchar(64) NOT NULL,
+ metric_name varchar(255) NOT NULL,
+ actual_value text NOT NULL,
+ actual_value_type varchar(255) DEFAULT NULL,
+ data_date varchar(255) DEFAULT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (entity_uuid,metric_name,data_date)
+);
+
+-- ----------------------------
+-- Table structure for dv_catalog_entity_rel
+-- ----------------------------
+DROP TABLE IF EXISTS dv_catalog_entity_rel;
+CREATE TABLE dv_catalog_entity_rel (
+ id bigserial,
+ entity1_uuid varchar(64) NOT NULL,
+ entity2_uuid varchar(64) NOT NULL,
+ type varchar(64) NOT NULL,
+ source_type varchar(64) DEFAULT NULL,
+ related_script text DEFAULT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ PRIMARY KEY (id),
+ UNIQUE (entity1_uuid,entity2_uuid,type)
+);
+CREATE INDEX idx_entity2_uuid ON dv_catalog_entity_rel (entity2_uuid);
+
+-- ----------------------------
+-- Table structure for dv_catalog_entity_tag_rel
+-- ----------------------------
+DROP TABLE IF EXISTS dv_catalog_entity_tag_rel;
+CREATE TABLE dv_catalog_entity_tag_rel (
+ id bigserial,
+ entity_uuid varchar(64) NOT NULL,
+ tag_uuid varchar(64) NOT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (entity_uuid,tag_uuid)
+);
+CREATE INDEX dv_catalog_entity_tag_rel_idx_entity2_uuid ON dv_catalog_entity_tag_rel (tag_uuid);
+
+-- ----------------------------
+-- Table structure for dv_catalog_schema_change
+-- ----------------------------
+DROP TABLE IF EXISTS dv_catalog_schema_change;
+CREATE TABLE dv_catalog_schema_change (
+ id bigserial,
+ parent_uuid varchar(64) NOT NULL,
+ entity_uuid varchar(64) NOT NULL,
+ change_type varchar(64) NOT NULL,
+ database_name varchar(255) DEFAULT NULL,
+ table_name varchar(255) DEFAULT NULL,
+ column_name varchar(255) DEFAULT NULL,
+ change_before text DEFAULT NULL,
+ change_after text DEFAULT NULL,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_catalog_tag
+-- ----------------------------
+DROP TABLE IF EXISTS dv_catalog_tag;
+CREATE TABLE dv_catalog_tag (
+ id bigserial,
+ uuid varchar(64) NOT NULL,
+ category_uuid varchar(64) NOT NULL,
+ name varchar(256) NOT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (uuid,category_uuid,name)
+);
+
+-- ----------------------------
+-- Table structure for dv_catalog_tag_category
+-- ----------------------------
+DROP TABLE IF EXISTS dv_catalog_tag_category;
+CREATE TABLE dv_catalog_tag_category (
+ id bigserial,
+ uuid varchar(64) NOT NULL,
+ name varchar(256) NOT NULL,
+ workspace_id bigint NOT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (uuid,name)
+);
+
+-- ----------------------------
+-- Table structure for dv_common_task
+-- ----------------------------
+DROP TABLE IF EXISTS dv_common_task;
+CREATE TABLE dv_common_task (
+ id bigserial,
+ task_type varchar(128) DEFAULT NULL,
+ type varchar(128) DEFAULT NULL,
+ datasource_id bigint NOT NULL DEFAULT '-1',
+ database_name varchar(128) DEFAULT NULL,
+ table_name varchar(128) DEFAULT NULL,
+ status integer DEFAULT NULL,
+ parameter text,
+ execute_host varchar(255) DEFAULT NULL,
+ submit_time timestamp DEFAULT NULL,
+ schedule_time timestamp DEFAULT NULL,
+ start_time timestamp DEFAULT NULL,
+ end_time timestamp DEFAULT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_common_task_schedule
+-- ----------------------------
+DROP TABLE IF EXISTS dv_common_task_schedule;
+CREATE TABLE dv_common_task_schedule (
+ id bigserial,
+ task_type varchar(128) DEFAULT NULL,
+ type varchar(255) NOT NULL,
+ param text,
+ datasource_id bigint NOT NULL,
+ cron_expression varchar(255) DEFAULT NULL,
+ status smallint DEFAULT NULL,
+ start_time timestamp DEFAULT NULL,
+ end_time timestamp DEFAULT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_command
+-- ----------------------------
+DROP TABLE IF EXISTS dv_command;
+CREATE TABLE dv_command (
+ id bigserial,
+ type smallint NOT NULL DEFAULT '0',
+ parameter text,
+ execute_host varchar(255),
+ job_execution_id bigint NOT NULL,
+ priority integer DEFAULT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_datasource
+-- ----------------------------
+DROP TABLE IF EXISTS dv_datasource;
+CREATE TABLE dv_datasource (
+ id bigserial,
+ uuid varchar(64) NOT NULL,
+ name varchar(255) NOT NULL,
+ category varchar(255) DEFAULT 'database',
+ type varchar(255) NOT NULL,
+ param text NOT NULL,
+ param_code text NULL,
+ workspace_id bigint NOT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (name)
+);
+
+-- ----------------------------
+-- Table structure for dv_env
+-- ----------------------------
+DROP TABLE IF EXISTS dv_env;
+CREATE TABLE dv_env (
+ id bigserial,
+ name varchar(255) NOT NULL,
+ env text NOT NULL,
+ workspace_id bigint NOT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (name)
+);
+
+DROP TABLE IF EXISTS dv_access_token;
+CREATE TABLE dv_access_token (
+ id bigserial,
+ workspace_id bigint NOT NULL,
+ user_id bigint NOT NULL,
+ token varchar(1024) NOT NULL,
+ expire_time timestamp NOT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_error_data_storage
+-- ----------------------------
+DROP TABLE IF EXISTS dv_error_data_storage;
+CREATE TABLE dv_error_data_storage (
+ id bigserial,
+ name varchar(255) NOT NULL,
+ type varchar(255) NOT NULL,
+ param text NOT NULL,
+ workspace_id bigint NOT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (name,workspace_id)
+);
+
+-- ----------------------------
+-- Table structure for dv_issue
+-- ----------------------------
+DROP TABLE IF EXISTS dv_issue;
+CREATE TABLE dv_issue (
+ id bigserial,
+ title varchar(1024) DEFAULT NULL,
+ content text NOT NULL,
+ status varchar(255) NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_job
+-- ----------------------------
+DROP TABLE IF EXISTS dv_job;
+CREATE TABLE dv_job (
+ id bigserial,
+ name varchar(255) DEFAULT NULL,
+ type integer NOT NULL DEFAULT '0',
+ datasource_id bigint NOT NULL,
+ datasource_id_2 bigint DEFAULT NULL,
+ schema_name varchar(128) DEFAULT NULL,
+ table_name varchar(128) DEFAULT NULL,
+ column_name varchar(128) DEFAULT NULL,
+ selected_column text,
+ metric_type varchar(255) DEFAULT NULL,
+ execute_platform_type varchar(128) DEFAULT NULL,
+ execute_platform_parameter text,
+ engine_type varchar(128) DEFAULT NULL,
+ engine_parameter text,
+ error_data_storage_id bigint DEFAULT NULL,
+ is_error_data_output_to_datasource smallint DEFAULT '0',
+ error_data_output_to_datasource_database varchar(255) DEFAULT NULL,
+ parameter text,
+ retry_times integer DEFAULT NULL,
+ retry_interval integer DEFAULT NULL,
+ timeout integer DEFAULT NULL,
+ timeout_strategy integer DEFAULT NULL,
+ pre_sql text DEFAULT NULL,
+ post_sql text DEFAULT NULL,
+ tenant_code bigint DEFAULT NULL,
+ env bigint DEFAULT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (name,datasource_id,schema_name,table_name,column_name)
+);
+
+-- ----------------------------
+-- Table structure for dv_job_execution
+-- ----------------------------
+DROP TABLE IF EXISTS dv_job_execution;
+CREATE TABLE dv_job_execution (
+ id bigserial,
+ name varchar(255) NOT NULL,
+ job_id bigint NOT NULL DEFAULT '-1',
+ job_type integer NOT NULL DEFAULT '0',
+ schema_name varchar(128) DEFAULT NULL,
+ table_name varchar(128) DEFAULT NULL,
+ column_name varchar(128) DEFAULT NULL,
+ metric_type varchar(255) DEFAULT NULL,
+ datasource_id bigint NOT NULL DEFAULT '-1',
+ execute_platform_type varchar(128) DEFAULT NULL,
+ execute_platform_parameter text,
+ engine_type varchar(128) DEFAULT NULL,
+ engine_parameter text,
+ error_data_storage_type varchar(128) DEFAULT NULL,
+ error_data_storage_parameter text,
+ error_data_file_name varchar(255) DEFAULT NULL,
+ parameter text NOT NULL,
+ status integer DEFAULT NULL,
+ retry_times integer DEFAULT NULL,
+ retry_interval integer DEFAULT NULL,
+ timeout integer DEFAULT NULL,
+ timeout_strategy integer DEFAULT NULL,
+ pre_sql text DEFAULT NULL,
+ post_sql text DEFAULT NULL,
+ tenant_code varchar(255) DEFAULT NULL,
+ execute_host varchar(255) DEFAULT NULL,
+ application_id varchar(255) DEFAULT NULL,
+ application_tag varchar(255) DEFAULT NULL,
+ process_id integer DEFAULT NULL,
+ execute_file_path varchar(255) DEFAULT NULL,
+ log_path varchar(255) DEFAULT NULL,
+ env text,
+ submit_time timestamp DEFAULT NULL,
+ schedule_time timestamp DEFAULT NULL,
+ start_time timestamp DEFAULT NULL,
+ end_time timestamp DEFAULT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_job_execution_result
+-- ----------------------------
+DROP TABLE IF EXISTS dv_job_execution_result;
+CREATE TABLE dv_job_execution_result (
+ id bigserial,
+ job_execution_id bigint DEFAULT NULL,
+ metric_unique_key varchar(255) DEFAULT NULL,
+ metric_type varchar(255) DEFAULT NULL,
+ metric_dimension varchar(255) DEFAULT NULL,
+ metric_name varchar(255) DEFAULT NULL,
+ database_name varchar(128) DEFAULT NULL,
+ table_name varchar(128) DEFAULT NULL,
+ column_name varchar(128) DEFAULT NULL,
+ actual_value decimal(20,4) DEFAULT NULL,
+ expected_value decimal(20,4) DEFAULT NULL,
+ expected_type varchar(255) DEFAULT NULL,
+ result_formula varchar(255) DEFAULT NULL,
+ operator varchar(255) DEFAULT NULL,
+ threshold decimal(20,4) DEFAULT NULL,
+ score decimal(20,4) DEFAULT 0,
+ state integer NOT NULL DEFAULT '0',
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (job_execution_id,metric_unique_key)
+);
+
+-- ----------------------------
+-- Table structure for dv_job_quality_report
+-- ----------------------------
+DROP TABLE IF EXISTS dv_job_quality_report;
+CREATE TABLE dv_job_quality_report (
+ id bigserial,
+ datasource_id bigint DEFAULT NULL,
+ entity_level varchar(128) DEFAULT NULL,
+ database_name varchar(128) DEFAULT NULL,
+ table_name varchar(128) DEFAULT NULL,
+ column_name varchar(128) DEFAULT NULL,
+ score decimal(20,4) DEFAULT NULL,
+ report_date date DEFAULT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_job_execution_result_report_rel
+-- ----------------------------
+DROP TABLE IF EXISTS dv_job_execution_result_report_rel;
+CREATE TABLE dv_job_execution_result_report_rel (
+ id bigserial,
+ quality_report_id bigint NOT NULL,
+ job_execution_result_id bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (job_execution_result_id,quality_report_id)
+);
+
+-- ----------------------------
+-- Table structure for dv_job_issue_rel
+-- ----------------------------
+DROP TABLE IF EXISTS dv_job_issue_rel;
+CREATE TABLE dv_job_issue_rel (
+ id bigserial,
+ job_id bigint NOT NULL,
+ issue_id bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (job_id,issue_id)
+);
+CREATE INDEX dv_job_issue_rel_idx_entity2_uuid ON dv_job_issue_rel (issue_id);
+
+-- ----------------------------
+-- Table structure for dv_job_schedule
+-- ----------------------------
+DROP TABLE IF EXISTS dv_job_schedule;
+CREATE TABLE dv_job_schedule (
+ id bigserial,
+ type varchar(255) NOT NULL,
+ param text,
+ job_id bigint NOT NULL,
+ cron_expression varchar(255) DEFAULT NULL,
+ status smallint DEFAULT NULL,
+ start_time timestamp DEFAULT NULL,
+ end_time timestamp DEFAULT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_server
+-- ----------------------------
+DROP TABLE IF EXISTS dv_server;
+CREATE TABLE dv_server (
+ id serial,
+ host varchar(255) NOT NULL,
+ port integer NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (host,port)
+);
+
+DROP TABLE IF EXISTS dv_registry_lock;
+CREATE TABLE dv_registry_lock
+(
+ id bigserial,
+ lock_key varchar(256) NOT NULL,
+ lock_owner varchar(256) NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (lock_key)
+);
+CREATE INDEX idx_upt ON dv_registry_lock (update_time);
+
+-- ----------------------------
+-- Table structure for dv_sla
+-- ----------------------------
+DROP TABLE IF EXISTS dv_sla;
+CREATE TABLE dv_sla (
+ id bigserial,
+ workspace_id bigint NOT NULL,
+ name varchar(255) NOT NULL,
+ description varchar(255) NOT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_sla_job
+-- ----------------------------
+DROP TABLE IF EXISTS dv_sla_job;
+CREATE TABLE dv_sla_job (
+ id bigserial,
+ workspace_id bigint NOT NULL,
+ sla_id bigint NOT NULL,
+ job_id bigint NOT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (workspace_id,sla_id,job_id)
+);
+
+-- ----------------------------
+-- Table structure for dv_sla_notification
+-- ----------------------------
+DROP TABLE IF EXISTS dv_sla_notification;
+CREATE TABLE dv_sla_notification (
+ id bigserial,
+ type varchar(40) NOT NULL,
+ workspace_id bigint NOT NULL,
+ sla_id bigint NOT NULL,
+ sender_id bigint NOT NULL,
+ config text,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_sla_sender
+-- ----------------------------
+DROP TABLE IF EXISTS dv_sla_sender;
+CREATE TABLE dv_sla_sender (
+ id bigserial,
+ type varchar(40) NOT NULL,
+ name varchar(255) NOT NULL,
+ workspace_id bigint NOT NULL,
+ config text NOT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_tenant
+-- ----------------------------
+DROP TABLE IF EXISTS dv_tenant;
+CREATE TABLE dv_tenant (
+ id bigserial,
+ tenant varchar(255) NOT NULL,
+ workspace_id bigint NOT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (tenant)
+);
+
+-- ----------------------------
+-- Table structure for dv_user
+-- ----------------------------
+DROP TABLE IF EXISTS dv_user;
+CREATE TABLE dv_user (
+ id bigserial,
+ username varchar(255) NOT NULL,
+ password varchar(255) NOT NULL,
+ email varchar(255) NOT NULL,
+ phone varchar(127) DEFAULT NULL,
+ admin smallint NOT NULL DEFAULT '0',
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (username)
+);
+
+-- ----------------------------
+-- Table structure for dv_user_workspace
+-- ----------------------------
+DROP TABLE IF EXISTS dv_user_workspace;
+CREATE TABLE dv_user_workspace (
+ id bigserial,
+ user_id bigint NOT NULL,
+ workspace_id bigint NOT NULL,
+ role_id bigint DEFAULT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+-- ----------------------------
+-- Table structure for dv_workspace
+-- ----------------------------
+DROP TABLE IF EXISTS dv_workspace;
+CREATE TABLE dv_workspace (
+ id bigserial,
+ name varchar(255) NOT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE (name)
+);
+
+-- ----------------------------
+-- Table structure for dv_config
+-- ----------------------------
+DROP TABLE IF EXISTS dv_config;
+CREATE TABLE dv_config (
+ id bigserial,
+ workspace_id bigint NOT NULL,
+ var_key varchar(255) NOT NULL,
+ var_value text NOT NULL,
+ is_default smallint NOT NULL,
+ create_by bigint NOT NULL,
+ create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ update_by bigint NOT NULL,
+ update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id)
+);
+
+INSERT INTO dv_config VALUES ('1', '-1', 'data.quality.jar.name', '/libs/datavines-engine-spark-core-1.0.0-SNAPSHOT.jar', '1', '1', '2023-09-02 16:52:56', '1', '2023-09-03 09:56:12');
+INSERT INTO dv_config VALUES ('2', '-1', 'yarn.mode', 'standalone', '1', '1', '2023-09-02 18:28:59', '1', '2023-09-03 12:46:24');
+INSERT INTO dv_config VALUES ('3', '-1', 'yarn.application.status.address', 'http://%s:%s/ws/v1/cluster/apps/%s', '1', '1', '2023-09-03 09:57:01', '1', '2023-09-03 09:57:01');
+INSERT INTO dv_config VALUES ('4', '-1', 'yarn.resource.manager.http.address.port', '8088', '1', '1', '2023-09-03 09:57:34', '1', '2023-09-03 09:57:34');
+INSERT INTO dv_config VALUES ('5', '-1', 'yarn.resource.manager.ha.ids', '192.168.0.x,192.168.0.x', '1', '1', '2023-09-03 09:58:17', '1', '2023-09-03 09:58:17');
+INSERT INTO dv_config VALUES ('7', '-1', 'max.cpu.load.avg', '10', '1', '1', '2023-09-03 09:59:06', '1', '2023-09-03 09:59:06');
+INSERT INTO dv_config VALUES ('8', '-1', 'reserved.memory', '0.3f', '1', '1', '2023-09-03 09:59:28', '1', '2023-09-03 09:59:28');
+INSERT INTO dv_config VALUES ('9', '-1', 'file.max.length', '10000000', '1', '1', '2023-09-03 14:57:33', '1', '2023-09-03 14:57:33');
+INSERT INTO dv_config VALUES ('10', '-1', 'error.data.dir', '/tmp/datavines/error-data', '1', '1', '2023-09-03 14:58:01', '1', '2023-09-03 14:58:01');
+INSERT INTO dv_config VALUES ('11', '-1', 'validate.result.data.dir', '/tmp/datavines/validate-result-data', '1', '1', '2023-09-03 14:58:29', '1', '2023-09-03 14:58:29');
+INSERT INTO dv_config VALUES ('12', '-1', 'local.execution.threshold', '1000', '1', '1', '2023-09-03 15:02:38', '1', '2023-09-03 15:02:38');
+INSERT INTO dv_config VALUES ('13', '-1', 'spark.execution.threshold', '1000', '1', '1', '2023-09-03 15:02:38', '1', '2023-09-03 15:02:38');
+INSERT INTO dv_config VALUES ('14', '-1', 'livy.uri', 'http://localhost:8998/batches', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('15', '-1', 'livy.task.appId.retry.count', '3', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('16', '-1', 'livy.need.kerberos', 'false', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('17', '-1', 'livy.server.auth.kerberos.principal', 'livy/kerberos.principal', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('18', '-1', 'livy.server.auth.kerberos.keytab', '/path/to/livy/keytab/file', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('19', '-1', 'livy.task.proxyUser', 'root', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('20', '-1', 'livy.task.jar.lib.path', 'hdfs:///datavines/lib', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('21', '-1', 'livy.execution.threshold', '1000', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('22', '-1', 'livy.task.jars', CONCAT('datavines-common-1.0.0-SNAPSHOT.jar,datavines-spi-1.0.0-SNAPSHOT.jar,'
+ 'datavines-engine-spark-api-1.0.0-SNAPSHOT.jar,datavines-engine-spark-connector-jdbc-1.0.0-SNAPSHOT.jar,'
+ 'datavines-engine-core-1.0.0-SNAPSHOT.jar,datavines-engine-common-1.0.0-SNAPSHOT.jar,datavines-engine-spark-transform-sql-1.0.0-SNAPSHOT.jar,'
+ 'datavines-engine-api-1.0.0-SNAPSHOT.jar,mysql-connector-j-8.4.0.jar,httpclient-4.4.1.jar,'
+ 'httpcore-4.4.1.jar,postgresql-42.2.6.jar,presto-jdbc-0.283.jar,trino-jdbc-407.jar,clickhouse-jdbc-0.1.53.jar,'
+ 'mongo-java-driver-3.9.0.jar,mongo-spark-connector_2.11-2.4.0.jar,datavines-engine-spark-connector-mongodb-1.0.0-SNAPSHOT.jar'),
+ '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('23', '-1', 'profile.execute.engine', 'local', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('24', '-1', 'spark.engine.parameter.deploy.mode', 'cluster', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('25', '-1', 'spark.engine.parameter.num.executors', '1', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('26', '-1', 'spark.engine.parameter.driver.cores', '1', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('27', '-1', 'spark.engine.parameter.driver.memory', '512M', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('28', '-1', 'spark.engine.parameter.executor.cores', '1', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('29', '-1', 'spark.engine.parameter.executor.memory', '512M', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38');
+INSERT INTO dv_config VALUES ('30', '-1', 'datavines.fqdn', 'http://127.0.0.1:5600', '1', '1', '2024-05-21 15:15:38', '1', '2024-05-21 15:15:38');
+INSERT INTO dv_config VALUES ('31', '-1', 'data.quality.flink.jar.name', '/libs/datavines-engine-flink-core-1.0.0-SNAPSHOT.jar', '1', '1', '2025-02-02 11:43:04', '1', '2025-02-02 11:43:04');
+
+INSERT INTO dv_user (id, username, password, email, phone, admin) VALUES ('1', 'admin', '$2a$10$9ZcicUYFl/.knBi9SE53U.Nml8bfNeArxr35HQshxXzimbA6Ipgqq', 'admin@gmail.com', NULL, '0');
+INSERT INTO dv_workspace (id, name, create_by, update_by) VALUES ('1', 'admin''s default', '1', '1');
+INSERT INTO dv_user_workspace (id, user_id, workspace_id, role_id,create_by,update_by) VALUES ('1', '1', '1', '1','1', '1');
+
From ef8ce7c4dce83816582b0f19c3ad545446387ef9 Mon Sep 17 00:00:00 2001
From: xxzuo <1293378490@qq.com>
Date: Wed, 29 Apr 2026 01:11:21 +0800
Subject: [PATCH 2/3] [Feature][Server] support postgresql registry
---
bin/datavines-daemon.sh | 14 ++--
.../server/api/config/MybatisPlusConfig.java | 13 ++++
.../repository/mapper/JobExecutionMapper.java | 16 ++---
.../mapper/JobQualityReportMapper.java | 40 +++++++++++
.../service/impl/JobExecutionServiceImpl.java | 1 -
.../impl/JobQualityReportServiceImpl.java | 69 ++++++++++---------
.../resources/mapper/JobExecutionMapper.xml | 65 +++++++++++++++--
scripts/sql/datavines-postgresql.sql | 5 +-
8 files changed, 163 insertions(+), 60 deletions(-)
diff --git a/bin/datavines-daemon.sh b/bin/datavines-daemon.sh
index 6b2b645eb..45a7a6e29 100644
--- a/bin/datavines-daemon.sh
+++ b/bin/datavines-daemon.sh
@@ -16,7 +16,7 @@
# limitations under the License.
#
-usage="Usage: datavines-daemon.sh (start|start_container|start_with_jmx|stop|restart_with_jmx|status) <''|mysql>"
+usage="Usage: datavines-daemon.sh (start|start_container|start_with_jmx|stop|restart_with_jmx|status) <''|postgres|mysql>"
# if no args specified, show usage
if [ $# -le 0 ]; then
@@ -32,12 +32,12 @@ shift
springProfileActive=
if [ -n "$profile" ]; then
- if [ "$profile" = "mysql" ]; then
- springProfileActive="-Dspring.profiles.active=mysql"
- else
- echo "Error: No profile named \`$profile' was found."
- exit 1
- fi
+ if [ "$profile" = "postgres" ] || [ "$profile" = "mysql" ]; then
+ springProfileActive="-Dspring.profiles.active=$profile"
+ else
+ echo "Error: No profile named \`$profile' was found."
+ exit 1
+ fi
fi
echo "Begin $startStop DataVinesServer $profile......"
diff --git a/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java b/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java
index 3922590be..191553719 100644
--- a/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java
+++ b/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java
@@ -20,10 +20,13 @@
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import com.zaxxer.hikari.HikariDataSource;
+import org.apache.ibatis.mapping.DatabaseIdProvider;
+import org.apache.ibatis.mapping.VendorDatabaseIdProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
+import java.util.Properties;
@Configuration
public class MybatisPlusConfig {
@@ -35,6 +38,16 @@ public MybatisPlusInterceptor mybatisPlusInterceptor(DataSource dataSource) {
return interceptor;
}
+ @Bean
+ public DatabaseIdProvider databaseIdProvider() {
+ VendorDatabaseIdProvider databaseIdProvider = new VendorDatabaseIdProvider();
+ Properties properties = new Properties();
+ properties.setProperty("MySQL", "mysql");
+ properties.setProperty("PostgreSQL", "postgresql");
+ databaseIdProvider.setProperties(properties);
+ return databaseIdProvider;
+ }
+
private DbType resolveDbType(DataSource dataSource) {
if (dataSource instanceof HikariDataSource) {
HikariDataSource hikariDataSource = (HikariDataSource) dataSource;
diff --git a/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobExecutionMapper.java b/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobExecutionMapper.java
index 5dc196100..49c19708d 100644
--- a/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobExecutionMapper.java
+++ b/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobExecutionMapper.java
@@ -39,14 +39,14 @@ public interface JobExecutionMapper extends BaseMapper {
IPage getJobExecutionPage(Page page,
@Param("searchVal") String searchVal,
@Param("jobId") Long jobId,
- @Param("datasourceId") Long datasourceId,
- @Param("status") Integer status,
- @Param("metricType") String metricType, @Param("schemaName") String schemaName,
- @Param("tableName") String tableName, @Param("columnName") String columnName,
- @Param("startTime") String startTime, @Param("endTime") String endTime,
- @Param("schemaSearch") String schemaSearch,
- @Param("tableSearch") String tableSearch,
- @Param("columnSearch") String columnSearch);
+ @Param("datasourceId") Long datasourceId,
+ @Param("status") Integer status,
+ @Param("metricType") String metricType, @Param("schemaName") String schemaName,
+ @Param("tableName") String tableName, @Param("columnName") String columnName,
+ @Param("startTime") String startTime, @Param("endTime") String endTime,
+ @Param("schemaSearch") String schemaSearch,
+ @Param("tableSearch") String tableSearch,
+ @Param("columnSearch") String columnSearch);
List getJobExecutionAggPie(@Param("datasourceId") Long datasourceId,
@Param("metricType") String metricType, @Param("schemaName") String schemaName,
diff --git a/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobQualityReportMapper.java b/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobQualityReportMapper.java
index ff26b5afe..e9a5740fa 100644
--- a/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobQualityReportMapper.java
+++ b/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobQualityReportMapper.java
@@ -17,6 +17,8 @@
package io.datavines.server.repository.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.datavines.server.repository.entity.JobQualityReport;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@@ -27,21 +29,59 @@
@Mapper
public interface JobQualityReportMapper extends BaseMapper {
+ @Select(value = "SELECT datasource_id, database_name, table_name, '--' as column_name, avg(score) as score, CAST(#{reportDate} AS DATE) as report_date, 'table' as entity_level" +
+ " from dv_job_quality_report where report_date = CAST(#{reportDate} AS DATE) and datasource_id = #{datasourceId} and entity_level = 'column'" +
+ " group by datasource_id, database_name, table_name", databaseId = "postgresql")
+ @Select(value = "SELECT datasource_id, database_name, table_name, '--' as column_name, avg(score) as score, #{reportDate} as report_date, 'table' as entity_level" +
+ " from dv_job_quality_report where report_date = #{reportDate} and datasource_id = #{datasourceId} and entity_level = 'column'" +
+ " group by datasource_id, database_name, table_name", databaseId = "mysql")
@Select("SELECT datasource_id, database_name, table_name, '--' as column_name, avg(score) as score, #{reportDate} as report_date, 'table' as entity_level" +
" from dv_job_quality_report where report_date = #{reportDate} and datasource_id = #{datasourceId} and entity_level = 'column'" +
" group by datasource_id, database_name, table_name")
List listTableScoreGroupByDatasource(@Param("datasourceId") Long datasourceId,
@Param("reportDate") String reportDate);
+ @Select(value = "SELECT datasource_id, database_name, '--' as table_name, '--' as column_name, avg(score) as score, CAST(#{reportDate} AS DATE) as report_date, 'database' as entity_level" +
+ " from dv_job_quality_report where report_date = CAST(#{reportDate} AS DATE) and datasource_id = #{datasourceId} and entity_level = 'table'" +
+ " group by datasource_id, database_name", databaseId = "postgresql")
+ @Select(value = "SELECT datasource_id, database_name, '--' as table_name, '--' as column_name, avg(score) as score, #{reportDate} as report_date, 'database' as entity_level" +
+ " from dv_job_quality_report where report_date = #{reportDate} and datasource_id = #{datasourceId} and entity_level = 'table'" +
+ " group by datasource_id, database_name", databaseId = "mysql")
@Select("SELECT datasource_id, database_name, '--' as table_name, '--' as column_name, avg(score) as score, #{reportDate} as report_date, 'database' as entity_level" +
" from dv_job_quality_report where report_date = #{reportDate} and datasource_id = #{datasourceId} and entity_level = 'table'" +
" group by datasource_id, database_name")
List listDbScoreGroupByDatasource(@Param("datasourceId") Long datasourceId,
@Param("reportDate") String reportDate);
+ @Select(value = "SELECT datasource_id, '--' as database_name, '--' as table_name, '--' as column_name, avg(score) as score, CAST(#{reportDate} AS DATE) as report_date, 'datasource' as entity_level" +
+ " from dv_job_quality_report where report_date = CAST(#{reportDate} AS DATE) and datasource_id = #{datasourceId} and entity_level = 'database'" +
+ " group by datasource_id", databaseId = "postgresql")
+ @Select(value = "SELECT datasource_id, '--' as database_name, '--' as table_name, '--' as column_name, avg(score) as score, #{reportDate} as report_date, 'datasource' as entity_level" +
+ " from dv_job_quality_report where report_date = #{reportDate} and datasource_id = #{datasourceId} and entity_level = 'database'" +
+ " group by datasource_id", databaseId = "mysql")
@Select("SELECT datasource_id, '--' as database_name, '--' as table_name, '--' as column_name, avg(score) as score, #{reportDate} as report_date, 'datasource' as entity_level" +
" from dv_job_quality_report where report_date = #{reportDate} and datasource_id = #{datasourceId} and entity_level = 'database'" +
" group by datasource_id")
List listDatasourceScoreGroupByDatasource(@Param("datasourceId") Long datasourceId,
@Param("reportDate") String reportDate);
+
+ List listScoreByCondition(@Param("datasourceId") Long datasourceId,
+ @Param("entityLevel") String entityLevel,
+ @Param("schemaName") String schemaName,
+ @Param("tableName") String tableName,
+ @Param("reportDate") String reportDate);
+
+ List listScoreTrendByCondition(@Param("datasourceId") Long datasourceId,
+ @Param("entityLevel") String entityLevel,
+ @Param("schemaName") String schemaName,
+ @Param("tableName") String tableName,
+ @Param("startDate") String startDate,
+ @Param("endDate") String endDate);
+
+ IPage getQualityReportPage(Page page,
+ @Param("datasourceId") Long datasourceId,
+ @Param("schemaName") String schemaName,
+ @Param("tableName") String tableName,
+ @Param("reportDate") String reportDate,
+ @Param("entityLevel") String entityLevel);
}
diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExecutionServiceImpl.java b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExecutionServiceImpl.java
index 37fb35f63..f14ffcc62 100644
--- a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExecutionServiceImpl.java
+++ b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExecutionServiceImpl.java
@@ -367,7 +367,6 @@ public List getJobExecutionAggPie(JobExecutionDashboardPara
}
startDateStr += " 00:00:00";
endDateStr += " 23:59:59";
-
List items =
baseMapper.getJobExecutionAggPie(dashboardParam.getDatasourceId(), dashboardParam.getMetricType(),
dashboardParam.getSchemaName(), dashboardParam.getTableName(), dashboardParam.getColumnName(),
diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobQualityReportServiceImpl.java b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobQualityReportServiceImpl.java
index ec1862bec..ea860e8b0 100644
--- a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobQualityReportServiceImpl.java
+++ b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobQualityReportServiceImpl.java
@@ -225,12 +225,10 @@ public boolean generateQualityReport(Long datasourceId) {
@Override
public JobQualityReportScore getScoreByCondition(JobQualityReportDashboardParam dashboardParam) {
- LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
if (dashboardParam == null) {
throw new DataVinesException("param can not be null");
}
- queryWrapper.eq(JobQualityReport::getDatasourceId, dashboardParam.getDatasourceId());
String entityLevel = DATASOURCE;
if (StringUtils.isNotEmpty(dashboardParam.getSchemaName())) {
@@ -243,29 +241,31 @@ public JobQualityReportScore getScoreByCondition(JobQualityReportDashboardParam
switch (entityLevel) {
case DATASOURCE:
- queryWrapper.eq(JobQualityReport::getEntityLevel, DATASOURCE);
+ entityLevel = DATASOURCE;
break;
case DATABASE:
- queryWrapper.eq(JobQualityReport::getEntityLevel, DATABASE);
- queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getSchemaName()), JobQualityReport::getDatabaseName, dashboardParam.getSchemaName());
+ entityLevel = DATABASE;
break;
case TABLE:
- queryWrapper.eq(JobQualityReport::getEntityLevel, TABLE);
- queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getSchemaName()), JobQualityReport::getDatabaseName, dashboardParam.getSchemaName());
- queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getTableName()), JobQualityReport::getTableName, dashboardParam.getTableName());
+ entityLevel = TABLE;
break;
default:
break;
}
+ String reportDate;
if (StringUtils.isEmpty(dashboardParam.getReportDate())) {
- String yesterday = DateUtils.format(DateUtils.addDays(DateUtils.getCurrentDate(),-1),DateUtils.YYYY_MM_DD);
- queryWrapper.eq(JobQualityReport::getReportDate, yesterday);
+ reportDate = DateUtils.format(DateUtils.addDays(DateUtils.getCurrentDate(),-1),DateUtils.YYYY_MM_DD);
} else {
- queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getReportDate()), JobQualityReport::getReportDate, dashboardParam.getReportDate());
+ reportDate = dashboardParam.getReportDate();
}
- List jobQualityReports = jobQualityReportMapper.selectList(queryWrapper);
+ List jobQualityReports = jobQualityReportMapper.listScoreByCondition(
+ dashboardParam.getDatasourceId(),
+ entityLevel,
+ dashboardParam.getSchemaName(),
+ dashboardParam.getTableName(),
+ reportDate);
if (CollectionUtils.isEmpty(jobQualityReports)) {
return null;
}
@@ -312,9 +312,6 @@ public JobQualityReportScoreTrend getScoreTrendByCondition(JobQualityReportDashb
currentDate = currentDate.plusDays(1);
}
- LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
- queryWrapper.eq(JobQualityReport::getDatasourceId, dashboardParam.getDatasourceId());
-
String entityLevel = DATASOURCE;
if (StringUtils.isNotEmpty(dashboardParam.getSchemaName())) {
@@ -327,24 +324,25 @@ public JobQualityReportScoreTrend getScoreTrendByCondition(JobQualityReportDashb
switch (entityLevel) {
case DATASOURCE:
- queryWrapper.eq(JobQualityReport::getEntityLevel, DATASOURCE);
+ entityLevel = DATASOURCE;
break;
case DATABASE:
- queryWrapper.eq(JobQualityReport::getEntityLevel, DATABASE);
- queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getSchemaName()), JobQualityReport::getDatabaseName, dashboardParam.getSchemaName());
+ entityLevel = DATABASE;
break;
case TABLE:
- queryWrapper.eq(JobQualityReport::getEntityLevel, TABLE);
- queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getSchemaName()), JobQualityReport::getDatabaseName, dashboardParam.getSchemaName());
- queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getTableName()), JobQualityReport::getTableName, dashboardParam.getTableName());
+ entityLevel = TABLE;
break;
default:
break;
}
- queryWrapper.between(JobQualityReport::getReportDate, startDateStr, endDateStr);
- queryWrapper.orderByAsc(JobQualityReport::getReportDate);
- List reportList = list(queryWrapper);
+ List reportList = jobQualityReportMapper.listScoreTrendByCondition(
+ dashboardParam.getDatasourceId(),
+ entityLevel,
+ dashboardParam.getSchemaName(),
+ dashboardParam.getTableName(),
+ startDateStr,
+ endDateStr);
Map date2Score = new HashMap<>();
if (CollectionUtils.isNotEmpty(reportList)) {
@@ -375,24 +373,27 @@ public JobQualityReportScoreTrend getScoreTrendByCondition(JobQualityReportDashb
@Override
public IPage getQualityReportPage(JobQualityReportDashboardParam dashboardParam) {
Page page = new Page<>(dashboardParam.getPageNumber(), dashboardParam.getPageSize());
- LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
- queryWrapper.eq(JobQualityReport::getDatasourceId, dashboardParam.getDatasourceId());
- queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getSchemaName()),JobQualityReport::getDatabaseName,dashboardParam.getSchemaName());
- queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getTableName()),JobQualityReport::getTableName,dashboardParam.getTableName());
+ String reportDate;
if (StringUtils.isEmpty(dashboardParam.getReportDate())) {
- String yesterday = DateUtils.format(DateUtils.addDays(DateUtils.getCurrentDate(),-1),DateUtils.YYYY_MM_DD);
- queryWrapper.eq(JobQualityReport::getReportDate, yesterday);
+ reportDate = DateUtils.format(DateUtils.addDays(DateUtils.getCurrentDate(),-1),DateUtils.YYYY_MM_DD);
} else {
- queryWrapper.eq(JobQualityReport::getReportDate, dashboardParam.getReportDate());
+ reportDate = dashboardParam.getReportDate();
}
+ String entityLevel;
if (StringUtils.isNotEmpty(dashboardParam.getTableName())) {
- queryWrapper.eq(JobQualityReport::getEntityLevel, COLUMN);
+ entityLevel = COLUMN;
} else {
- queryWrapper.eq(JobQualityReport::getEntityLevel, TABLE);
+ entityLevel = TABLE;
}
- return page(page, queryWrapper).convert(jobQualityReport -> {
+ return jobQualityReportMapper.getQualityReportPage(
+ page,
+ dashboardParam.getDatasourceId(),
+ dashboardParam.getSchemaName(),
+ dashboardParam.getTableName(),
+ reportDate,
+ entityLevel).convert(jobQualityReport -> {
JobQualityReportVO jobQualityReportVO = new JobQualityReportVO();
BeanUtils.copyProperties(jobQualityReport, jobQualityReportVO);
return jobQualityReportVO;
diff --git a/datavines-server/src/main/resources/mapper/JobExecutionMapper.xml b/datavines-server/src/main/resources/mapper/JobExecutionMapper.xml
index c23d66f78..190226766 100644
--- a/datavines-server/src/main/resources/mapper/JobExecutionMapper.xml
+++ b/datavines-server/src/main/resources/mapper/JobExecutionMapper.xml
@@ -39,10 +39,24 @@
and column_name = #{columnName}
- and update_time >= #{startTime}
+
+
+ and update_time >= CAST(#{startTime} AS timestamp)
+
+
+ and update_time >= #{startTime}
+
+
- and update_time <= #{endTime}
+
+
+ and update_time <= CAST(#{endTime} AS timestamp)
+
+
+ and update_time <= #{endTime}
+
+
and datasource_id = #{datasourceId}
@@ -70,10 +84,24 @@
and column_name = #{columnName}
- and update_time >= #{startTime}
+
+
+ and update_time >= CAST(#{startTime} AS timestamp)
+
+
+ and update_time >= #{startTime}
+
+
- and update_time <= #{endTime}
+
+
+ and update_time <= CAST(#{endTime} AS timestamp)
+
+
+ and update_time <= #{endTime}
+
+
and datasource_id = #{datasourceId}
@@ -82,7 +110,16 @@
- select TO_CHAR(create_time, 'YYYY-MM-DD') AS create_date, status from dv_job_execution
+ select
+
+
+ TO_CHAR(create_time, 'YYYY-MM-DD')
+
+
+ DATE_FORMAT(create_time, '%Y-%m-%d')
+
+
+ AS create_date, status from dv_job_execution
and metric_type = #{metricType}
@@ -97,10 +134,24 @@
and column_name = #{columnName}
- and create_time >= #{startTime}
+
+
+ and create_time >= CAST(#{startTime} AS timestamp)
+
+
+ and create_time >= #{startTime}
+
+
- and create_time <= #{endTime}
+
+
+ and create_time <= CAST(#{endTime} AS timestamp)
+
+
+ and create_time <= #{endTime}
+
+
and datasource_id = #{datasourceId}
diff --git a/scripts/sql/datavines-postgresql.sql b/scripts/sql/datavines-postgresql.sql
index 0db97cf67..4d9a4300a 100644
--- a/scripts/sql/datavines-postgresql.sql
+++ b/scripts/sql/datavines-postgresql.sql
@@ -383,7 +383,7 @@ CREATE TABLE dv_common_task_schedule (
param text,
datasource_id bigint NOT NULL,
cron_expression varchar(255) DEFAULT NULL,
- status smallint DEFAULT NULL,
+ status boolean DEFAULT NULL,
start_time timestamp DEFAULT NULL,
end_time timestamp DEFAULT NULL,
create_by bigint NOT NULL,
@@ -663,7 +663,7 @@ CREATE TABLE dv_job_schedule (
param text,
job_id bigint NOT NULL,
cron_expression varchar(255) DEFAULT NULL,
- status smallint DEFAULT NULL,
+ status boolean DEFAULT NULL,
start_time timestamp DEFAULT NULL,
end_time timestamp DEFAULT NULL,
create_by bigint NOT NULL,
@@ -889,4 +889,3 @@ INSERT INTO dv_config VALUES ('31', '-1', 'data.quality.flink.jar.name', '/libs/
INSERT INTO dv_user (id, username, password, email, phone, admin) VALUES ('1', 'admin', '$2a$10$9ZcicUYFl/.knBi9SE53U.Nml8bfNeArxr35HQshxXzimbA6Ipgqq', 'admin@gmail.com', NULL, '0');
INSERT INTO dv_workspace (id, name, create_by, update_by) VALUES ('1', 'admin''s default', '1', '1');
INSERT INTO dv_user_workspace (id, user_id, workspace_id, role_id,create_by,update_by) VALUES ('1', '1', '1', '1','1', '1');
-
From 4a0b8c84de8892e194dff959f1993d70a0233808 Mon Sep 17 00:00:00 2001
From: xxzuo <1293378490@qq.com>
Date: Wed, 29 Apr 2026 01:11:57 +0800
Subject: [PATCH 3/3] [Feature][Server] support postgresql registry
---
.../mapper/JobQualityReportMapper.xml | 119 ++++++++++++++++++
1 file changed, 119 insertions(+)
create mode 100644 datavines-server/src/main/resources/mapper/JobQualityReportMapper.xml
diff --git a/datavines-server/src/main/resources/mapper/JobQualityReportMapper.xml b/datavines-server/src/main/resources/mapper/JobQualityReportMapper.xml
new file mode 100644
index 000000000..202b030c1
--- /dev/null
+++ b/datavines-server/src/main/resources/mapper/JobQualityReportMapper.xml
@@ -0,0 +1,119 @@
+
+
+
+
+
+
+ id,
+ datasource_id,
+ entity_level,
+ database_name,
+ table_name,
+ column_name,
+ score,
+ report_date,
+ create_time,
+ update_time
+
+
+
+
+ datasource_id = #{datasourceId}
+ and entity_level = #{entityLevel}
+
+ and database_name = #{schemaName}
+
+
+ and table_name = #{tableName}
+
+
+
+
+
+
+
+ and report_date = CAST(#{reportDate} AS date)
+
+
+ and report_date = #{reportDate}
+
+
+
+
+
+
+
+ and report_date between CAST(#{startDate} AS date) and CAST(#{endDate} AS date)
+
+
+ and report_date between #{startDate} and #{endDate}
+
+
+
+
+
+ select
+ from dv_job_quality_report
+
+ datasource_id = #{datasourceId}
+ and entity_level = #{entityLevel}
+
+ and database_name = #{schemaName}
+
+
+ and table_name = #{tableName}
+
+
+
+
+
+
+ select
+ from dv_job_quality_report
+
+ datasource_id = #{datasourceId}
+ and entity_level = #{entityLevel}
+
+ and database_name = #{schemaName}
+
+
+ and table_name = #{tableName}
+
+
+
+ order by report_date asc
+
+
+
+ select
+ from dv_job_quality_report
+
+ datasource_id = #{datasourceId}
+
+ and database_name = #{schemaName}
+
+
+ and table_name = #{tableName}
+
+ and entity_level = #{entityLevel}
+
+
+
+