diff --git a/storage/mysql/log_storage.go b/storage/mysql/log_storage.go index 2d57c7d4fd..045b803b07 100644 --- a/storage/mysql/log_storage.go +++ b/storage/mysql/log_storage.go @@ -82,10 +82,11 @@ const ( ) var ( - once sync.Once - queuedCounter monitoring.Counter - queuedDupCounter monitoring.Counter - dequeuedCounter monitoring.Counter + once sync.Once + queuedCounter monitoring.Counter + queuedDupCounter monitoring.Counter + dequeuedCounter monitoring.Counter + clearedStmtCacheCounter monitoring.Counter queueLatency monitoring.Histogram queueInsertLatency monitoring.Histogram @@ -101,6 +102,7 @@ func createMetrics(mf monitoring.MetricFactory) { queuedCounter = mf.NewCounter("mysql_queued_leaves", "Number of leaves queued", logIDLabel) queuedDupCounter = mf.NewCounter("mysql_queued_dup_leaves", "Number of duplicate leaves queued", logIDLabel) dequeuedCounter = mf.NewCounter("mysql_dequeued_leaves", "Number of leaves dequeued", logIDLabel) + clearedStmtCacheCounter = mf.NewCounter("mysql_cleared_prepared-statement_caches", "Number of times the prepared-statement cache has been cleared") queueLatency = mf.NewHistogram("mysql_queue_leaves_latency", "Latency of queue leaves operation in seconds", logIDLabel) queueInsertLatency = mf.NewHistogram("mysql_queue_leaves_latency_insert", "Latency of insertion part of queue leaves operation in seconds", logIDLabel) @@ -741,6 +743,7 @@ func (t *logTreeTX) getLeavesByHashInternal(ctx context.Context, leafHashes [][] args = append(args, t.treeID) rows, err := stx.QueryContext(ctx, args...) if err != nil { + t.ls.clearStmtCache() klog.Warningf("Query() %s hash = %v", desc, err) return nil, err } diff --git a/storage/mysql/tree_storage.go b/storage/mysql/tree_storage.go index ffb0159cd6..6cfda3ea5a 100644 --- a/storage/mysql/tree_storage.go +++ b/storage/mysql/tree_storage.go @@ -47,9 +47,9 @@ const ( n.TreeId = ? AND n.SubtreeRevision <= ? GROUP BY n.TreeId, n.SubtreeId ) AS x - INNER JOIN Subtree - ON Subtree.SubtreeId = x.SubtreeId - AND Subtree.SubtreeRevision = x.MaxRevision + INNER JOIN Subtree + ON Subtree.SubtreeId = x.SubtreeId + AND Subtree.SubtreeRevision = x.MaxRevision AND Subtree.TreeId = x.TreeId AND Subtree.TreeId = ?` placeholderSQL = "" @@ -104,6 +104,25 @@ func expandPlaceholderSQL(sql string, num int, first, rest string) string { return strings.Replace(sql, placeholderSQL, parameters, 1) } +// clearStmtCache clear up all sql.Stmt in cache +func (m *mySQLTreeStorage) clearStmtCache() { + m.statementMutex.Lock() + defer m.statementMutex.Unlock() + + klog.Info("Clearing all prepared statements") + + for _, ns := range m.statements { + for _, s := range ns { + if err := s.Close(); err != nil { + klog.Warningf("Failed to close stmt: %s", err) + } + } + } + + m.statements = make(map[string]map[int]*sql.Stmt) + clearedStmtCacheCounter.Inc() +} + // getStmt creates and caches sql.Stmt structs based on the passed in statement // and number of bound arguments. // TODO(al,martin): consider pulling this all out as a separate unit for reuse @@ -114,8 +133,6 @@ func (m *mySQLTreeStorage) getStmt(ctx context.Context, statement string, num in if m.statements[statement] != nil { if m.statements[statement][num] != nil { - // TODO(al,martin): we'll possibly need to expire Stmts from the cache, - // e.g. when DB connections break etc. return m.statements[statement][num], nil } } else { @@ -200,6 +217,7 @@ func (t *treeTX) getSubtrees(ctx context.Context, treeRevision int64, ids [][]by rows, err := stx.QueryContext(ctx, args...) if err != nil { + t.ts.clearStmtCache() klog.Warningf("Failed to get merkle subtrees: %s", err) return nil, err } @@ -296,6 +314,7 @@ func (t *treeTX) storeSubtrees(ctx context.Context, subtrees []*storagepb.Subtre r, err := stx.ExecContext(ctx, args...) if err != nil { + t.ts.clearStmtCache() klog.Warningf("Failed to set merkle subtrees: %s", err) return err }