Skip to content

Commit 026b12e

Browse files
authored
Merge pull request #378 from komari-monitor/dev
refactor: 事件路由
2 parents 0413c9a + 90aad4b commit 026b12e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+888
-671
lines changed

cmd/root.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"fmt"
55
"os"
66

7+
"log/slog"
8+
79
"github.com/gookit/event"
810
"github.com/komari-monitor/komari/cmd/flags"
911
"github.com/komari-monitor/komari/internal/eventType"
@@ -43,8 +45,11 @@ Made by Akizon77 with love.`,
4345
}
4446

4547
func Execute() {
46-
event.Trigger(eventType.ProcessStart, nil)
47-
defer event.Trigger(eventType.ProcessExit, nil)
48+
err, _ := event.Trigger(eventType.ProcessStart, event.M{})
49+
if err != nil {
50+
slog.Error("Something went wrong during process start.", slog.Any("error", err))
51+
os.Exit(1)
52+
}
4853
if err := RootCmd.Execute(); err != nil {
4954
fmt.Fprintln(os.Stderr, err)
5055
os.Exit(1)

cmd/server.go

Lines changed: 74 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,24 @@ package cmd
22

33
import (
44
"context"
5-
"fmt"
65
"log"
6+
"log/slog"
77
"net/http"
88
"os"
99
"os/signal"
10-
"strings"
1110
"syscall"
1211
"time"
1312

1413
"github.com/gin-gonic/gin"
1514
"github.com/gookit/event"
1615
"github.com/komari-monitor/komari/cmd/flags"
17-
api "github.com/komari-monitor/komari/internal/api_v1"
16+
"github.com/komari-monitor/komari/internal"
1817
"github.com/komari-monitor/komari/internal/conf"
19-
"github.com/komari-monitor/komari/internal/database"
20-
"github.com/komari-monitor/komari/internal/database/accounts"
2118
"github.com/komari-monitor/komari/internal/database/auditlog"
22-
"github.com/komari-monitor/komari/internal/database/dbcore"
23-
"github.com/komari-monitor/komari/internal/database/models"
24-
d_notification "github.com/komari-monitor/komari/internal/database/notification"
2519
"github.com/komari-monitor/komari/internal/database/records"
2620
"github.com/komari-monitor/komari/internal/database/tasks"
2721
"github.com/komari-monitor/komari/internal/eventType"
28-
"github.com/komari-monitor/komari/internal/geoip"
2922
logutil "github.com/komari-monitor/komari/internal/log"
30-
"github.com/komari-monitor/komari/internal/messageSender"
31-
"github.com/komari-monitor/komari/internal/notifier"
32-
"github.com/komari-monitor/komari/internal/oauth"
33-
"github.com/komari-monitor/komari/internal/patch"
34-
"github.com/komari-monitor/komari/internal/restore"
35-
"github.com/komari-monitor/komari/pkg/cloudflared"
3623
"github.com/komari-monitor/komari/server"
3724
"github.com/spf13/cobra"
3825
)
@@ -59,63 +46,19 @@ func RunServer() {
5946
if err := os.MkdirAll("./data/theme", os.ModePerm); err != nil {
6047
log.Fatalf("Failed to create theme directory: %v", err)
6148
}
62-
// 进行备份恢复
63-
if restore.NeedBackupRestore() {
64-
restore.RestoreBackup()
65-
}
66-
conf.Load()
67-
InitDatabase()
68-
patch.ApplyPatch()
69-
49+
internal.All()
7050
if conf.Version != conf.Version_Development {
7151
gin.SetMode(gin.ReleaseMode)
7252
}
7353

74-
config, err := conf.GetWithV1Format()
75-
if err != nil {
76-
log.Fatal(err)
77-
}
78-
7954
r := gin.New()
8055
r.Use(logutil.GinLogger())
8156
r.Use(logutil.GinRecovery())
8257

83-
event.Trigger(eventType.ServerInitializeStart, event.M{"config": config, "engine": r})
84-
85-
go geoip.InitGeoIp()
86-
go DoScheduledWork()
87-
go messageSender.Initialize()
88-
go oauth.Initialize()
89-
90-
server.StartNezhaGRPCServer(config.NezhaCompatListen)
91-
92-
event.On(eventType.ConfigUpdated, event.ListenerFunc(func(e event.Event) error {
93-
newConf := e.Get("new").(conf.Config)
94-
oldConf := e.Get("old").(conf.Config)
95-
if newConf.Login.OAuthProvider != oldConf.Login.OAuthProvider {
96-
oidcProvider, err := database.GetOidcConfigByName(newConf.Login.OAuthProvider)
97-
if err != nil {
98-
log.Printf("Failed to get OIDC provider config: %v", err)
99-
} else {
100-
log.Printf("Using %s as OIDC provider", oidcProvider.Name)
101-
}
102-
err = oauth.LoadProvider(oidcProvider.Name, oidcProvider.Addition)
103-
if err != nil {
104-
auditlog.EventLog("error", fmt.Sprintf("Failed to load OIDC provider: %v", err))
105-
}
106-
}
107-
if newConf.Notification.NotificationMethod != oldConf.Notification.NotificationMethod {
108-
messageSender.Initialize()
109-
}
110-
return nil
111-
}), event.Max)
112-
113-
// 初始化 cloudflared
114-
if strings.ToLower(GetEnv("KOMARI_ENABLE_CLOUDFLARED", "false")) == "true" {
115-
err := cloudflared.RunCloudflared() // 阻塞,确保cloudflared跑起来
116-
if err != nil {
117-
log.Fatalf("Failed to run cloudflared: %v", err)
118-
}
58+
err, _ := event.Trigger(eventType.ServerInitializeStart, event.M{"engine": r})
59+
if err != nil {
60+
slog.Error("Something went wrong during ServerInitializeStart event.", slog.Any("error", err))
61+
os.Exit(1)
11962
}
12063

12164
server.Init(r)
@@ -125,17 +68,25 @@ func RunServer() {
12568
Handler: r,
12669
}
12770

128-
event.Trigger(eventType.ServerInitializeDone, event.M{"config": config})
71+
event.Trigger(eventType.ServerInitializeDone, event.M{})
72+
ScheduledEventTasksInit()
12973

130-
log.Printf("Starting server on %s ...", flags.Listen)
131-
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
132-
OnFatal(err)
133-
log.Fatalf("listen: %s\n", err)
134-
}
13574
quit := make(chan os.Signal, 1)
13675
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
76+
77+
log.Printf("Starting server on %s ...", flags.Listen)
78+
79+
go func() {
80+
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
81+
OnFatal(err)
82+
event.Trigger(eventType.ProcessExit, event.M{})
83+
log.Fatalf("listen: %s\n", err)
84+
}
85+
}()
86+
13787
<-quit
13888
OnShutdown()
89+
event.Trigger(eventType.ProcessExit, event.M{})
13990
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
14091
defer cancel()
14192
if err := srv.Shutdown(ctx); err != nil {
@@ -144,66 +95,72 @@ func RunServer() {
14495

14596
}
14697

147-
func InitDatabase() {
148-
// // 打印数据库类型和连接信息
149-
// if flags.DatabaseType == "mysql" {
150-
// log.Printf("使用 MySQL 数据库连接: %s@%s:%s/%s",
151-
// flags.DatabaseUser, flags.DatabaseHost, flags.DatabasePort, flags.DatabaseName)
152-
// log.Printf("环境变量配置: [KOMARI_DB_TYPE=%s] [KOMARI_DB_HOST=%s] [KOMARI_DB_PORT=%s] [KOMARI_DB_USER=%s] [KOMARI_DB_NAME=%s]",
153-
// os.Getenv("KOMARI_DB_TYPE"), os.Getenv("KOMARI_DB_HOST"), os.Getenv("KOMARI_DB_PORT"),
154-
// os.Getenv("KOMARI_DB_USER"), os.Getenv("KOMARI_DB_NAME"))
155-
// } else {
156-
// log.Printf("使用 SQLite 数据库文件: %s", flags.DatabaseFile)
157-
// log.Printf("环境变量配置: [KOMARI_DB_TYPE=%s] [KOMARI_DB_FILE=%s]",
158-
// os.Getenv("KOMARI_DB_TYPE"), os.Getenv("KOMARI_DB_FILE"))
159-
// }
160-
var count int64 = 0
161-
if dbcore.GetDBInstance().Model(&models.User{}).Count(&count); count == 0 {
162-
user, passwd, err := accounts.CreateDefaultAdminAccount()
163-
if err != nil {
164-
panic(err)
165-
}
166-
log.Println("Default admin account created. Username:", user, ", Password:", passwd)
167-
}
168-
}
169-
17098
// #region 定时任务
17199
func DoScheduledWork() {
172100
tasks.ReloadPingSchedule()
173-
d_notification.ReloadLoadNotificationSchedule()
174-
ticker := time.NewTicker(time.Minute * 30)
175-
minute := time.NewTicker(60 * time.Second)
101+
176102
//records.DeleteRecordBefore(time.Now().Add(-time.Hour * 24 * 30))
103+
177104
records.CompactRecord()
178-
cfg, _ := conf.GetWithV1Format()
179-
go notifier.CheckExpireScheduledWork()
180-
for {
181-
select {
182-
case <-ticker.C:
183-
records.DeleteRecordBefore(time.Now().Add(-time.Hour * time.Duration(cfg.RecordPreserveTime)))
184-
records.CompactRecord()
185-
tasks.ClearTaskResultsByTimeBefore(time.Now().Add(-time.Hour * time.Duration(cfg.RecordPreserveTime)))
186-
tasks.DeletePingRecordsBefore(time.Now().Add(-time.Hour * time.Duration(cfg.PingRecordPreserveTime)))
187-
auditlog.RemoveOldLogs()
188-
case <-minute.C:
189-
api.SaveClientReportToDB()
190-
if !cfg.RecordEnabled {
191-
records.DeleteAll()
192-
tasks.DeleteAllPingRecords()
193-
}
194-
// 每分钟检查一次流量提醒
195-
go notifier.CheckTraffic()
105+
106+
event.On(eventType.SchedulerEvery30Minutes, event.ListenerFunc(func(e event.Event) error {
107+
cfg, err := conf.GetWithV1Format()
108+
if err != nil {
109+
slog.Warn("Failed to get config in scheduled task:", "error", err)
110+
return err
111+
}
112+
records.DeleteRecordBefore(time.Now().Add(-time.Hour * time.Duration(cfg.RecordPreserveTime)))
113+
records.CompactRecord()
114+
tasks.ClearTaskResultsByTimeBefore(time.Now().Add(-time.Hour * time.Duration(cfg.RecordPreserveTime)))
115+
tasks.DeletePingRecordsBefore(time.Now().Add(-time.Hour * time.Duration(cfg.PingRecordPreserveTime)))
116+
auditlog.RemoveOldLogs()
117+
return nil
118+
}))
119+
120+
event.On(eventType.SchedulerEveryMinute, event.ListenerFunc(func(e event.Event) error {
121+
cfg, err := conf.GetWithV1Format()
122+
if err != nil {
123+
slog.Warn("Failed to get config in scheduled task:", "error", err)
124+
return err
125+
}
126+
if !cfg.RecordEnabled {
127+
records.DeleteAll()
128+
tasks.DeleteAllPingRecords()
196129
}
197-
}
198130

131+
return nil
132+
}))
199133
}
200134

201135
func OnShutdown() {
202136
auditlog.Log("", "", "server is shutting down", "info")
203-
cloudflared.Kill()
204137
}
205138

206139
func OnFatal(err error) {
207140
auditlog.Log("", "", "server encountered a fatal error: "+err.Error(), "error")
208-
cloudflared.Kill()
141+
}
142+
143+
func ScheduledEventTasksInit() {
144+
go DoScheduledWork()
145+
go func() {
146+
every1m := time.NewTicker(1 * time.Minute)
147+
every5m := time.NewTicker(5 * time.Minute)
148+
every30m := time.NewTicker(30 * time.Minute)
149+
every1h := time.NewTicker(1 * time.Hour)
150+
every1d := time.NewTicker(24 * time.Hour)
151+
for {
152+
select {
153+
case <-every1m.C:
154+
event.Async(eventType.SchedulerEveryMinute, event.M{"interval": "1m"})
155+
case <-every5m.C:
156+
event.Async(eventType.SchedulerEvery5Minutes, event.M{"interval": "5m"})
157+
case <-every30m.C:
158+
event.Async(eventType.SchedulerEvery30Minutes, event.M{"interval": "30m"})
159+
case <-every1h.C:
160+
event.Async(eventType.SchedulerEveryHour, event.M{"interval": "1h"})
161+
case <-every1d.C:
162+
event.Async(eventType.SchedulerEveryDay, event.M{"interval": "1d"})
163+
}
164+
}
165+
}()
209166
}

internal/all.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package internal
2+
3+
import (
4+
// Import all internal packages to ensure their init() functions are executed
5+
_ "github.com/komari-monitor/komari/internal/api_rpc"
6+
_ "github.com/komari-monitor/komari/internal/api_v1"
7+
_ "github.com/komari-monitor/komari/internal/client"
8+
_ "github.com/komari-monitor/komari/internal/cloudflared"
9+
_ "github.com/komari-monitor/komari/internal/common"
10+
_ "github.com/komari-monitor/komari/internal/conf"
11+
_ "github.com/komari-monitor/komari/internal/database"
12+
_ "github.com/komari-monitor/komari/internal/eventType"
13+
_ "github.com/komari-monitor/komari/internal/geoip"
14+
_ "github.com/komari-monitor/komari/internal/log"
15+
_ "github.com/komari-monitor/komari/internal/messageSender"
16+
_ "github.com/komari-monitor/komari/internal/nezha"
17+
_ "github.com/komari-monitor/komari/internal/notifier"
18+
_ "github.com/komari-monitor/komari/internal/oauth"
19+
_ "github.com/komari-monitor/komari/internal/patch"
20+
_ "github.com/komari-monitor/komari/internal/pingSchedule"
21+
_ "github.com/komari-monitor/komari/internal/plugin"
22+
_ "github.com/komari-monitor/komari/internal/renewal"
23+
_ "github.com/komari-monitor/komari/internal/restore"
24+
_ "github.com/komari-monitor/komari/internal/ws"
25+
)
26+
27+
func All() {}

internal/api_rpc/common.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"strings"
99
"time"
1010

11-
api "github.com/komari-monitor/komari/internal/api_v1"
11+
"github.com/komari-monitor/komari/internal/api_v1/vars"
1212
"github.com/komari-monitor/komari/internal/common"
1313
"github.com/komari-monitor/komari/internal/conf"
1414
"github.com/komari-monitor/komari/internal/database"
@@ -489,7 +489,7 @@ func getNodeRecentStatus(ctx context.Context, req *rpc.JsonRpcRequest) (any, *rp
489489
}
490490
}
491491

492-
raw, _ := api.Records.Get(params.UUID)
492+
raw, _ := vars.Records.Get(params.UUID)
493493
reports, _ := raw.([]common.Report)
494494

495495
// 扁平化为 { count, records: [] }

internal/api_rpc/init.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package api_rpc
2+
3+
import (
4+
"github.com/gin-gonic/gin"
5+
"github.com/gookit/event"
6+
"github.com/komari-monitor/komari/internal/eventType"
7+
)
8+
9+
func init() {
10+
event.On(eventType.ServerInitializeStart, event.ListenerFunc(func(e event.Event) error {
11+
r := e.Get("engine").(*gin.Engine)
12+
RegisterRouters("/api/rpc2", r)
13+
return nil
14+
}))
15+
}

internal/api_v1/AdminAuthMiddleware.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package api_v1
33
import (
44
"net/http"
55

6+
"github.com/komari-monitor/komari/internal/api_v1/resp"
67
"github.com/komari-monitor/komari/internal/database/accounts"
78

89
"github.com/gin-gonic/gin"
@@ -20,15 +21,15 @@ func AdminAuthMiddleware() gin.HandlerFunc {
2021
// session-based authentication
2122
session, err := c.Cookie("session_token")
2223
if err != nil {
23-
RespondError(c, http.StatusUnauthorized, "Unauthorized.")
24+
resp.RespondError(c, http.StatusUnauthorized, "Unauthorized.")
2425
c.Abort()
2526
return
2627
}
2728

2829
// Komari is a single user system
2930
uuid, err := accounts.GetSession(session)
3031
if err != nil {
31-
RespondError(c, http.StatusUnauthorized, "Unauthorized.")
32+
resp.RespondError(c, http.StatusUnauthorized, "Unauthorized.")
3233
c.Abort()
3334
return
3435
}

0 commit comments

Comments
 (0)