Skip to content

Commit 826bca7

Browse files
authored
Merge pull request #3131 from actiontech/new-rule-2.2306.x
New rule 2.2306.x
2 parents 3f322af + 2a8f0a0 commit 826bca7

File tree

3 files changed

+270
-25
lines changed

3 files changed

+270
-25
lines changed

sqle/driver/mysql/audit_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ func runDefaultRulesInspectCase(t *testing.T, desc string, i *MysqlDriverImpl, s
129129
rulepkg.DMLCheckAlias: {},
130130
rulepkg.DMLCheckAffectedRows: {},
131131
rulepkg.DMLCheckSortColumnLength: {},
132-
rulepkg.DDLCheckAllIndexNotNullConstraint: {},
132+
rulepkg.DDLCheckAllIndexNotNullConstraint: {},
133+
rulepkg.DDLCheckTransactionNotCommitted: {},
133134
}
134135
for i := range rulepkg.RuleHandlers {
135136
handler := rulepkg.RuleHandlers[i]
@@ -5330,7 +5331,6 @@ func TestDDLCheckAllIndexNotNullConstraint(t *testing.T) {
53305331
)
53315332
}
53325333

5333-
53345334
func TestDMLCheckSameTableJoinedMultipleTimes(t *testing.T) {
53355335
rule := rulepkg.RuleHandlerMap[rulepkg.DMLCheckSameTableJoinedMultipleTimes].Rule
53365336

sqle/driver/mysql/rule/rule.go

Lines changed: 151 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ const (
4646

4747
// inspector DDL rules
4848
const (
49+
DDLCheckTransactionNotCommitted = "ddl_check_transactions_not_committed"
4950
DDLCheckPKWithoutIfNotExists = "ddl_check_table_without_if_not_exists"
5051
DDLCheckObjectNameLength = "ddl_check_object_name_length"
5152
DDLCheckObjectNameUsingKeyword = "ddl_check_object_name_using_keyword"
@@ -432,6 +433,18 @@ var RuleHandlers = []RuleHandler{
432433
},
433434

434435
// rule
436+
{
437+
Rule: driverV2.Rule{
438+
Name: DDLCheckTransactionNotCommitted,
439+
Desc: "DDL执行前存在事务未提交",
440+
Annotation: "检查可能产生锁冲突的DDL操作执行前是否存在事务未提交,MySQL8之前版本会查询information_schema.innodb_trx所有记录,MySQL8版本会查询performance_schema.data_locks相关记录",
441+
Level: driverV2.RuleLevelWarn,
442+
Category: RuleTypeUsageSuggestion,
443+
},
444+
Message: "DDL执行前存在事务未提交, %s",
445+
AllowOffline: false,
446+
Func: checkTransactionNotCommittedBeforeDDL,
447+
},
435448
{
436449
Rule: driverV2.Rule{
437450
Name: DDLCheckPKWithoutIfNotExists,
@@ -5909,7 +5922,7 @@ func getTableNameWithSchema(stmt *ast.TableName, c *session.Context) string {
59095922
} else {
59105923
tableWithSchema = fmt.Sprintf("`%s`.`%s`", stmt.Schema, stmt.Name)
59115924
}
5912-
5925+
59135926
if c.IsLowerCaseTableName() {
59145927
tableWithSchema = strings.ToLower(tableWithSchema)
59155928
}
@@ -5919,7 +5932,7 @@ func getTableNameWithSchema(stmt *ast.TableName, c *session.Context) string {
59195932

59205933
func checkSameTableJoinedMultipleTimes(input *RuleHandlerInput) error {
59215934
var repeatTables []string
5922-
5935+
59235936
if _, ok := input.Node.(ast.DMLNode); ok {
59245937
selectVisitor := &util.SelectVisitor{}
59255938
input.Node.Accept(selectVisitor)
@@ -5973,3 +5986,139 @@ func checkAllIndexNotNullConstraint(input *RuleHandlerInput) error {
59735986
}
59745987
return nil
59755988
}
5989+
5990+
type checkTransactionNotCommittedBeforeDDLTableInfo struct {
5991+
Schema string
5992+
Table string
5993+
}
5994+
5995+
func (c checkTransactionNotCommittedBeforeDDLTableInfo) String() string {
5996+
if c.Table == "" {
5997+
return c.Schema
5998+
}
5999+
if c.Schema == "" {
6000+
return c.Table
6001+
}
6002+
return fmt.Sprintf("%s.%s", c.Schema, c.Table)
6003+
}
6004+
6005+
func checkTransactionNotCommittedBeforeDDL(input *RuleHandlerInput) error {
6006+
// 跳过非DDL语句
6007+
switch input.Node.(type) {
6008+
case ast.DDLNode:
6009+
default:
6010+
return nil
6011+
}
6012+
6013+
tables := extractDDLSchemaAndTable(input.Node)
6014+
if len(tables) == 0 {
6015+
return nil
6016+
}
6017+
6018+
const transactionExecSecs = 600
6019+
6020+
// 首先检测MySQL版本
6021+
version, err := input.Ctx.GetMySQLMajorVersion()
6022+
if err != nil {
6023+
return err
6024+
}
6025+
6026+
for _, table := range tables {
6027+
if table.Schema == "" {
6028+
table.Schema = input.Ctx.CurrentSchema()
6029+
}
6030+
if table.Schema == "" {
6031+
// 如果没有提取到schema,跳过检查
6032+
continue
6033+
}
6034+
6035+
if input.Ctx.IsLowerCaseTableName() {
6036+
table.Schema, table.Table = strings.ToLower(table.Schema), strings.ToLower(table.Table)
6037+
}
6038+
6039+
// 根据版本选择不同的策略
6040+
if version >= 8 {
6041+
// MySQL 8
6042+
count, err := input.Ctx.CheckTableRelatedTransactionNotCommittedMySQL8(table.Schema, table.Table)
6043+
if err != nil {
6044+
return err
6045+
}
6046+
if count > 0 {
6047+
addResult(input.Res, input.Rule, input.Rule.Name,
6048+
fmt.Sprintf("performance_schema.data_locks存在%d条%s的相关记录", count, table))
6049+
return nil
6050+
}
6051+
} else {
6052+
// MySQL 5
6053+
count, ExecTimeoutCount, err := input.Ctx.CheckTransactionNotCommittedMySQL5(transactionExecSecs)
6054+
if err != nil {
6055+
return err
6056+
}
6057+
if count > 0 || ExecTimeoutCount > 0 {
6058+
addResult(input.Res, input.Rule, input.Rule.Name,
6059+
fmt.Sprintf("information_schema.innodb_trx存在%d条记录, %d条执行时长超过%d秒", count, ExecTimeoutCount, transactionExecSecs))
6060+
return nil
6061+
}
6062+
}
6063+
}
6064+
6065+
return nil
6066+
}
6067+
6068+
// extractDDLSchemaAndTable 从DDL语句中提取schema和table name
6069+
func extractDDLSchemaAndTable(node ast.Node) []checkTransactionNotCommittedBeforeDDLTableInfo {
6070+
var tables []checkTransactionNotCommittedBeforeDDLTableInfo
6071+
6072+
switch stmt := node.(type) {
6073+
case *ast.CreateTableStmt, *ast.CreateIndexStmt, *ast.CreateViewStmt, *ast.CreateSequenceStmt, *ast.CreateDatabaseStmt:
6074+
// 跳过CREATE类型的DDL
6075+
return nil
6076+
case *ast.AlterTableStmt:
6077+
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
6078+
Schema: stmt.Table.Schema.O,
6079+
Table: stmt.Table.Name.O,
6080+
})
6081+
case *ast.DropTableStmt:
6082+
for _, table := range stmt.Tables {
6083+
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
6084+
Schema: table.Schema.O,
6085+
Table: table.Name.O,
6086+
})
6087+
}
6088+
case *ast.DropIndexStmt:
6089+
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
6090+
Schema: stmt.Table.Schema.O,
6091+
Table: stmt.Table.Name.O,
6092+
})
6093+
case *ast.DropSequenceStmt:
6094+
for _, sequence := range stmt.Sequences {
6095+
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
6096+
Schema: sequence.Schema.O,
6097+
Table: sequence.Name.L,
6098+
})
6099+
}
6100+
case *ast.RenameTableStmt:
6101+
// 对于RENAME TABLE,检查旧表
6102+
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
6103+
Schema: stmt.OldTable.Schema.O,
6104+
Table: stmt.OldTable.Name.O,
6105+
})
6106+
case *ast.TruncateTableStmt:
6107+
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
6108+
Schema: stmt.Table.Schema.O,
6109+
Table: stmt.Table.Name.O,
6110+
})
6111+
case *ast.RepairTableStmt:
6112+
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
6113+
Schema: stmt.Table.Schema.O,
6114+
Table: stmt.Table.Name.O,
6115+
})
6116+
case *ast.DropDatabaseStmt:
6117+
// 数据库级别的DDL,返回数据库名作为schema
6118+
tables = append(tables, checkTransactionNotCommittedBeforeDDLTableInfo{
6119+
Schema: stmt.Name,
6120+
})
6121+
}
6122+
6123+
return tables
6124+
}

sqle/driver/mysql/session/context.go

Lines changed: 117 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package session
22

33
import (
4+
"database/sql"
45
"fmt"
56
"strconv"
67
"strings"
@@ -187,6 +188,35 @@ func (c *Context) IsLowerCaseTableName() bool {
187188
return lowerCaseTableNames != "0"
188189
}
189190

191+
// GetMySQLVersion 获取MySQL版本号
192+
func (c *Context) GetMySQLVersion() string {
193+
version, err := c.GetSystemVariable(SysVarVersion)
194+
if err != nil {
195+
log.NewEntry().Errorf("fail to load system variable version, error: %v", err)
196+
return ""
197+
}
198+
return version
199+
}
200+
201+
// GetMySQLMajorVersion 获取MySQL主版本号
202+
func (c *Context) GetMySQLMajorVersion() (int, error) {
203+
versionStr := c.GetMySQLVersion()
204+
if versionStr == "" {
205+
return 0, fmt.Errorf("unable to get MySQL version")
206+
}
207+
208+
// 解析版本号,例如 "8.0.33" -> 8
209+
versionParts := strings.Split(versionStr, ".")
210+
if len(versionParts) > 0 {
211+
majorVersion, err := strconv.Atoi(versionParts[0])
212+
if err == nil {
213+
return majorVersion, nil
214+
}
215+
}
216+
217+
return 0, fmt.Errorf("unable to parse MySQL version: %s", versionStr)
218+
}
219+
190220
func (c *Context) getSchema(schemaName string) (*SchemaInfo, bool) {
191221
if c.IsLowerCaseTableName() {
192222
schemaName = strings.ToLower(schemaName)
@@ -478,6 +508,7 @@ func (c *Context) IsTableExist(stmt *ast.TableName) (bool, error) {
478508

479509
const (
480510
SysVarLowerCaseTableNames = "lower_case_table_names"
511+
SysVarVersion = "version"
481512
)
482513

483514
// GetSystemVariable get system variable.
@@ -567,27 +598,31 @@ func (c *Context) GetCreateTableStmt(stmt *ast.TableName) (*ast.CreateTableStmt,
567598
/*
568599
建表语句可能如下:
569600
CREATE TABLE `__all_server_event_history` (
570-
`gmt_create` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
571-
`svr_ip` varchar(46) NOT NULL,
572-
`svr_port` bigint(20) NOT NULL,
573-
`module` varchar(64) NOT NULL,
574-
`event` varchar(64) NOT NULL,
575-
`name1` varchar(256) DEFAULT '',
576-
`value1` varchar(256) DEFAULT '',
577-
`name2` varchar(256) DEFAULT '',
578-
`value2` longtext DEFAULT NULL,
579-
`name3` varchar(256) DEFAULT '',
580-
`value3` varchar(256) DEFAULT '',
581-
`name4` varchar(256) DEFAULT '',
582-
`value4` varchar(256) DEFAULT '',
583-
`name5` varchar(256) DEFAULT '',
584-
`value5` varchar(256) DEFAULT '',
585-
`name6` varchar(256) DEFAULT '',
586-
`value6` varchar(256) DEFAULT '',
587-
`extra_info` varchar(512) DEFAULT '',
588-
PRIMARY KEY (`gmt_create`, `svr_ip`, `svr_port`)
601+
602+
`gmt_create` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
603+
`svr_ip` varchar(46) NOT NULL,
604+
`svr_port` bigint(20) NOT NULL,
605+
`module` varchar(64) NOT NULL,
606+
`event` varchar(64) NOT NULL,
607+
`name1` varchar(256) DEFAULT '',
608+
`value1` varchar(256) DEFAULT '',
609+
`name2` varchar(256) DEFAULT '',
610+
`value2` longtext DEFAULT NULL,
611+
`name3` varchar(256) DEFAULT '',
612+
`value3` varchar(256) DEFAULT '',
613+
`name4` varchar(256) DEFAULT '',
614+
`value4` varchar(256) DEFAULT '',
615+
`name5` varchar(256) DEFAULT '',
616+
`value5` varchar(256) DEFAULT '',
617+
`name6` varchar(256) DEFAULT '',
618+
`value6` varchar(256) DEFAULT '',
619+
`extra_info` varchar(512) DEFAULT '',
620+
PRIMARY KEY (`gmt_create`, `svr_ip`, `svr_port`)
621+
589622
) DEFAULT CHARSET = utf8mb4 ROW_FORMAT = COMPACT COMPRESSION = 'none' REPLICA_NUM = 1 BLOCK_SIZE = 16384 USE_BLOOM_FILTER = FALSE TABLET_SIZE = 134217728 PCTFREE = 10 TABLEGROUP = 'oceanbase'
590-
partition by key_v2(svr_ip, svr_port)
623+
624+
partition by key_v2(svr_ip, svr_port)
625+
591626
(partition p0,
592627
partition p1,
593628
partition p2,
@@ -606,7 +641,6 @@ partition p14,
606641
partition p15)
607642
608643
建表语句后半段是options,oceanbase mysql模式下的show create table结果返回的options中包含mysql不支持的options, 为了能解析, 方法将会倒着遍历建表语句, 每次找到右括号时截断后面的部分, 然后尝试解析一次, 直到解析成功, 此时剩余的建表语句将不在包含OB特有options
609-
610644
*/
611645
func (c *Context) parseCreateTableSqlCompatibly(createTableSql string) (*ast.CreateTableStmt, error) {
612646
for i := len(createTableSql) - 1; i >= 0; i-- {
@@ -889,3 +923,65 @@ func (c *Context) GetColumnCardinality(tn *ast.TableName, columnName string) (in
889923
func (c *Context) GetExecutor() *executor.Executor {
890924
return c.e
891925
}
926+
927+
func (c *Context) CheckTransactionNotCommittedMySQL5(execSecs int) (int, int, error) {
928+
var count, timeoutCount int
929+
query := `SELECT COUNT(*) FROM information_schema.innodb_trx`
930+
results, err := c.GetExecutor().Db.Query(query)
931+
if err != nil {
932+
return 0, 0, err
933+
}
934+
if len(results) > 0 && len(results[0]) > 0 {
935+
countStr := results[0]["COUNT(*)"]
936+
count, err = strconv.Atoi(countStr.String)
937+
if err != nil {
938+
return 0, 0, err
939+
}
940+
}
941+
942+
query = `SELECT COUNT(*) FROM information_schema.innodb_trx WHERE TIMESTAMPDIFF(SECOND, trx_started, NOW()) > ?`
943+
results, err = c.GetExecutor().Db.Query(query, execSecs)
944+
if err != nil {
945+
return 0, 0, err
946+
}
947+
if len(results) > 0 && len(results[0]) > 0 {
948+
countStr := results[0]["COUNT(*)"]
949+
timeoutCount, err = strconv.Atoi(countStr.String)
950+
if err != nil {
951+
return 0, 0, err
952+
}
953+
}
954+
955+
return count, timeoutCount, nil
956+
957+
}
958+
959+
func (c *Context) CheckTableRelatedTransactionNotCommittedMySQL8(schema, table string) (int, error) {
960+
if schema == "" {
961+
return 0, nil
962+
}
963+
964+
var results []map[string]sql.NullString
965+
var err error
966+
967+
if table == "" {
968+
query := `SELECT COUNT(*) FROM performance_schema.data_locks WHERE object_schema = ?`
969+
results, err = c.GetExecutor().Db.Query(query, schema)
970+
} else {
971+
query := `SELECT COUNT(*) FROM performance_schema.data_locks WHERE object_schema = ? AND object_name = ?`
972+
results, err = c.GetExecutor().Db.Query(query, schema, table)
973+
}
974+
975+
if err != nil {
976+
return 0, err
977+
}
978+
979+
if len(results) > 0 && len(results[0]) > 0 {
980+
countStr := results[0]["COUNT(*)"]
981+
if countStr.Valid {
982+
return strconv.Atoi(countStr.String)
983+
}
984+
}
985+
986+
return 0, nil
987+
}

0 commit comments

Comments
 (0)