From 0127658ea9d9ebf1d0a469c9f526353f9c4f5dc9 Mon Sep 17 00:00:00 2001 From: xxzuo <1293378490@qq.com> Date: Wed, 15 Apr 2026 00:45:09 +0800 Subject: [PATCH 1/3] [Feature][Server] support postgresql registry --- README.md | 8 + .../src/main/assembly/datavines-bin.xml | 1 + .../datavines-registry-postgresql/pom.xml | 40 + .../registry/plugin/PostgresqlMutex.java | 209 ++++ .../registry/plugin/PostgresqlRegistry.java | 113 +++ .../plugin/PostgresqlServerStateManager.java | 267 ++++++ .../registry/plugin/RegistryLock.java | 35 + .../io.datavines.registry.api.Registry | 1 + .../datavines-registry-plugins/pom.xml | 3 +- datavines-server/pom.xml | 54 +- .../server/api/config/MybatisPlusConfig.java | 20 +- .../CatalogMetaDataFetchExecutorImpl.java | 25 +- .../CatalogEntityMetricJobRelMapper.xml | 6 +- .../resources/mapper/DataSourceMapper.xml | 6 +- .../mapper/ErrorDataStorageMapper.xml | 8 +- .../resources/mapper/JobExecutionMapper.xml | 10 +- .../src/main/resources/mapper/JobMapper.xml | 14 +- .../main/resources/mapper/SlaJobMapper.xml | 4 +- scripts/sql/datavines-postgresql.sql | 892 ++++++++++++++++++ 19 files changed, 1662 insertions(+), 54 deletions(-) create mode 100644 datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/pom.xml create mode 100644 datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlMutex.java create mode 100644 datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlRegistry.java create mode 100644 datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlServerStateManager.java create mode 100644 datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/RegistryLock.java create mode 100644 datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/resources/META-INF/services/io.datavines.registry.api.Registry create mode 100644 scripts/sql/datavines-postgresql.sql diff --git a/README.md b/README.md index 61b49152f..894b17b3c 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,14 @@ Need: Maven 3.6.1 and later ```sh $ mvn clean package -Prelease -DskipTests ``` + +## Metadata Database + +- The default `datavines-server` configuration uses `PostgreSQL` +- Initialize a PostgreSQL metadata database with `scripts/sql/datavines-postgresql.sql` +- If you want to run with `MySQL`, switch to the `mysql` Spring profile and initialize with `scripts/sql/datavines-mysql.sql` +- The release package now contains both scripts in `${DATAVINES_HOME}/scripts/` + ## Features ### Data Catalog diff --git a/datavines-dist/src/main/assembly/datavines-bin.xml b/datavines-dist/src/main/assembly/datavines-bin.xml index 0f9e8e6f6..47a392086 100644 --- a/datavines-dist/src/main/assembly/datavines-bin.xml +++ b/datavines-dist/src/main/assembly/datavines-bin.xml @@ -80,6 +80,7 @@ ${basedir}/../scripts/sql datavines-mysql.sql + datavines-postgresql.sql ./scripts diff --git a/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/pom.xml b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/pom.xml new file mode 100644 index 000000000..cf888af81 --- /dev/null +++ b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/pom.xml @@ -0,0 +1,40 @@ + + + + + + datavines-registry-plugins + io.datavines + 1.0.0-SNAPSHOT + + 4.0.0 + + datavines-registry-postgresql + + + + com.zaxxer + HikariCP + + + + diff --git a/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlMutex.java b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlMutex.java new file mode 100644 index 000000000..3dee45892 --- /dev/null +++ b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlMutex.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.registry.plugin; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.datavines.common.utils.ConnectionUtils; +import io.datavines.common.utils.NetUtils; +import io.datavines.common.utils.ThreadUtils; +import io.datavines.registry.api.ServerInfo; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class PostgresqlMutex { + + public static final long LOCK_ACQUIRE_INTERVAL = 1000; + + private final long expireTimeWindow = 600; + + private Connection connection; + + private final Properties properties; + + private final ServerInfo serverInfo; + + private final ConcurrentHashMap lockHoldMap; + + public PostgresqlMutex(Connection connection, Properties properties) throws SQLException { + this.connection = connection; + this.properties = properties; + this.serverInfo = new ServerInfo(NetUtils.getHost(), Integer.valueOf((String) properties.get("server.port"))); + this.lockHoldMap = new ConcurrentHashMap<>(); + ScheduledExecutorService lockTermUpdateThreadPool = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("RegistryLockRefreshThread").setDaemon(true).build()); + + lockTermUpdateThreadPool.scheduleWithFixedDelay( + new LockTermRefreshTask(lockHoldMap), + 2, + 2, + TimeUnit.SECONDS); + + clearExpireLock(); + } + + public boolean acquire(String lockKey, long time) { + RegistryLock lock = lockHoldMap.computeIfAbsent(lockKey, key -> { + RegistryLock registryLock = null; + int count = 1; + if (time > 0) { + count = Math.max(1, (int) (time * 1000 / LOCK_ACQUIRE_INTERVAL)); + } + while (count > 0) { + try { + registryLock = executeInsert(key); + log.debug("Acquire the lock success, {}", key); + count = 0; + } catch (SQLException e) { + log.error("Acquire the lock error, {}, try again!", e.getLocalizedMessage()); + try { + clearExpireLock(); + } catch (SQLException ex) { + log.error("clear expire lock error : ", ex); + } + ThreadUtils.sleep(LOCK_ACQUIRE_INTERVAL); + count--; + } + } + + return registryLock; + }); + + return lock != null; + } + + public boolean release(String lockKey) throws SQLException { + RegistryLock registryLock = lockHoldMap.get(lockKey); + if (registryLock != null) { + try { + executeDelete(lockKey); + lockHoldMap.remove(lockKey); + } catch (SQLException e) { + log.error(String.format("Release lock: %s error", lockKey), e); + return false; + } + } + + return true; + } + + public void close() throws SQLException { + if (connection != null) { + connection.close(); + } + } + + private RegistryLock executeInsert(String key) throws SQLException { + checkConnection(); + Timestamp updateTime = new Timestamp(System.currentTimeMillis()); + PreparedStatement preparedStatement = connection.prepareStatement( + "insert into dv_registry_lock (lock_key,lock_owner,update_time) values (?,?,?)"); + preparedStatement.setString(1, key); + preparedStatement.setString(2, this.serverInfo.getAddr()); + preparedStatement.setTimestamp(3, updateTime); + preparedStatement.executeUpdate(); + preparedStatement.close(); + return new RegistryLock(key, this.serverInfo.getAddr(), updateTime); + } + + private RegistryLock executeUpdate(String key) throws SQLException { + checkConnection(); + Timestamp updateTime = new Timestamp(System.currentTimeMillis()); + PreparedStatement preparedStatement = connection.prepareStatement( + "update dv_registry_lock set update_time = ? where lock_key = ?"); + preparedStatement.setTimestamp(1, updateTime); + preparedStatement.setString(2, key); + preparedStatement.executeUpdate(); + preparedStatement.close(); + return new RegistryLock(key, this.serverInfo.getAddr(), updateTime); + } + + private void executeDelete(String key) throws SQLException { + checkConnection(); + PreparedStatement preparedStatement = connection.prepareStatement( + "delete from dv_registry_lock where lock_key = ?"); + preparedStatement.setString(1, key); + if (preparedStatement.executeUpdate() > 0) { + lockHoldMap.remove(key); + } + preparedStatement.close(); + } + + private void clearExpireLock() throws SQLException { + checkConnection(); + PreparedStatement preparedStatement = connection.prepareStatement( + "delete from dv_registry_lock where update_time < ?"); + preparedStatement.setTimestamp(1, new Timestamp(System.currentTimeMillis() - expireTimeWindow)); + preparedStatement.executeUpdate(); + preparedStatement.close(); + lockHoldMap.values().removeIf((v -> v.getUpdateTime().getTime() < (System.currentTimeMillis() - expireTimeWindow))); + } + + private void checkConnection() throws SQLException { + if (connection == null || connection.isClosed()) { + connection = ConnectionUtils.getConnection(properties); + } + } + + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class LockTermRefreshTask implements Runnable { + + private final Map lockHoldMap; + + @Override + public void run() { + try { + if (lockHoldMap.isEmpty()) { + return; + } + + List lockKeys = new ArrayList<>(); + for (RegistryLock lock : lockHoldMap.values()) { + if (lock != null) { + lockKeys.add(lock.getLockKey()); + } + } + lockKeys.forEach(lockKey -> { + try { + RegistryLock registryLock = executeUpdate(lockKey); + lockHoldMap.put(lockKey, registryLock); + } catch (SQLException e) { + log.warn("Update the lock: {} term failed.", lockKey); + } + }); + + clearExpireLock(); + } catch (Exception e) { + log.error("Update lock term error", e); + } + } + } +} diff --git a/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlRegistry.java b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlRegistry.java new file mode 100644 index 000000000..d214f3306 --- /dev/null +++ b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlRegistry.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.registry.plugin; + +import io.datavines.common.utils.ConnectionUtils; +import io.datavines.registry.api.ConnectionListener; +import io.datavines.registry.api.Registry; +import io.datavines.registry.api.ServerInfo; +import io.datavines.registry.api.SubscribeListener; +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; + +@Slf4j +public class PostgresqlRegistry implements Registry { + + private PostgresqlMutex postgresqlMutex; + + private PostgresqlServerStateManager postgresqlServerStateManager; + + @Override + public void init(Properties properties) throws Exception { + + Connection connection = ConnectionUtils.getConnection(properties); + + if (connection == null) { + throw new Exception("can not create connection"); + } + + try { + postgresqlMutex = new PostgresqlMutex(connection, properties); + postgresqlServerStateManager = new PostgresqlServerStateManager(connection, properties); + } catch (SQLException exception) { + log.error("init postgresql mutex error: " + exception.getLocalizedMessage()); + } + } + + @Override + public boolean acquire(String key, long timeout) { + try { + return postgresqlMutex.acquire(key, timeout); + } catch (Exception e) { + log.warn("acquire lock error: ", e); + return false; + } + } + + @Override + public boolean release(String key) { + try { + return postgresqlMutex.release(key); + } catch (Exception e) { + log.warn("acquire lock error: ", e); + return false; + } + } + + @Override + public void subscribe(String key, SubscribeListener subscribeListener) { + try { + postgresqlServerStateManager.registry(subscribeListener); + } catch (Exception e) { + log.warn("subscribe error: ", e); + } + } + + @Override + public void unSubscribe(String key) { + try { + postgresqlServerStateManager.unRegistry(); + } catch (Exception e) { + log.warn("unSubscribe error: ", e); + } + } + + @Override + public void addConnectionListener(ConnectionListener connectionListener) { + + } + + @Override + public List getActiveServerList() { + return postgresqlServerStateManager.getActiveServerList(); + } + + @Override + public void close() throws SQLException { + postgresqlMutex.close(); + postgresqlServerStateManager.close(); + } + + @Override + public String getPluginName() { + return "postgresql"; + } +} diff --git a/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlServerStateManager.java b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlServerStateManager.java new file mode 100644 index 000000000..842b591b1 --- /dev/null +++ b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/PostgresqlServerStateManager.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.registry.plugin; + +import io.datavines.common.utils.ConnectionUtils; +import io.datavines.common.utils.NetUtils; +import io.datavines.common.utils.Stopper; +import io.datavines.registry.api.Event; +import io.datavines.registry.api.ServerInfo; +import io.datavines.registry.api.SubscribeListener; +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Slf4j +public class PostgresqlServerStateManager { + + private Connection connection; + + private final ConcurrentHashMap liveServerMap = new ConcurrentHashMap<>(); + + private Set deadServers = new HashSet<>(); + + private SubscribeListener subscribeListener; + + private final ServerInfo serverInfo; + + private final Properties properties; + + public PostgresqlServerStateManager(Connection connection, Properties properties) throws SQLException { + this.connection = connection; + this.properties = properties; + serverInfo = new ServerInfo(NetUtils.getHost(), Integer.valueOf((String) properties.get("server.port")), + new Timestamp(System.currentTimeMillis()), new Timestamp(System.currentTimeMillis())); + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2); + executorService.scheduleAtFixedRate(new HeartBeater(), 2, 2, TimeUnit.SECONDS); + executorService.scheduleAtFixedRate(new ServerChecker(), 5, 10, TimeUnit.SECONDS); + } + + public void registry(SubscribeListener subscribeListener) throws SQLException { + + if (isExists(serverInfo)) { + executeUpdate(serverInfo); + } else { + executeInsert(serverInfo); + } + + liveServerMap.putAll(fetchServers()); + this.subscribeListener = subscribeListener; + refreshServer(); + } + + public void unRegistry() throws SQLException { + executeDelete(serverInfo); + } + + public void refreshServer() throws SQLException { + ConcurrentHashMap newServers = fetchServers(); + Set offlineServer = new HashSet<>(); + if (newServers == null) { + return; + } + Set onlineServer = new HashSet<>(); + newServers.forEach((k, v) -> { + long updateTime = v.getUpdateTime().getTime(); + long now = System.currentTimeMillis(); + if (now - updateTime > 20000) { + offlineServer.add(k); + } else { + onlineServer.add(k); + } + }); + + liveServerMap.forEach((k, v) -> { + if (newServers.get(k) == null) { + offlineServer.add(k); + } + }); + + offlineServer.forEach(x -> { + if (!deadServers.contains(x) && !x.equals(serverInfo.getAddr())) { + String[] values = x.split(":"); + try { + executeDelete(new ServerInfo(values[0], Integer.valueOf(values[1]))); + liveServerMap.remove(x); + } catch (SQLException e) { + log.error("delete server info error", e); + } + subscribeListener.notify(Event.builder().key(x).type(Event.Type.REMOVE).build()); + } + }); + + deadServers.addAll(offlineServer); + + deadServers = deadServers + .stream() + .filter(x -> !onlineServer.contains(x)) + .collect(Collectors.toSet()); + + onlineServer.forEach(x -> { + if (liveServerMap.isEmpty()) { + return; + } + if (liveServerMap.get(x) == null && !x.equals(serverInfo.getAddr())) { + liveServerMap.put(x, newServers.get(x)); + subscribeListener.notify(Event.builder().key(x).type(Event.Type.ADD).build()); + } + }); + } + + private void executeInsert(ServerInfo serverInfo) throws SQLException { + checkConnection(); + PreparedStatement preparedStatement = connection.prepareStatement( + "insert into dv_server (host, port) values (?, ?)"); + preparedStatement.setString(1, serverInfo.getHost()); + preparedStatement.setInt(2, serverInfo.getServerPort()); + preparedStatement.executeUpdate(); + preparedStatement.close(); + } + + private void executeUpdate(ServerInfo serverInfo) throws SQLException { + checkConnection(); + PreparedStatement preparedStatement = connection.prepareStatement( + "update dv_server set update_time = ? where host = ? and port = ?"); + preparedStatement.setTimestamp(1, new Timestamp(System.currentTimeMillis())); + preparedStatement.setString(2, serverInfo.getHost()); + preparedStatement.setInt(3, serverInfo.getServerPort()); + preparedStatement.executeUpdate(); + preparedStatement.close(); + } + + private void executeDelete(ServerInfo serverInfo) throws SQLException { + checkConnection(); + PreparedStatement preparedStatement = connection.prepareStatement( + "delete from dv_server where host = ? and port = ?"); + preparedStatement.setString(1, serverInfo.getHost()); + preparedStatement.setInt(2, serverInfo.getServerPort()); + preparedStatement.executeUpdate(); + preparedStatement.close(); + } + + private boolean isExists(ServerInfo serverInfo) throws SQLException { + checkConnection(); + PreparedStatement preparedStatement = connection.prepareStatement( + "select * from dv_server where host= ? and port= ?", + ResultSet.TYPE_SCROLL_INSENSITIVE, + ResultSet.CONCUR_READ_ONLY); + preparedStatement.setString(1, serverInfo.getHost()); + preparedStatement.setInt(2, serverInfo.getServerPort()); + ResultSet resultSet = preparedStatement.executeQuery(); + + if (resultSet == null) { + preparedStatement.close(); + return false; + } + boolean result = resultSet.first(); + resultSet.close(); + preparedStatement.close(); + return result; + } + + private ConcurrentHashMap fetchServers() throws SQLException { + checkConnection(); + PreparedStatement preparedStatement = connection.prepareStatement("select * from dv_server"); + ResultSet resultSet = preparedStatement.executeQuery(); + + if (resultSet == null) { + preparedStatement.close(); + return null; + } + + ConcurrentHashMap map = new ConcurrentHashMap<>(); + while (resultSet.next()) { + String host = resultSet.getString("host"); + int port = resultSet.getInt("port"); + Timestamp updateTime = resultSet.getTimestamp("update_time"); + Timestamp createTime = resultSet.getTimestamp("create_time"); + map.put(host + ":" + port, new ServerInfo(host, port, createTime, updateTime)); + } + resultSet.close(); + preparedStatement.close(); + return map; + } + + public List getActiveServerList() { + List activeServerList = new ArrayList<>(); + liveServerMap.forEach((k, v) -> { + String[] values = k.split(":"); + if (values.length == 2) { + activeServerList.add(v); + } + }); + return activeServerList; + } + + class HeartBeater implements Runnable { + + @Override + public void run() { + if (Stopper.isRunning()) { + try { + if (isExists(serverInfo)) { + executeUpdate(serverInfo); + } else { + executeInsert(serverInfo); + } + } catch (SQLException e) { + log.error("heartbeat error", e); + } + } + } + } + + class ServerChecker implements Runnable { + + @Override + public void run() { + if (Stopper.isRunning()) { + try { + refreshServer(); + } catch (SQLException e) { + log.error("server check error", e); + } + } + } + } + + private void checkConnection() throws SQLException { + if (connection == null || connection.isClosed()) { + connection = ConnectionUtils.getConnection(properties); + } + } + + public void close() throws SQLException { + if (connection != null && !connection.isClosed()) { + connection.close(); + } + } +} diff --git a/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/RegistryLock.java b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/RegistryLock.java new file mode 100644 index 000000000..dfb36d7c9 --- /dev/null +++ b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/java/io/datavines/registry/plugin/RegistryLock.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.datavines.registry.plugin; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.sql.Timestamp; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class RegistryLock { + + private String lockKey; + + private String lockOwner; + + private Timestamp updateTime; +} diff --git a/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/resources/META-INF/services/io.datavines.registry.api.Registry b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/resources/META-INF/services/io.datavines.registry.api.Registry new file mode 100644 index 000000000..b193e993c --- /dev/null +++ b/datavines-registry/datavines-registry-plugins/datavines-registry-postgresql/src/main/resources/META-INF/services/io.datavines.registry.api.Registry @@ -0,0 +1 @@ +io.datavines.registry.plugin.PostgresqlRegistry diff --git a/datavines-registry/datavines-registry-plugins/pom.xml b/datavines-registry/datavines-registry-plugins/pom.xml index 7d1069175..e9a4b3821 100644 --- a/datavines-registry/datavines-registry-plugins/pom.xml +++ b/datavines-registry/datavines-registry-plugins/pom.xml @@ -32,6 +32,7 @@ pom datavines-registry-mysql + datavines-registry-postgresql datavines-registry-zookeeper @@ -49,4 +50,4 @@ - \ No newline at end of file + diff --git a/datavines-server/pom.xml b/datavines-server/pom.xml index b05280921..287cddbd0 100644 --- a/datavines-server/pom.xml +++ b/datavines-server/pom.xml @@ -1,21 +1,21 @@ ${project.version} - - io.datavines - datavines-engine-spark-executor - ${project.version} - + + io.datavines + datavines-engine-spark-executor + ${project.version} + io.datavines @@ -320,12 +320,12 @@ ${project.version} - - io.datavines - datavines-engine-local-executor - ${project.version} - - + + io.datavines + datavines-engine-local-executor + ${project.version} + + slf4j-log4j12 org.slf4j @@ -342,12 +342,12 @@ ${project.version} - - io.datavines - datavines-engine-flink-executor - ${project.version} - - + + io.datavines + datavines-engine-flink-executor + ${project.version} + + org.apache.hadoop hadoop-common @@ -366,9 +366,9 @@ - - io.datavines - datavines-connector-all + + io.datavines + datavines-connector-all ${project.version} @@ -509,6 +509,12 @@ ${project.version} + + io.datavines + datavines-registry-postgresql + ${project.version} + + io.datavines datavines-registry-zookeeper diff --git a/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java b/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java index db5c999ca..3922590be 100644 --- a/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java +++ b/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java @@ -19,16 +19,32 @@ import com.baomidou.mybatisplus.annotation.DbType; import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor; +import com.zaxxer.hikari.HikariDataSource; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import javax.sql.DataSource; + @Configuration public class MybatisPlusConfig { @Bean - public MybatisPlusInterceptor mybatisPlusInterceptor() { + public MybatisPlusInterceptor mybatisPlusInterceptor(DataSource dataSource) { MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); - interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL)); + interceptor.addInnerInterceptor(new PaginationInnerInterceptor(resolveDbType(dataSource))); return interceptor; } + + private DbType resolveDbType(DataSource dataSource) { + if (dataSource instanceof HikariDataSource) { + HikariDataSource hikariDataSource = (HikariDataSource) dataSource; + String jdbcUrl = hikariDataSource.getJdbcUrl(); + String driverClassName = hikariDataSource.getDriverClassName(); + if ((jdbcUrl != null && jdbcUrl.contains("postgresql")) + || (driverClassName != null && driverClassName.contains("postgresql"))) { + return DbType.POSTGRE_SQL; + } + } + return DbType.MYSQL; + } } diff --git a/datavines-server/src/main/java/io/datavines/server/scheduler/metadata/task/CatalogMetaDataFetchExecutorImpl.java b/datavines-server/src/main/java/io/datavines/server/scheduler/metadata/task/CatalogMetaDataFetchExecutorImpl.java index 606e1e44e..854ce2b88 100644 --- a/datavines-server/src/main/java/io/datavines/server/scheduler/metadata/task/CatalogMetaDataFetchExecutorImpl.java +++ b/datavines-server/src/main/java/io/datavines/server/scheduler/metadata/task/CatalogMetaDataFetchExecutorImpl.java @@ -80,7 +80,7 @@ public CatalogMetaDataFetchExecutorImpl(CommonTaskRequest request) { this.dataSource = request.getDataSource(); this.connectorFactory = PluginDiscovery.getMultiKeyPluginDiscovery(ConnectorFactory.class, ConnectorFactory::getPluginNames) - + .getOrCreatePlugin(dataSource.getType()); this.instanceService = SpringApplicationContext.getBean(CatalogEntityInstanceService.class); @@ -181,8 +181,7 @@ private void executeFetchDataSource() throws SQLException { if (CollectionUtils.isNotEmpty(createDatabaseEntityList)) { for (String database : createDatabaseEntityList) { DatabaseInfo databaseInfo = databaseInfoMap.get(database); - if ("sys".equals(databaseInfo.getName()) || "information_schema".equals(databaseInfo.getName()) || - "performance_schema".equals(databaseInfo.getName()) || "mysql".equals(databaseInfo.getName())) { + if (isSystemDatabase(dataSource.getType(), databaseInfo.getName())) { continue; } @@ -712,4 +711,24 @@ private boolean isTypeChange(String oldType, String newType) { return StringUtils.isNotEmpty(newType) && StringUtils.isNotEmpty(oldType) && !oldType.equals(newType); } + + private boolean isSystemDatabase(String dataSourceType, String databaseName) { + if (StringUtils.isEmpty(databaseName)) { + return true; + } + + String normalizedDataSourceType = StringUtils.isEmpty(dataSourceType) ? "" : dataSourceType.toLowerCase(); + String normalizedDatabaseName = databaseName.toLowerCase(); + + if ("postgresql".equals(normalizedDataSourceType) || "postgres".equals(normalizedDataSourceType)) { + return "postgres".equals(normalizedDatabaseName) + || "template0".equals(normalizedDatabaseName) + || "template1".equals(normalizedDatabaseName); + } + + return "sys".equals(normalizedDatabaseName) + || "information_schema".equals(normalizedDatabaseName) + || "performance_schema".equals(normalizedDatabaseName) + || "mysql".equals(normalizedDatabaseName); + } } diff --git a/datavines-server/src/main/resources/mapper/CatalogEntityMetricJobRelMapper.xml b/datavines-server/src/main/resources/mapper/CatalogEntityMetricJobRelMapper.xml index 70e9a8bfe..61ebc17fe 100644 --- a/datavines-server/src/main/resources/mapper/CatalogEntityMetricJobRelMapper.xml +++ b/datavines-server/src/main/resources/mapper/CatalogEntityMetricJobRelMapper.xml @@ -25,7 +25,7 @@ @@ -33,8 +33,8 @@ - \ No newline at end of file + diff --git a/datavines-server/src/main/resources/mapper/DataSourceMapper.xml b/datavines-server/src/main/resources/mapper/DataSourceMapper.xml index ff6f5d358..a8e211b31 100644 --- a/datavines-server/src/main/resources/mapper/DataSourceMapper.xml +++ b/datavines-server/src/main/resources/mapper/DataSourceMapper.xml @@ -25,13 +25,13 @@ - \ No newline at end of file + diff --git a/datavines-server/src/main/resources/mapper/ErrorDataStorageMapper.xml b/datavines-server/src/main/resources/mapper/ErrorDataStorageMapper.xml index eecabf55d..50508bd42 100644 --- a/datavines-server/src/main/resources/mapper/ErrorDataStorageMapper.xml +++ b/datavines-server/src/main/resources/mapper/ErrorDataStorageMapper.xml @@ -37,17 +37,17 @@ - select p.id, p.name, p.type, p.param, p.workspace_id, u.username as updater, p.update_time from () p - left join `dv_user` u on u.id = p.update_by + left join dv_user u on u.id = p.update_by - LOWER(p.`name`) LIKE CONCAT(CONCAT('%', LOWER(#{searchVal})), '%') + LOWER(p.name) LIKE CONCAT(CONCAT('%', LOWER(#{searchVal})), '%') - \ No newline at end of file + diff --git a/datavines-server/src/main/resources/mapper/JobExecutionMapper.xml b/datavines-server/src/main/resources/mapper/JobExecutionMapper.xml index d2f770f58..c23d66f78 100644 --- a/datavines-server/src/main/resources/mapper/JobExecutionMapper.xml +++ b/datavines-server/src/main/resources/mapper/JobExecutionMapper.xml @@ -82,7 +82,7 @@ - select DATE_FORMAT(create_time, '%Y-%m-%d') AS create_date, status from dv_job_execution + select TO_CHAR(create_time, 'YYYY-MM-DD') AS create_date, status from dv_job_execution and metric_type = #{metricType} @@ -113,7 +113,7 @@ from () p - LOWER(p.`name`) LIKE CONCAT(CONCAT('%', LOWER(#{searchVal})), '%') + LOWER(p.name) LIKE CONCAT(CONCAT('%', LOWER(#{searchVal})), '%') AND LOWER(p.schema_name) LIKE CONCAT(CONCAT('%', LOWER(#{schemaSearch})), '%') @@ -130,7 +130,7 @@ select p.job_id, count(1) as total_count, - sum(if(status = 6, 1, 0)) as fail_count, - sum(if(status = 7, 1, 0)) as success_count, + sum(case when status = 6 then 1 else 0 end) as fail_count, + sum(case when status = 7 then 1 else 0 end) as success_count, max(start_time) as last_job_execution_time, min(start_time) as first_job_execution_time from dv_job_execution p diff --git a/datavines-server/src/main/resources/mapper/JobMapper.xml b/datavines-server/src/main/resources/mapper/JobMapper.xml index f3ef9475b..3b2d61e86 100644 --- a/datavines-server/src/main/resources/mapper/JobMapper.xml +++ b/datavines-server/src/main/resources/mapper/JobMapper.xml @@ -25,11 +25,11 @@ select p.id, p.name, p.schema_name ,p.table_name,p.column_name, p.type, u.username as updater, p.update_time, s.cron_expression - from () p left join `dv_user` u on u.id = p.update_by - left join `dv_job_schedule` s on p.id = s.job_id and s.status = 1 + from () p left join dv_user u on u.id = p.update_by + left join dv_job_schedule s on p.id = s.job_id and s.status = 1 - AND LOWER(p.`name`) LIKE CONCAT(CONCAT('%', LOWER(#{searchVal})), '%') + AND LOWER(p.name) LIKE CONCAT(CONCAT('%', LOWER(#{searchVal})), '%') AND LOWER(p.schema_name) LIKE CONCAT(CONCAT('%', LOWER(#{schemaSearch})), '%') @@ -64,4 +64,4 @@ - \ No newline at end of file + diff --git a/datavines-server/src/main/resources/mapper/SlaJobMapper.xml b/datavines-server/src/main/resources/mapper/SlaJobMapper.xml index 3fb68c2a5..14687ec30 100644 --- a/datavines-server/src/main/resources/mapper/SlaJobMapper.xml +++ b/datavines-server/src/main/resources/mapper/SlaJobMapper.xml @@ -63,10 +63,10 @@ ON dsj.job_id = dj.id - LOWER(dj.`name`) LIKE CONCAT(CONCAT('%', LOWER(#{searchVal})), '%') + LOWER(dj.name) LIKE CONCAT(CONCAT('%', LOWER(#{searchVal})), '%') ORDER BY dsj.update_time DESC - \ No newline at end of file + diff --git a/scripts/sql/datavines-postgresql.sql b/scripts/sql/datavines-postgresql.sql new file mode 100644 index 000000000..0db97cf67 --- /dev/null +++ b/scripts/sql/datavines-postgresql.sql @@ -0,0 +1,892 @@ +DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS CASCADE; +DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS CASCADE; +DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS CASCADE; +DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS CASCADE; +DROP TABLE IF EXISTS QRTZ_TRIGGERS CASCADE; +DROP TABLE IF EXISTS QRTZ_JOB_DETAILS CASCADE; +DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS CASCADE; +DROP TABLE IF EXISTS QRTZ_CALENDARS CASCADE; +DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS CASCADE; +DROP TABLE IF EXISTS QRTZ_LOCKS CASCADE; +DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE CASCADE; +CREATE TABLE QRTZ_JOB_DETAILS ( + SCHED_NAME varchar(120) NOT NULL, + JOB_NAME varchar(200) NOT NULL, + JOB_GROUP varchar(200) NOT NULL, + DESCRIPTION varchar(250) DEFAULT NULL, + JOB_CLASS_NAME varchar(250) NOT NULL, + IS_DURABLE varchar(1) NOT NULL, + IS_NONCONCURRENT varchar(1) NOT NULL, + IS_UPDATE_DATA varchar(1) NOT NULL, + REQUESTS_RECOVERY varchar(1) NOT NULL, + JOB_DATA bytea, + PRIMARY KEY (SCHED_NAME, JOB_NAME, JOB_GROUP) +); +CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS (SCHED_NAME, REQUESTS_RECOVERY); +CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS (SCHED_NAME, JOB_GROUP); +CREATE TABLE QRTZ_TRIGGERS ( + SCHED_NAME varchar(120) NOT NULL, + TRIGGER_NAME varchar(200) NOT NULL, + TRIGGER_GROUP varchar(200) NOT NULL, + JOB_NAME varchar(200) NOT NULL, + JOB_GROUP varchar(200) NOT NULL, + DESCRIPTION varchar(250) DEFAULT NULL, + NEXT_FIRE_TIME bigint DEFAULT NULL, + PREV_FIRE_TIME bigint DEFAULT NULL, + PRIORITY integer DEFAULT NULL, + TRIGGER_STATE varchar(16) NOT NULL, + TRIGGER_TYPE varchar(8) NOT NULL, + START_TIME bigint NOT NULL, + END_TIME bigint DEFAULT NULL, + CALENDAR_NAME varchar(200) DEFAULT NULL, + MISFIRE_INSTR smallint DEFAULT NULL, + JOB_DATA bytea, + PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP), + CONSTRAINT QRTZ_TRIGGERS_ibfk_1 + FOREIGN KEY (SCHED_NAME, JOB_NAME, JOB_GROUP) + REFERENCES QRTZ_JOB_DETAILS (SCHED_NAME, JOB_NAME, JOB_GROUP) +); +CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS (SCHED_NAME, JOB_NAME, JOB_GROUP); +CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS (SCHED_NAME, JOB_GROUP); +CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS (SCHED_NAME, CALENDAR_NAME); +CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_GROUP); +CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_STATE); +CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, TRIGGER_STATE); +CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_GROUP, TRIGGER_STATE); +CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS (SCHED_NAME, NEXT_FIRE_TIME); +CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_STATE, NEXT_FIRE_TIME); +CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME); +CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME, TRIGGER_STATE); +CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS (SCHED_NAME, MISFIRE_INSTR, NEXT_FIRE_TIME, TRIGGER_GROUP, TRIGGER_STATE); +CREATE TABLE QRTZ_SIMPLE_TRIGGERS ( + SCHED_NAME varchar(120) NOT NULL, + TRIGGER_NAME varchar(200) NOT NULL, + TRIGGER_GROUP varchar(200) NOT NULL, + REPEAT_COUNT bigint NOT NULL, + REPEAT_INTERVAL bigint NOT NULL, + TIMES_TRIGGERED bigint NOT NULL, + PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP), + CONSTRAINT QRTZ_SIMPLE_TRIGGERS_ibfk_1 + FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) + REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) +); +CREATE TABLE QRTZ_CRON_TRIGGERS ( + SCHED_NAME varchar(120) NOT NULL, + TRIGGER_NAME varchar(200) NOT NULL, + TRIGGER_GROUP varchar(200) NOT NULL, + CRON_EXPRESSION varchar(120) NOT NULL, + TIME_ZONE_ID varchar(80) DEFAULT NULL, + PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP), + CONSTRAINT QRTZ_CRON_TRIGGERS_ibfk_1 + FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) + REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) +); +CREATE TABLE QRTZ_SIMPROP_TRIGGERS ( + SCHED_NAME varchar(120) NOT NULL, + TRIGGER_NAME varchar(200) NOT NULL, + TRIGGER_GROUP varchar(200) NOT NULL, + STR_PROP_1 varchar(512) DEFAULT NULL, + STR_PROP_2 varchar(512) DEFAULT NULL, + STR_PROP_3 varchar(512) DEFAULT NULL, + INT_PROP_1 integer DEFAULT NULL, + INT_PROP_2 integer DEFAULT NULL, + LONG_PROP_1 bigint DEFAULT NULL, + LONG_PROP_2 bigint DEFAULT NULL, + DEC_PROP_1 decimal(13,4) DEFAULT NULL, + DEC_PROP_2 decimal(13,4) DEFAULT NULL, + BOOL_PROP_1 varchar(1) DEFAULT NULL, + BOOL_PROP_2 varchar(1) DEFAULT NULL, + PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP), + CONSTRAINT QRTZ_SIMPROP_TRIGGERS_ibfk_1 + FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) + REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) +); +CREATE TABLE QRTZ_BLOB_TRIGGERS ( + SCHED_NAME varchar(120) NOT NULL, + TRIGGER_NAME varchar(200) NOT NULL, + TRIGGER_GROUP varchar(200) NOT NULL, + BLOB_DATA bytea, + PRIMARY KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP), + CONSTRAINT QRTZ_BLOB_TRIGGERS_ibfk_1 + FOREIGN KEY (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) + REFERENCES QRTZ_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP) +); +CREATE TABLE QRTZ_FIRED_TRIGGERS ( + SCHED_NAME varchar(120) NOT NULL, + ENTRY_ID varchar(200) NOT NULL, + TRIGGER_NAME varchar(200) NOT NULL, + TRIGGER_GROUP varchar(200) NOT NULL, + INSTANCE_NAME varchar(200) NOT NULL, + FIRED_TIME bigint NOT NULL, + SCHED_TIME bigint NOT NULL, + PRIORITY integer NOT NULL, + STATE varchar(16) NOT NULL, + JOB_NAME varchar(200) DEFAULT NULL, + JOB_GROUP varchar(200) DEFAULT NULL, + IS_NONCONCURRENT varchar(1) DEFAULT NULL, + REQUESTS_RECOVERY varchar(1) DEFAULT NULL, + PRIMARY KEY (SCHED_NAME, ENTRY_ID) +); +CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, INSTANCE_NAME); +CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, INSTANCE_NAME, REQUESTS_RECOVERY); +CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, JOB_NAME, JOB_GROUP); +CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, JOB_GROUP); +CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP); +CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS (SCHED_NAME, TRIGGER_GROUP); +CREATE TABLE QRTZ_CALENDARS ( + SCHED_NAME varchar(120) NOT NULL, + CALENDAR_NAME varchar(200) NOT NULL, + CALENDAR bytea NOT NULL, + PRIMARY KEY (SCHED_NAME, CALENDAR_NAME) +); +CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS ( + SCHED_NAME varchar(120) NOT NULL, + TRIGGER_GROUP varchar(200) NOT NULL, + PRIMARY KEY (SCHED_NAME, TRIGGER_GROUP) +); +CREATE TABLE QRTZ_LOCKS ( + SCHED_NAME varchar(120) NOT NULL, + LOCK_NAME varchar(40) NOT NULL, + PRIMARY KEY (SCHED_NAME, LOCK_NAME) +); +CREATE TABLE QRTZ_SCHEDULER_STATE ( + SCHED_NAME varchar(120) NOT NULL, + INSTANCE_NAME varchar(200) NOT NULL, + LAST_CHECKIN_TIME bigint NOT NULL, + CHECKIN_INTERVAL bigint NOT NULL, + PRIMARY KEY (SCHED_NAME, INSTANCE_NAME) +); +-- ---------------------------- +-- Table structure for dv_actual_values +-- ---------------------------- +DROP TABLE IF EXISTS dv_actual_values; +CREATE TABLE dv_actual_values ( + id bigserial, + job_execution_id bigint DEFAULT NULL, + metric_name varchar(255) DEFAULT NULL, + unique_code varchar(255) DEFAULT NULL, + actual_value decimal(20,4) DEFAULT NULL, + data_time timestamp DEFAULT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_common_task_command +-- ---------------------------- +DROP TABLE IF EXISTS dv_common_task_command; +CREATE TABLE dv_common_task_command ( + id bigserial, + task_id bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_catalog_entity_definition +-- ---------------------------- +DROP TABLE IF EXISTS dv_catalog_entity_definition; +CREATE TABLE dv_catalog_entity_definition ( + id bigserial, + uuid varchar(64) NOT NULL, + name varchar(255) NOT NULL, + description varchar(255) DEFAULT NULL, + properties text, + super_uuid varchar(64) NOT NULL DEFAULT '-1', + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (uuid) +); + +-- ---------------------------- +-- Table structure for dv_catalog_entity_instance +-- ---------------------------- +DROP TABLE IF EXISTS dv_catalog_entity_instance; +CREATE TABLE dv_catalog_entity_instance ( + id bigserial, + uuid varchar(64) NOT NULL, + type varchar(127) NOT NULL, + datasource_id bigint NOT NULL, + fully_qualified_name varchar(255) NOT NULL, + display_name varchar(255) NOT NULL, + description varchar(1024) DEFAULT NULL, + properties text, + owner varchar(255) DEFAULT NULL, + version varchar(64) NOT NULL DEFAULT '1.0', + status varchar(255) DEFAULT 'active', + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + PRIMARY KEY (id), + UNIQUE (uuid), + UNIQUE (datasource_id,fully_qualified_name,status) +); +CREATE INDEX full_idx_display_name_description ON dv_catalog_entity_instance (display_name,description); + +-- ---------------------------- +-- Table structure for dv_catalog_entity_metric_job_rel +-- ---------------------------- +DROP TABLE IF EXISTS dv_catalog_entity_metric_job_rel; +CREATE TABLE dv_catalog_entity_metric_job_rel ( + id bigserial, + entity_uuid varchar(64) NOT NULL, + metric_job_id bigint NOT NULL, + metric_job_type varchar(255) NOT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (entity_uuid,metric_job_id,metric_job_type) +); + +-- ---------------------------- +-- Table structure for dv_catalog_entity_profile +-- ---------------------------- +DROP TABLE IF EXISTS dv_catalog_entity_profile; +CREATE TABLE dv_catalog_entity_profile ( + id bigserial, + entity_uuid varchar(64) NOT NULL, + metric_name varchar(255) NOT NULL, + actual_value text NOT NULL, + actual_value_type varchar(255) DEFAULT NULL, + data_date varchar(255) DEFAULT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (entity_uuid,metric_name,data_date) +); + +-- ---------------------------- +-- Table structure for dv_catalog_entity_rel +-- ---------------------------- +DROP TABLE IF EXISTS dv_catalog_entity_rel; +CREATE TABLE dv_catalog_entity_rel ( + id bigserial, + entity1_uuid varchar(64) NOT NULL, + entity2_uuid varchar(64) NOT NULL, + type varchar(64) NOT NULL, + source_type varchar(64) DEFAULT NULL, + related_script text DEFAULT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + PRIMARY KEY (id), + UNIQUE (entity1_uuid,entity2_uuid,type) +); +CREATE INDEX idx_entity2_uuid ON dv_catalog_entity_rel (entity2_uuid); + +-- ---------------------------- +-- Table structure for dv_catalog_entity_tag_rel +-- ---------------------------- +DROP TABLE IF EXISTS dv_catalog_entity_tag_rel; +CREATE TABLE dv_catalog_entity_tag_rel ( + id bigserial, + entity_uuid varchar(64) NOT NULL, + tag_uuid varchar(64) NOT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (entity_uuid,tag_uuid) +); +CREATE INDEX dv_catalog_entity_tag_rel_idx_entity2_uuid ON dv_catalog_entity_tag_rel (tag_uuid); + +-- ---------------------------- +-- Table structure for dv_catalog_schema_change +-- ---------------------------- +DROP TABLE IF EXISTS dv_catalog_schema_change; +CREATE TABLE dv_catalog_schema_change ( + id bigserial, + parent_uuid varchar(64) NOT NULL, + entity_uuid varchar(64) NOT NULL, + change_type varchar(64) NOT NULL, + database_name varchar(255) DEFAULT NULL, + table_name varchar(255) DEFAULT NULL, + column_name varchar(255) DEFAULT NULL, + change_before text DEFAULT NULL, + change_after text DEFAULT NULL, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_catalog_tag +-- ---------------------------- +DROP TABLE IF EXISTS dv_catalog_tag; +CREATE TABLE dv_catalog_tag ( + id bigserial, + uuid varchar(64) NOT NULL, + category_uuid varchar(64) NOT NULL, + name varchar(256) NOT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (uuid,category_uuid,name) +); + +-- ---------------------------- +-- Table structure for dv_catalog_tag_category +-- ---------------------------- +DROP TABLE IF EXISTS dv_catalog_tag_category; +CREATE TABLE dv_catalog_tag_category ( + id bigserial, + uuid varchar(64) NOT NULL, + name varchar(256) NOT NULL, + workspace_id bigint NOT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (uuid,name) +); + +-- ---------------------------- +-- Table structure for dv_common_task +-- ---------------------------- +DROP TABLE IF EXISTS dv_common_task; +CREATE TABLE dv_common_task ( + id bigserial, + task_type varchar(128) DEFAULT NULL, + type varchar(128) DEFAULT NULL, + datasource_id bigint NOT NULL DEFAULT '-1', + database_name varchar(128) DEFAULT NULL, + table_name varchar(128) DEFAULT NULL, + status integer DEFAULT NULL, + parameter text, + execute_host varchar(255) DEFAULT NULL, + submit_time timestamp DEFAULT NULL, + schedule_time timestamp DEFAULT NULL, + start_time timestamp DEFAULT NULL, + end_time timestamp DEFAULT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_common_task_schedule +-- ---------------------------- +DROP TABLE IF EXISTS dv_common_task_schedule; +CREATE TABLE dv_common_task_schedule ( + id bigserial, + task_type varchar(128) DEFAULT NULL, + type varchar(255) NOT NULL, + param text, + datasource_id bigint NOT NULL, + cron_expression varchar(255) DEFAULT NULL, + status smallint DEFAULT NULL, + start_time timestamp DEFAULT NULL, + end_time timestamp DEFAULT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_command +-- ---------------------------- +DROP TABLE IF EXISTS dv_command; +CREATE TABLE dv_command ( + id bigserial, + type smallint NOT NULL DEFAULT '0', + parameter text, + execute_host varchar(255), + job_execution_id bigint NOT NULL, + priority integer DEFAULT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_datasource +-- ---------------------------- +DROP TABLE IF EXISTS dv_datasource; +CREATE TABLE dv_datasource ( + id bigserial, + uuid varchar(64) NOT NULL, + name varchar(255) NOT NULL, + category varchar(255) DEFAULT 'database', + type varchar(255) NOT NULL, + param text NOT NULL, + param_code text NULL, + workspace_id bigint NOT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (name) +); + +-- ---------------------------- +-- Table structure for dv_env +-- ---------------------------- +DROP TABLE IF EXISTS dv_env; +CREATE TABLE dv_env ( + id bigserial, + name varchar(255) NOT NULL, + env text NOT NULL, + workspace_id bigint NOT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (name) +); + +DROP TABLE IF EXISTS dv_access_token; +CREATE TABLE dv_access_token ( + id bigserial, + workspace_id bigint NOT NULL, + user_id bigint NOT NULL, + token varchar(1024) NOT NULL, + expire_time timestamp NOT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_error_data_storage +-- ---------------------------- +DROP TABLE IF EXISTS dv_error_data_storage; +CREATE TABLE dv_error_data_storage ( + id bigserial, + name varchar(255) NOT NULL, + type varchar(255) NOT NULL, + param text NOT NULL, + workspace_id bigint NOT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (name,workspace_id) +); + +-- ---------------------------- +-- Table structure for dv_issue +-- ---------------------------- +DROP TABLE IF EXISTS dv_issue; +CREATE TABLE dv_issue ( + id bigserial, + title varchar(1024) DEFAULT NULL, + content text NOT NULL, + status varchar(255) NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_job +-- ---------------------------- +DROP TABLE IF EXISTS dv_job; +CREATE TABLE dv_job ( + id bigserial, + name varchar(255) DEFAULT NULL, + type integer NOT NULL DEFAULT '0', + datasource_id bigint NOT NULL, + datasource_id_2 bigint DEFAULT NULL, + schema_name varchar(128) DEFAULT NULL, + table_name varchar(128) DEFAULT NULL, + column_name varchar(128) DEFAULT NULL, + selected_column text, + metric_type varchar(255) DEFAULT NULL, + execute_platform_type varchar(128) DEFAULT NULL, + execute_platform_parameter text, + engine_type varchar(128) DEFAULT NULL, + engine_parameter text, + error_data_storage_id bigint DEFAULT NULL, + is_error_data_output_to_datasource smallint DEFAULT '0', + error_data_output_to_datasource_database varchar(255) DEFAULT NULL, + parameter text, + retry_times integer DEFAULT NULL, + retry_interval integer DEFAULT NULL, + timeout integer DEFAULT NULL, + timeout_strategy integer DEFAULT NULL, + pre_sql text DEFAULT NULL, + post_sql text DEFAULT NULL, + tenant_code bigint DEFAULT NULL, + env bigint DEFAULT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (name,datasource_id,schema_name,table_name,column_name) +); + +-- ---------------------------- +-- Table structure for dv_job_execution +-- ---------------------------- +DROP TABLE IF EXISTS dv_job_execution; +CREATE TABLE dv_job_execution ( + id bigserial, + name varchar(255) NOT NULL, + job_id bigint NOT NULL DEFAULT '-1', + job_type integer NOT NULL DEFAULT '0', + schema_name varchar(128) DEFAULT NULL, + table_name varchar(128) DEFAULT NULL, + column_name varchar(128) DEFAULT NULL, + metric_type varchar(255) DEFAULT NULL, + datasource_id bigint NOT NULL DEFAULT '-1', + execute_platform_type varchar(128) DEFAULT NULL, + execute_platform_parameter text, + engine_type varchar(128) DEFAULT NULL, + engine_parameter text, + error_data_storage_type varchar(128) DEFAULT NULL, + error_data_storage_parameter text, + error_data_file_name varchar(255) DEFAULT NULL, + parameter text NOT NULL, + status integer DEFAULT NULL, + retry_times integer DEFAULT NULL, + retry_interval integer DEFAULT NULL, + timeout integer DEFAULT NULL, + timeout_strategy integer DEFAULT NULL, + pre_sql text DEFAULT NULL, + post_sql text DEFAULT NULL, + tenant_code varchar(255) DEFAULT NULL, + execute_host varchar(255) DEFAULT NULL, + application_id varchar(255) DEFAULT NULL, + application_tag varchar(255) DEFAULT NULL, + process_id integer DEFAULT NULL, + execute_file_path varchar(255) DEFAULT NULL, + log_path varchar(255) DEFAULT NULL, + env text, + submit_time timestamp DEFAULT NULL, + schedule_time timestamp DEFAULT NULL, + start_time timestamp DEFAULT NULL, + end_time timestamp DEFAULT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_job_execution_result +-- ---------------------------- +DROP TABLE IF EXISTS dv_job_execution_result; +CREATE TABLE dv_job_execution_result ( + id bigserial, + job_execution_id bigint DEFAULT NULL, + metric_unique_key varchar(255) DEFAULT NULL, + metric_type varchar(255) DEFAULT NULL, + metric_dimension varchar(255) DEFAULT NULL, + metric_name varchar(255) DEFAULT NULL, + database_name varchar(128) DEFAULT NULL, + table_name varchar(128) DEFAULT NULL, + column_name varchar(128) DEFAULT NULL, + actual_value decimal(20,4) DEFAULT NULL, + expected_value decimal(20,4) DEFAULT NULL, + expected_type varchar(255) DEFAULT NULL, + result_formula varchar(255) DEFAULT NULL, + operator varchar(255) DEFAULT NULL, + threshold decimal(20,4) DEFAULT NULL, + score decimal(20,4) DEFAULT 0, + state integer NOT NULL DEFAULT '0', + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (job_execution_id,metric_unique_key) +); + +-- ---------------------------- +-- Table structure for dv_job_quality_report +-- ---------------------------- +DROP TABLE IF EXISTS dv_job_quality_report; +CREATE TABLE dv_job_quality_report ( + id bigserial, + datasource_id bigint DEFAULT NULL, + entity_level varchar(128) DEFAULT NULL, + database_name varchar(128) DEFAULT NULL, + table_name varchar(128) DEFAULT NULL, + column_name varchar(128) DEFAULT NULL, + score decimal(20,4) DEFAULT NULL, + report_date date DEFAULT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_job_execution_result_report_rel +-- ---------------------------- +DROP TABLE IF EXISTS dv_job_execution_result_report_rel; +CREATE TABLE dv_job_execution_result_report_rel ( + id bigserial, + quality_report_id bigint NOT NULL, + job_execution_result_id bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (job_execution_result_id,quality_report_id) +); + +-- ---------------------------- +-- Table structure for dv_job_issue_rel +-- ---------------------------- +DROP TABLE IF EXISTS dv_job_issue_rel; +CREATE TABLE dv_job_issue_rel ( + id bigserial, + job_id bigint NOT NULL, + issue_id bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (job_id,issue_id) +); +CREATE INDEX dv_job_issue_rel_idx_entity2_uuid ON dv_job_issue_rel (issue_id); + +-- ---------------------------- +-- Table structure for dv_job_schedule +-- ---------------------------- +DROP TABLE IF EXISTS dv_job_schedule; +CREATE TABLE dv_job_schedule ( + id bigserial, + type varchar(255) NOT NULL, + param text, + job_id bigint NOT NULL, + cron_expression varchar(255) DEFAULT NULL, + status smallint DEFAULT NULL, + start_time timestamp DEFAULT NULL, + end_time timestamp DEFAULT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_server +-- ---------------------------- +DROP TABLE IF EXISTS dv_server; +CREATE TABLE dv_server ( + id serial, + host varchar(255) NOT NULL, + port integer NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (host,port) +); + +DROP TABLE IF EXISTS dv_registry_lock; +CREATE TABLE dv_registry_lock +( + id bigserial, + lock_key varchar(256) NOT NULL, + lock_owner varchar(256) NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (lock_key) +); +CREATE INDEX idx_upt ON dv_registry_lock (update_time); + +-- ---------------------------- +-- Table structure for dv_sla +-- ---------------------------- +DROP TABLE IF EXISTS dv_sla; +CREATE TABLE dv_sla ( + id bigserial, + workspace_id bigint NOT NULL, + name varchar(255) NOT NULL, + description varchar(255) NOT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_sla_job +-- ---------------------------- +DROP TABLE IF EXISTS dv_sla_job; +CREATE TABLE dv_sla_job ( + id bigserial, + workspace_id bigint NOT NULL, + sla_id bigint NOT NULL, + job_id bigint NOT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (workspace_id,sla_id,job_id) +); + +-- ---------------------------- +-- Table structure for dv_sla_notification +-- ---------------------------- +DROP TABLE IF EXISTS dv_sla_notification; +CREATE TABLE dv_sla_notification ( + id bigserial, + type varchar(40) NOT NULL, + workspace_id bigint NOT NULL, + sla_id bigint NOT NULL, + sender_id bigint NOT NULL, + config text, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_sla_sender +-- ---------------------------- +DROP TABLE IF EXISTS dv_sla_sender; +CREATE TABLE dv_sla_sender ( + id bigserial, + type varchar(40) NOT NULL, + name varchar(255) NOT NULL, + workspace_id bigint NOT NULL, + config text NOT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_tenant +-- ---------------------------- +DROP TABLE IF EXISTS dv_tenant; +CREATE TABLE dv_tenant ( + id bigserial, + tenant varchar(255) NOT NULL, + workspace_id bigint NOT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (tenant) +); + +-- ---------------------------- +-- Table structure for dv_user +-- ---------------------------- +DROP TABLE IF EXISTS dv_user; +CREATE TABLE dv_user ( + id bigserial, + username varchar(255) NOT NULL, + password varchar(255) NOT NULL, + email varchar(255) NOT NULL, + phone varchar(127) DEFAULT NULL, + admin smallint NOT NULL DEFAULT '0', + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (username) +); + +-- ---------------------------- +-- Table structure for dv_user_workspace +-- ---------------------------- +DROP TABLE IF EXISTS dv_user_workspace; +CREATE TABLE dv_user_workspace ( + id bigserial, + user_id bigint NOT NULL, + workspace_id bigint NOT NULL, + role_id bigint DEFAULT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +-- ---------------------------- +-- Table structure for dv_workspace +-- ---------------------------- +DROP TABLE IF EXISTS dv_workspace; +CREATE TABLE dv_workspace ( + id bigserial, + name varchar(255) NOT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE (name) +); + +-- ---------------------------- +-- Table structure for dv_config +-- ---------------------------- +DROP TABLE IF EXISTS dv_config; +CREATE TABLE dv_config ( + id bigserial, + workspace_id bigint NOT NULL, + var_key varchar(255) NOT NULL, + var_value text NOT NULL, + is_default smallint NOT NULL, + create_by bigint NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_by bigint NOT NULL, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id) +); + +INSERT INTO dv_config VALUES ('1', '-1', 'data.quality.jar.name', '/libs/datavines-engine-spark-core-1.0.0-SNAPSHOT.jar', '1', '1', '2023-09-02 16:52:56', '1', '2023-09-03 09:56:12'); +INSERT INTO dv_config VALUES ('2', '-1', 'yarn.mode', 'standalone', '1', '1', '2023-09-02 18:28:59', '1', '2023-09-03 12:46:24'); +INSERT INTO dv_config VALUES ('3', '-1', 'yarn.application.status.address', 'http://%s:%s/ws/v1/cluster/apps/%s', '1', '1', '2023-09-03 09:57:01', '1', '2023-09-03 09:57:01'); +INSERT INTO dv_config VALUES ('4', '-1', 'yarn.resource.manager.http.address.port', '8088', '1', '1', '2023-09-03 09:57:34', '1', '2023-09-03 09:57:34'); +INSERT INTO dv_config VALUES ('5', '-1', 'yarn.resource.manager.ha.ids', '192.168.0.x,192.168.0.x', '1', '1', '2023-09-03 09:58:17', '1', '2023-09-03 09:58:17'); +INSERT INTO dv_config VALUES ('7', '-1', 'max.cpu.load.avg', '10', '1', '1', '2023-09-03 09:59:06', '1', '2023-09-03 09:59:06'); +INSERT INTO dv_config VALUES ('8', '-1', 'reserved.memory', '0.3f', '1', '1', '2023-09-03 09:59:28', '1', '2023-09-03 09:59:28'); +INSERT INTO dv_config VALUES ('9', '-1', 'file.max.length', '10000000', '1', '1', '2023-09-03 14:57:33', '1', '2023-09-03 14:57:33'); +INSERT INTO dv_config VALUES ('10', '-1', 'error.data.dir', '/tmp/datavines/error-data', '1', '1', '2023-09-03 14:58:01', '1', '2023-09-03 14:58:01'); +INSERT INTO dv_config VALUES ('11', '-1', 'validate.result.data.dir', '/tmp/datavines/validate-result-data', '1', '1', '2023-09-03 14:58:29', '1', '2023-09-03 14:58:29'); +INSERT INTO dv_config VALUES ('12', '-1', 'local.execution.threshold', '1000', '1', '1', '2023-09-03 15:02:38', '1', '2023-09-03 15:02:38'); +INSERT INTO dv_config VALUES ('13', '-1', 'spark.execution.threshold', '1000', '1', '1', '2023-09-03 15:02:38', '1', '2023-09-03 15:02:38'); +INSERT INTO dv_config VALUES ('14', '-1', 'livy.uri', 'http://localhost:8998/batches', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('15', '-1', 'livy.task.appId.retry.count', '3', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('16', '-1', 'livy.need.kerberos', 'false', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('17', '-1', 'livy.server.auth.kerberos.principal', 'livy/kerberos.principal', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('18', '-1', 'livy.server.auth.kerberos.keytab', '/path/to/livy/keytab/file', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('19', '-1', 'livy.task.proxyUser', 'root', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('20', '-1', 'livy.task.jar.lib.path', 'hdfs:///datavines/lib', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('21', '-1', 'livy.execution.threshold', '1000', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('22', '-1', 'livy.task.jars', CONCAT('datavines-common-1.0.0-SNAPSHOT.jar,datavines-spi-1.0.0-SNAPSHOT.jar,' + 'datavines-engine-spark-api-1.0.0-SNAPSHOT.jar,datavines-engine-spark-connector-jdbc-1.0.0-SNAPSHOT.jar,' + 'datavines-engine-core-1.0.0-SNAPSHOT.jar,datavines-engine-common-1.0.0-SNAPSHOT.jar,datavines-engine-spark-transform-sql-1.0.0-SNAPSHOT.jar,' + 'datavines-engine-api-1.0.0-SNAPSHOT.jar,mysql-connector-j-8.4.0.jar,httpclient-4.4.1.jar,' + 'httpcore-4.4.1.jar,postgresql-42.2.6.jar,presto-jdbc-0.283.jar,trino-jdbc-407.jar,clickhouse-jdbc-0.1.53.jar,' + 'mongo-java-driver-3.9.0.jar,mongo-spark-connector_2.11-2.4.0.jar,datavines-engine-spark-connector-mongodb-1.0.0-SNAPSHOT.jar'), + '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('23', '-1', 'profile.execute.engine', 'local', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('24', '-1', 'spark.engine.parameter.deploy.mode', 'cluster', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('25', '-1', 'spark.engine.parameter.num.executors', '1', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('26', '-1', 'spark.engine.parameter.driver.cores', '1', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('27', '-1', 'spark.engine.parameter.driver.memory', '512M', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('28', '-1', 'spark.engine.parameter.executor.cores', '1', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('29', '-1', 'spark.engine.parameter.executor.memory', '512M', '1', '1', '2023-09-05 21:02:38', '1', '2023-09-05 21:02:38'); +INSERT INTO dv_config VALUES ('30', '-1', 'datavines.fqdn', 'http://127.0.0.1:5600', '1', '1', '2024-05-21 15:15:38', '1', '2024-05-21 15:15:38'); +INSERT INTO dv_config VALUES ('31', '-1', 'data.quality.flink.jar.name', '/libs/datavines-engine-flink-core-1.0.0-SNAPSHOT.jar', '1', '1', '2025-02-02 11:43:04', '1', '2025-02-02 11:43:04'); + +INSERT INTO dv_user (id, username, password, email, phone, admin) VALUES ('1', 'admin', '$2a$10$9ZcicUYFl/.knBi9SE53U.Nml8bfNeArxr35HQshxXzimbA6Ipgqq', 'admin@gmail.com', NULL, '0'); +INSERT INTO dv_workspace (id, name, create_by, update_by) VALUES ('1', 'admin''s default', '1', '1'); +INSERT INTO dv_user_workspace (id, user_id, workspace_id, role_id,create_by,update_by) VALUES ('1', '1', '1', '1','1', '1'); + From ef8ce7c4dce83816582b0f19c3ad545446387ef9 Mon Sep 17 00:00:00 2001 From: xxzuo <1293378490@qq.com> Date: Wed, 29 Apr 2026 01:11:21 +0800 Subject: [PATCH 2/3] [Feature][Server] support postgresql registry --- bin/datavines-daemon.sh | 14 ++-- .../server/api/config/MybatisPlusConfig.java | 13 ++++ .../repository/mapper/JobExecutionMapper.java | 16 ++--- .../mapper/JobQualityReportMapper.java | 40 +++++++++++ .../service/impl/JobExecutionServiceImpl.java | 1 - .../impl/JobQualityReportServiceImpl.java | 69 ++++++++++--------- .../resources/mapper/JobExecutionMapper.xml | 65 +++++++++++++++-- scripts/sql/datavines-postgresql.sql | 5 +- 8 files changed, 163 insertions(+), 60 deletions(-) diff --git a/bin/datavines-daemon.sh b/bin/datavines-daemon.sh index 6b2b645eb..45a7a6e29 100644 --- a/bin/datavines-daemon.sh +++ b/bin/datavines-daemon.sh @@ -16,7 +16,7 @@ # limitations under the License. # -usage="Usage: datavines-daemon.sh (start|start_container|start_with_jmx|stop|restart_with_jmx|status) <''|mysql>" +usage="Usage: datavines-daemon.sh (start|start_container|start_with_jmx|stop|restart_with_jmx|status) <''|postgres|mysql>" # if no args specified, show usage if [ $# -le 0 ]; then @@ -32,12 +32,12 @@ shift springProfileActive= if [ -n "$profile" ]; then - if [ "$profile" = "mysql" ]; then - springProfileActive="-Dspring.profiles.active=mysql" - else - echo "Error: No profile named \`$profile' was found." - exit 1 - fi + if [ "$profile" = "postgres" ] || [ "$profile" = "mysql" ]; then + springProfileActive="-Dspring.profiles.active=$profile" + else + echo "Error: No profile named \`$profile' was found." + exit 1 + fi fi echo "Begin $startStop DataVinesServer $profile......" diff --git a/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java b/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java index 3922590be..191553719 100644 --- a/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java +++ b/datavines-server/src/main/java/io/datavines/server/api/config/MybatisPlusConfig.java @@ -20,10 +20,13 @@ import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor; import com.zaxxer.hikari.HikariDataSource; +import org.apache.ibatis.mapping.DatabaseIdProvider; +import org.apache.ibatis.mapping.VendorDatabaseIdProvider; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.sql.DataSource; +import java.util.Properties; @Configuration public class MybatisPlusConfig { @@ -35,6 +38,16 @@ public MybatisPlusInterceptor mybatisPlusInterceptor(DataSource dataSource) { return interceptor; } + @Bean + public DatabaseIdProvider databaseIdProvider() { + VendorDatabaseIdProvider databaseIdProvider = new VendorDatabaseIdProvider(); + Properties properties = new Properties(); + properties.setProperty("MySQL", "mysql"); + properties.setProperty("PostgreSQL", "postgresql"); + databaseIdProvider.setProperties(properties); + return databaseIdProvider; + } + private DbType resolveDbType(DataSource dataSource) { if (dataSource instanceof HikariDataSource) { HikariDataSource hikariDataSource = (HikariDataSource) dataSource; diff --git a/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobExecutionMapper.java b/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobExecutionMapper.java index 5dc196100..49c19708d 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobExecutionMapper.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobExecutionMapper.java @@ -39,14 +39,14 @@ public interface JobExecutionMapper extends BaseMapper { IPage getJobExecutionPage(Page page, @Param("searchVal") String searchVal, @Param("jobId") Long jobId, - @Param("datasourceId") Long datasourceId, - @Param("status") Integer status, - @Param("metricType") String metricType, @Param("schemaName") String schemaName, - @Param("tableName") String tableName, @Param("columnName") String columnName, - @Param("startTime") String startTime, @Param("endTime") String endTime, - @Param("schemaSearch") String schemaSearch, - @Param("tableSearch") String tableSearch, - @Param("columnSearch") String columnSearch); + @Param("datasourceId") Long datasourceId, + @Param("status") Integer status, + @Param("metricType") String metricType, @Param("schemaName") String schemaName, + @Param("tableName") String tableName, @Param("columnName") String columnName, + @Param("startTime") String startTime, @Param("endTime") String endTime, + @Param("schemaSearch") String schemaSearch, + @Param("tableSearch") String tableSearch, + @Param("columnSearch") String columnSearch); List getJobExecutionAggPie(@Param("datasourceId") Long datasourceId, @Param("metricType") String metricType, @Param("schemaName") String schemaName, diff --git a/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobQualityReportMapper.java b/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobQualityReportMapper.java index ff26b5afe..e9a5740fa 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobQualityReportMapper.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/mapper/JobQualityReportMapper.java @@ -17,6 +17,8 @@ package io.datavines.server.repository.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import io.datavines.server.repository.entity.JobQualityReport; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; @@ -27,21 +29,59 @@ @Mapper public interface JobQualityReportMapper extends BaseMapper { + @Select(value = "SELECT datasource_id, database_name, table_name, '--' as column_name, avg(score) as score, CAST(#{reportDate} AS DATE) as report_date, 'table' as entity_level" + + " from dv_job_quality_report where report_date = CAST(#{reportDate} AS DATE) and datasource_id = #{datasourceId} and entity_level = 'column'" + + " group by datasource_id, database_name, table_name", databaseId = "postgresql") + @Select(value = "SELECT datasource_id, database_name, table_name, '--' as column_name, avg(score) as score, #{reportDate} as report_date, 'table' as entity_level" + + " from dv_job_quality_report where report_date = #{reportDate} and datasource_id = #{datasourceId} and entity_level = 'column'" + + " group by datasource_id, database_name, table_name", databaseId = "mysql") @Select("SELECT datasource_id, database_name, table_name, '--' as column_name, avg(score) as score, #{reportDate} as report_date, 'table' as entity_level" + " from dv_job_quality_report where report_date = #{reportDate} and datasource_id = #{datasourceId} and entity_level = 'column'" + " group by datasource_id, database_name, table_name") List listTableScoreGroupByDatasource(@Param("datasourceId") Long datasourceId, @Param("reportDate") String reportDate); + @Select(value = "SELECT datasource_id, database_name, '--' as table_name, '--' as column_name, avg(score) as score, CAST(#{reportDate} AS DATE) as report_date, 'database' as entity_level" + + " from dv_job_quality_report where report_date = CAST(#{reportDate} AS DATE) and datasource_id = #{datasourceId} and entity_level = 'table'" + + " group by datasource_id, database_name", databaseId = "postgresql") + @Select(value = "SELECT datasource_id, database_name, '--' as table_name, '--' as column_name, avg(score) as score, #{reportDate} as report_date, 'database' as entity_level" + + " from dv_job_quality_report where report_date = #{reportDate} and datasource_id = #{datasourceId} and entity_level = 'table'" + + " group by datasource_id, database_name", databaseId = "mysql") @Select("SELECT datasource_id, database_name, '--' as table_name, '--' as column_name, avg(score) as score, #{reportDate} as report_date, 'database' as entity_level" + " from dv_job_quality_report where report_date = #{reportDate} and datasource_id = #{datasourceId} and entity_level = 'table'" + " group by datasource_id, database_name") List listDbScoreGroupByDatasource(@Param("datasourceId") Long datasourceId, @Param("reportDate") String reportDate); + @Select(value = "SELECT datasource_id, '--' as database_name, '--' as table_name, '--' as column_name, avg(score) as score, CAST(#{reportDate} AS DATE) as report_date, 'datasource' as entity_level" + + " from dv_job_quality_report where report_date = CAST(#{reportDate} AS DATE) and datasource_id = #{datasourceId} and entity_level = 'database'" + + " group by datasource_id", databaseId = "postgresql") + @Select(value = "SELECT datasource_id, '--' as database_name, '--' as table_name, '--' as column_name, avg(score) as score, #{reportDate} as report_date, 'datasource' as entity_level" + + " from dv_job_quality_report where report_date = #{reportDate} and datasource_id = #{datasourceId} and entity_level = 'database'" + + " group by datasource_id", databaseId = "mysql") @Select("SELECT datasource_id, '--' as database_name, '--' as table_name, '--' as column_name, avg(score) as score, #{reportDate} as report_date, 'datasource' as entity_level" + " from dv_job_quality_report where report_date = #{reportDate} and datasource_id = #{datasourceId} and entity_level = 'database'" + " group by datasource_id") List listDatasourceScoreGroupByDatasource(@Param("datasourceId") Long datasourceId, @Param("reportDate") String reportDate); + + List listScoreByCondition(@Param("datasourceId") Long datasourceId, + @Param("entityLevel") String entityLevel, + @Param("schemaName") String schemaName, + @Param("tableName") String tableName, + @Param("reportDate") String reportDate); + + List listScoreTrendByCondition(@Param("datasourceId") Long datasourceId, + @Param("entityLevel") String entityLevel, + @Param("schemaName") String schemaName, + @Param("tableName") String tableName, + @Param("startDate") String startDate, + @Param("endDate") String endDate); + + IPage getQualityReportPage(Page page, + @Param("datasourceId") Long datasourceId, + @Param("schemaName") String schemaName, + @Param("tableName") String tableName, + @Param("reportDate") String reportDate, + @Param("entityLevel") String entityLevel); } diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExecutionServiceImpl.java b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExecutionServiceImpl.java index 37fb35f63..f14ffcc62 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExecutionServiceImpl.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobExecutionServiceImpl.java @@ -367,7 +367,6 @@ public List getJobExecutionAggPie(JobExecutionDashboardPara } startDateStr += " 00:00:00"; endDateStr += " 23:59:59"; - List items = baseMapper.getJobExecutionAggPie(dashboardParam.getDatasourceId(), dashboardParam.getMetricType(), dashboardParam.getSchemaName(), dashboardParam.getTableName(), dashboardParam.getColumnName(), diff --git a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobQualityReportServiceImpl.java b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobQualityReportServiceImpl.java index ec1862bec..ea860e8b0 100644 --- a/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobQualityReportServiceImpl.java +++ b/datavines-server/src/main/java/io/datavines/server/repository/service/impl/JobQualityReportServiceImpl.java @@ -225,12 +225,10 @@ public boolean generateQualityReport(Long datasourceId) { @Override public JobQualityReportScore getScoreByCondition(JobQualityReportDashboardParam dashboardParam) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); if (dashboardParam == null) { throw new DataVinesException("param can not be null"); } - queryWrapper.eq(JobQualityReport::getDatasourceId, dashboardParam.getDatasourceId()); String entityLevel = DATASOURCE; if (StringUtils.isNotEmpty(dashboardParam.getSchemaName())) { @@ -243,29 +241,31 @@ public JobQualityReportScore getScoreByCondition(JobQualityReportDashboardParam switch (entityLevel) { case DATASOURCE: - queryWrapper.eq(JobQualityReport::getEntityLevel, DATASOURCE); + entityLevel = DATASOURCE; break; case DATABASE: - queryWrapper.eq(JobQualityReport::getEntityLevel, DATABASE); - queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getSchemaName()), JobQualityReport::getDatabaseName, dashboardParam.getSchemaName()); + entityLevel = DATABASE; break; case TABLE: - queryWrapper.eq(JobQualityReport::getEntityLevel, TABLE); - queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getSchemaName()), JobQualityReport::getDatabaseName, dashboardParam.getSchemaName()); - queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getTableName()), JobQualityReport::getTableName, dashboardParam.getTableName()); + entityLevel = TABLE; break; default: break; } + String reportDate; if (StringUtils.isEmpty(dashboardParam.getReportDate())) { - String yesterday = DateUtils.format(DateUtils.addDays(DateUtils.getCurrentDate(),-1),DateUtils.YYYY_MM_DD); - queryWrapper.eq(JobQualityReport::getReportDate, yesterday); + reportDate = DateUtils.format(DateUtils.addDays(DateUtils.getCurrentDate(),-1),DateUtils.YYYY_MM_DD); } else { - queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getReportDate()), JobQualityReport::getReportDate, dashboardParam.getReportDate()); + reportDate = dashboardParam.getReportDate(); } - List jobQualityReports = jobQualityReportMapper.selectList(queryWrapper); + List jobQualityReports = jobQualityReportMapper.listScoreByCondition( + dashboardParam.getDatasourceId(), + entityLevel, + dashboardParam.getSchemaName(), + dashboardParam.getTableName(), + reportDate); if (CollectionUtils.isEmpty(jobQualityReports)) { return null; } @@ -312,9 +312,6 @@ public JobQualityReportScoreTrend getScoreTrendByCondition(JobQualityReportDashb currentDate = currentDate.plusDays(1); } - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(JobQualityReport::getDatasourceId, dashboardParam.getDatasourceId()); - String entityLevel = DATASOURCE; if (StringUtils.isNotEmpty(dashboardParam.getSchemaName())) { @@ -327,24 +324,25 @@ public JobQualityReportScoreTrend getScoreTrendByCondition(JobQualityReportDashb switch (entityLevel) { case DATASOURCE: - queryWrapper.eq(JobQualityReport::getEntityLevel, DATASOURCE); + entityLevel = DATASOURCE; break; case DATABASE: - queryWrapper.eq(JobQualityReport::getEntityLevel, DATABASE); - queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getSchemaName()), JobQualityReport::getDatabaseName, dashboardParam.getSchemaName()); + entityLevel = DATABASE; break; case TABLE: - queryWrapper.eq(JobQualityReport::getEntityLevel, TABLE); - queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getSchemaName()), JobQualityReport::getDatabaseName, dashboardParam.getSchemaName()); - queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getTableName()), JobQualityReport::getTableName, dashboardParam.getTableName()); + entityLevel = TABLE; break; default: break; } - queryWrapper.between(JobQualityReport::getReportDate, startDateStr, endDateStr); - queryWrapper.orderByAsc(JobQualityReport::getReportDate); - List reportList = list(queryWrapper); + List reportList = jobQualityReportMapper.listScoreTrendByCondition( + dashboardParam.getDatasourceId(), + entityLevel, + dashboardParam.getSchemaName(), + dashboardParam.getTableName(), + startDateStr, + endDateStr); Map date2Score = new HashMap<>(); if (CollectionUtils.isNotEmpty(reportList)) { @@ -375,24 +373,27 @@ public JobQualityReportScoreTrend getScoreTrendByCondition(JobQualityReportDashb @Override public IPage getQualityReportPage(JobQualityReportDashboardParam dashboardParam) { Page page = new Page<>(dashboardParam.getPageNumber(), dashboardParam.getPageSize()); - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(JobQualityReport::getDatasourceId, dashboardParam.getDatasourceId()); - queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getSchemaName()),JobQualityReport::getDatabaseName,dashboardParam.getSchemaName()); - queryWrapper.eq(StringUtils.isNotEmpty(dashboardParam.getTableName()),JobQualityReport::getTableName,dashboardParam.getTableName()); + String reportDate; if (StringUtils.isEmpty(dashboardParam.getReportDate())) { - String yesterday = DateUtils.format(DateUtils.addDays(DateUtils.getCurrentDate(),-1),DateUtils.YYYY_MM_DD); - queryWrapper.eq(JobQualityReport::getReportDate, yesterday); + reportDate = DateUtils.format(DateUtils.addDays(DateUtils.getCurrentDate(),-1),DateUtils.YYYY_MM_DD); } else { - queryWrapper.eq(JobQualityReport::getReportDate, dashboardParam.getReportDate()); + reportDate = dashboardParam.getReportDate(); } + String entityLevel; if (StringUtils.isNotEmpty(dashboardParam.getTableName())) { - queryWrapper.eq(JobQualityReport::getEntityLevel, COLUMN); + entityLevel = COLUMN; } else { - queryWrapper.eq(JobQualityReport::getEntityLevel, TABLE); + entityLevel = TABLE; } - return page(page, queryWrapper).convert(jobQualityReport -> { + return jobQualityReportMapper.getQualityReportPage( + page, + dashboardParam.getDatasourceId(), + dashboardParam.getSchemaName(), + dashboardParam.getTableName(), + reportDate, + entityLevel).convert(jobQualityReport -> { JobQualityReportVO jobQualityReportVO = new JobQualityReportVO(); BeanUtils.copyProperties(jobQualityReport, jobQualityReportVO); return jobQualityReportVO; diff --git a/datavines-server/src/main/resources/mapper/JobExecutionMapper.xml b/datavines-server/src/main/resources/mapper/JobExecutionMapper.xml index c23d66f78..190226766 100644 --- a/datavines-server/src/main/resources/mapper/JobExecutionMapper.xml +++ b/datavines-server/src/main/resources/mapper/JobExecutionMapper.xml @@ -39,10 +39,24 @@ and column_name = #{columnName} - and update_time >= #{startTime} + + + and update_time >= CAST(#{startTime} AS timestamp) + + + and update_time >= #{startTime} + + - and update_time <= #{endTime} + + + and update_time <= CAST(#{endTime} AS timestamp) + + + and update_time <= #{endTime} + + and datasource_id = #{datasourceId} @@ -70,10 +84,24 @@ and column_name = #{columnName} - and update_time >= #{startTime} + + + and update_time >= CAST(#{startTime} AS timestamp) + + + and update_time >= #{startTime} + + - and update_time <= #{endTime} + + + and update_time <= CAST(#{endTime} AS timestamp) + + + and update_time <= #{endTime} + + and datasource_id = #{datasourceId} @@ -82,7 +110,16 @@ - select TO_CHAR(create_time, 'YYYY-MM-DD') AS create_date, status from dv_job_execution + select + + + TO_CHAR(create_time, 'YYYY-MM-DD') + + + DATE_FORMAT(create_time, '%Y-%m-%d') + + + AS create_date, status from dv_job_execution and metric_type = #{metricType} @@ -97,10 +134,24 @@ and column_name = #{columnName} - and create_time >= #{startTime} + + + and create_time >= CAST(#{startTime} AS timestamp) + + + and create_time >= #{startTime} + + - and create_time <= #{endTime} + + + and create_time <= CAST(#{endTime} AS timestamp) + + + and create_time <= #{endTime} + + and datasource_id = #{datasourceId} diff --git a/scripts/sql/datavines-postgresql.sql b/scripts/sql/datavines-postgresql.sql index 0db97cf67..4d9a4300a 100644 --- a/scripts/sql/datavines-postgresql.sql +++ b/scripts/sql/datavines-postgresql.sql @@ -383,7 +383,7 @@ CREATE TABLE dv_common_task_schedule ( param text, datasource_id bigint NOT NULL, cron_expression varchar(255) DEFAULT NULL, - status smallint DEFAULT NULL, + status boolean DEFAULT NULL, start_time timestamp DEFAULT NULL, end_time timestamp DEFAULT NULL, create_by bigint NOT NULL, @@ -663,7 +663,7 @@ CREATE TABLE dv_job_schedule ( param text, job_id bigint NOT NULL, cron_expression varchar(255) DEFAULT NULL, - status smallint DEFAULT NULL, + status boolean DEFAULT NULL, start_time timestamp DEFAULT NULL, end_time timestamp DEFAULT NULL, create_by bigint NOT NULL, @@ -889,4 +889,3 @@ INSERT INTO dv_config VALUES ('31', '-1', 'data.quality.flink.jar.name', '/libs/ INSERT INTO dv_user (id, username, password, email, phone, admin) VALUES ('1', 'admin', '$2a$10$9ZcicUYFl/.knBi9SE53U.Nml8bfNeArxr35HQshxXzimbA6Ipgqq', 'admin@gmail.com', NULL, '0'); INSERT INTO dv_workspace (id, name, create_by, update_by) VALUES ('1', 'admin''s default', '1', '1'); INSERT INTO dv_user_workspace (id, user_id, workspace_id, role_id,create_by,update_by) VALUES ('1', '1', '1', '1','1', '1'); - From 4a0b8c84de8892e194dff959f1993d70a0233808 Mon Sep 17 00:00:00 2001 From: xxzuo <1293378490@qq.com> Date: Wed, 29 Apr 2026 01:11:57 +0800 Subject: [PATCH 3/3] [Feature][Server] support postgresql registry --- .../mapper/JobQualityReportMapper.xml | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 datavines-server/src/main/resources/mapper/JobQualityReportMapper.xml diff --git a/datavines-server/src/main/resources/mapper/JobQualityReportMapper.xml b/datavines-server/src/main/resources/mapper/JobQualityReportMapper.xml new file mode 100644 index 000000000..202b030c1 --- /dev/null +++ b/datavines-server/src/main/resources/mapper/JobQualityReportMapper.xml @@ -0,0 +1,119 @@ + + + + + + + id, + datasource_id, + entity_level, + database_name, + table_name, + column_name, + score, + report_date, + create_time, + update_time + + + + + datasource_id = #{datasourceId} + and entity_level = #{entityLevel} + + and database_name = #{schemaName} + + + and table_name = #{tableName} + + + + + + + + and report_date = CAST(#{reportDate} AS date) + + + and report_date = #{reportDate} + + + + + + + + and report_date between CAST(#{startDate} AS date) and CAST(#{endDate} AS date) + + + and report_date between #{startDate} and #{endDate} + + + + + + + + + +