Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
"syscall"
"time"

"github.com/hookdeck/outpost/internal/clickhouse"
"github.com/hookdeck/outpost/internal/config"
"github.com/hookdeck/outpost/internal/idgen"
"github.com/hookdeck/outpost/internal/infra"
"github.com/hookdeck/outpost/internal/logging"
"github.com/hookdeck/outpost/internal/logretention"
"github.com/hookdeck/outpost/internal/otel"
"github.com/hookdeck/outpost/internal/redis"
"github.com/hookdeck/outpost/internal/services"
Expand Down Expand Up @@ -76,6 +78,10 @@ func (a *App) PreRun(ctx context.Context) (err error) {
return err
}

if err := a.applyLogRetentionTTL(ctx); err != nil {
return err
}

if err := a.initializeInfrastructure(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -257,3 +263,21 @@ func (a *App) buildServices(ctx context.Context) error {
a.supervisor = supervisor
return nil
}

// applyLogRetentionTTL reconciles the configured log-retention TTL with the
// TTL currently applied to the ClickHouse tables. It is a no-op when
// ClickHouse is not configured.
func (a *App) applyLogRetentionTTL(ctx context.Context) error {
	// Nothing to reconcile without a ClickHouse backend.
	if a.config.ClickHouse.Addr == "" {
		return nil
	}

	a.logger.Debug("applying log retention TTL")

	// Open a short-lived connection just for TTL management; it is closed
	// before PreRun continues.
	conn, err := clickhouse.New(a.config.ClickHouse.ToConfig())
	if err != nil {
		a.logger.Error("failed to connect to ClickHouse for TTL management", zap.Error(err))
		return err
	}
	defer conn.Close()

	return logretention.Apply(ctx, a.redisClient, conn, a.config.DeploymentID, a.config.ClickHouseLogRetentionTTLDays, a.logger)
}
5 changes: 5 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ type Config struct {

// ID Generation
IDGen IDGenConfig `yaml:"idgen"`

// Retention
ClickHouseLogRetentionTTLDays int `yaml:"clickhouse_log_retention_ttl_days" env:"CLICKHOUSE_LOG_RETENTION_TTL_DAYS" desc:"Days to retain logs in ClickHouse. 0 = unlimited." required:"N"`
}

var (
Expand Down Expand Up @@ -206,6 +209,8 @@ func (c *Config) InitDefaults() {
Type: "uuidv4",
EventPrefix: "",
}

c.ClickHouseLogRetentionTTLDays = 0 // Unlimited by default
}

func (c *Config) parseConfigFile(flagPath string, osInterface OSInterface) error {
Expand Down
3 changes: 3 additions & 0 deletions internal/config/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ func (c *Config) LogConfigurationSummary() []zap.Field {
// ID Generation
zap.String("idgen_type", c.IDGen.Type),
zap.String("idgen_event_prefix", c.IDGen.EventPrefix),

// Retention
zap.Int("clickhouse_log_retention_ttl_days", c.ClickHouseLogRetentionTTLDays),
}

// Add MQ-specific fields based on type
Expand Down
71 changes: 71 additions & 0 deletions internal/logretention/clickhouse_ttl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package logretention

import (
"context"
"fmt"

"github.com/hookdeck/outpost/internal/clickhouse"
)

// ClickHouseExecer is a minimal interface for ClickHouse operations needed by ClickHouseTTL.
// It allows tests to substitute a mock for a real connection.
type ClickHouseExecer interface {
	// Exec runs a statement (DDL in this package) against ClickHouse.
	Exec(ctx context.Context, query string, args ...any) error
}

// ClickHouseTTL implements the package's logStoreTTL interface for ClickHouse.
type ClickHouseTTL struct {
	conn          ClickHouseExecer
	eventsTable   string // "events", or "<deploymentID>_events" in multi-deployment mode
	attemptsTable string // "attempts", or "<deploymentID>_attempts" in multi-deployment mode
}

// Compile-time check that *ClickHouseTTL satisfies logStoreTTL.
var _ logStoreTTL = (*ClickHouseTTL)(nil)

// NewClickHouseTTL creates a new ClickHouse TTL applier.
// A non-empty deploymentID selects the "<deploymentID>_"-prefixed tables.
func NewClickHouseTTL(conn clickhouse.DB, deploymentID string) *ClickHouseTTL {
	return newClickHouseTTLWithExecer(conn, deploymentID)
}

// newClickHouseTTLWithExecer creates a ClickHouse TTL applier around a minimal
// execer; tests use it to inject a mock connection.
func newClickHouseTTLWithExecer(conn ClickHouseExecer, deploymentID string) *ClickHouseTTL {
	eventsTable, attemptsTable := "events", "attempts"
	// In multi-deployment mode each deployment owns prefixed tables.
	if deploymentID != "" {
		eventsTable = deploymentID + "_" + eventsTable
		attemptsTable = deploymentID + "_" + attemptsTable
	}
	return &ClickHouseTTL{
		conn:          conn,
		eventsTable:   eventsTable,
		attemptsTable: attemptsTable,
	}
}

// ApplyTTL modifies the TTL on the ClickHouse events and attempts tables.
// If ttlDays is 0, the TTL is removed. Negative values are rejected: a
// negative INTERVAL would place the retention cutoff in the past and could
// immediately expire every stored row.
func (c *ClickHouseTTL) ApplyTTL(ctx context.Context, ttlDays int) error {
	// Guard against misconfiguration (e.g. a negative env value) before
	// issuing any destructive DDL.
	if ttlDays < 0 {
		return fmt.Errorf("invalid log retention TTL days %d: must be >= 0", ttlDays)
	}

	// Apply TTL to events table
	if err := c.alterTableTTL(ctx, c.eventsTable, "event_time", ttlDays); err != nil {
		return fmt.Errorf("failed to alter TTL on events table: %w", err)
	}

	// Apply TTL to attempts table
	if err := c.alterTableTTL(ctx, c.attemptsTable, "attempt_time", ttlDays); err != nil {
		return fmt.Errorf("failed to alter TTL on attempts table: %w", err)
	}

	return nil
}

// alterTableTTL issues the ALTER TABLE statement that sets (or, for
// ttlDays == 0, removes) the TTL on a single ClickHouse table.
// Table/column names are interpolated via fmt.Sprintf because ClickHouse doesn't support
// parameterized identifiers in DDL. All values are derived from operator config (deploymentID)
// and hardcoded column names — never from user input.
func (c *ClickHouseTTL) alterTableTTL(ctx context.Context, tableName, timeColumn string, ttlDays int) error {
	if ttlDays == 0 {
		return c.conn.Exec(ctx, fmt.Sprintf("ALTER TABLE %s REMOVE TTL", tableName))
	}
	query := fmt.Sprintf("ALTER TABLE %s MODIFY TTL %s + INTERVAL %d DAY", tableName, timeColumn, ttlDays)
	return c.conn.Exec(ctx, query)
}
126 changes: 126 additions & 0 deletions internal/logretention/clickhouse_ttl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package logretention

import (
"context"
"errors"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// mockClickHouseConn implements ClickHouseExecer for testing. It records
// every executed query and can simulate failures, either unconditionally or
// only for queries mentioning a specific table.
type mockClickHouseConn struct {
	execCalls []string
	execErr   error
	// failOnTable allows simulating failure on specific table
	failOnTable string
}

// Compile-time check that the mock satisfies ClickHouseExecer.
var _ ClickHouseExecer = (*mockClickHouseConn)(nil)

// Exec records the query, then returns execErr when the configured failure
// condition matches (targeted table substring, or any query when no target
// is set).
func (m *mockClickHouseConn) Exec(ctx context.Context, query string, args ...any) error {
	m.execCalls = append(m.execCalls, query)
	switch {
	case m.failOnTable != "" && strings.Contains(query, m.failOnTable):
		return m.execErr
	case m.failOnTable == "" && m.execErr != nil:
		return m.execErr
	default:
		return nil
	}
}

// TestClickHouseTTL_ApplyTTL verifies the exact DDL statements ApplyTTL
// issues for the events and attempts tables: deployment-ID prefixes, TTL
// removal when ttlDays == 0, and error propagation/short-circuiting when a
// table alteration fails.
func TestClickHouseTTL_ApplyTTL(t *testing.T) {
	tests := []struct {
		name            string
		deploymentID    string // table-name prefix; empty selects unprefixed tables
		ttlDays         int    // 0 means "remove TTL"
		wantQueries     []string
		wantQueryCount  int
		execErr         error
		failOnTable     string // substring that makes the mock return execErr
		wantErr         bool
		wantErrContains string
	}{
		{
			name:         "set TTL - no deployment",
			deploymentID: "",
			ttlDays:      30,
			wantQueries: []string{
				"ALTER TABLE events MODIFY TTL event_time + INTERVAL 30 DAY",
				"ALTER TABLE attempts MODIFY TTL attempt_time + INTERVAL 30 DAY",
			},
			wantQueryCount: 2,
		},
		{
			name:         "set TTL - with deployment",
			deploymentID: "dpm_001",
			ttlDays:      7,
			wantQueries: []string{
				"ALTER TABLE dpm_001_events MODIFY TTL event_time + INTERVAL 7 DAY",
				"ALTER TABLE dpm_001_attempts MODIFY TTL attempt_time + INTERVAL 7 DAY",
			},
			wantQueryCount: 2,
		},
		{
			name:         "remove TTL - set to 0",
			deploymentID: "",
			ttlDays:      0,
			wantQueries: []string{
				"ALTER TABLE events REMOVE TTL",
				"ALTER TABLE attempts REMOVE TTL",
			},
			wantQueryCount: 2,
		},
		{
			// events is altered first, so a failure there must prevent the
			// attempts query from ever being issued.
			name:            "events table fails - stops before attempts",
			deploymentID:    "",
			ttlDays:         30,
			execErr:         errors.New("table not found"),
			failOnTable:     "events",
			wantErr:         true,
			wantErrContains: "events table",
			wantQueryCount:  1,
		},
		{
			// both queries are issued; the second (attempts) fails.
			name:            "attempts table fails",
			deploymentID:    "",
			ttlDays:         30,
			execErr:         errors.New("permission denied"),
			failOnTable:     "attempts",
			wantErr:         true,
			wantErrContains: "attempts table",
			wantQueryCount:  2,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			conn := &mockClickHouseConn{
				execErr:     tt.execErr,
				failOnTable: tt.failOnTable,
			}

			ch := newClickHouseTTLWithExecer(conn, tt.deploymentID)
			err := ch.ApplyTTL(context.Background(), tt.ttlDays)

			if tt.wantErr {
				require.Error(t, err)
				if tt.wantErrContains != "" {
					assert.Contains(t, err.Error(), tt.wantErrContains)
				}
			} else {
				require.NoError(t, err)
			}
			// Always check how many queries were attempted, even on error.
			assert.Len(t, conn.execCalls, tt.wantQueryCount)
			if tt.wantQueries != nil && !tt.wantErr {
				for i, wantQuery := range tt.wantQueries {
					if i < len(conn.execCalls) {
						assert.Equal(t, wantQuery, conn.execCalls[i], "query %d", i)
					}
				}
			}
		})
	}
}
58 changes: 58 additions & 0 deletions internal/logretention/redis_policy_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package logretention

import (
	"context"
	"errors"
	"fmt"
	"strconv"

	"github.com/hookdeck/outpost/internal/redis"
)

// RedisPolicyStore implements the package's policyStore interface using Redis.
// It persists the last TTL value applied to the log store so later runs can
// detect configuration changes.
type RedisPolicyStore struct {
	client       redis.Cmdable
	deploymentID string // namespaces the Redis key in multi-deployment mode
}

// Compile-time check that *RedisPolicyStore satisfies policyStore.
var _ policyStore = (*RedisPolicyStore)(nil)

// NewRedisPolicyStore creates a new Redis-backed policy store.
// An empty deploymentID selects the unprefixed key namespace.
func NewRedisPolicyStore(client redis.Cmdable, deploymentID string) *RedisPolicyStore {
	return &RedisPolicyStore{
		client:       client,
		deploymentID: deploymentID,
	}
}

// redisKey returns the Redis key for storing the applied TTL.
// In multi-deployment mode, keys are prefixed with <deploymentID>:.
func (s *RedisPolicyStore) redisKey() string {
	// Plain concatenation instead of fmt.Sprintf — cheaper and clearer for a
	// fixed two-part key.
	const baseKey = "outpost:log_retention_ttl"
	if s.deploymentID == "" {
		return baseKey
	}
	return s.deploymentID + ":" + baseKey
}

// GetAppliedTTL reads the persisted TTL value from Redis.
// Returns -1 if the key doesn't exist.
func (s *RedisPolicyStore) GetAppliedTTL(ctx context.Context) (int, error) {
	val, err := s.client.Get(ctx, s.redisKey()).Result()
	// errors.Is (rather than ==) also matches a wrapped redis.Nil, per
	// go-redis guidance.
	if errors.Is(err, redis.Nil) {
		return -1, nil // Key doesn't exist
	}
	if err != nil {
		return 0, err
	}

	ttl, err := strconv.Atoi(val)
	if err != nil {
		return 0, fmt.Errorf("invalid TTL value in Redis: %w", err)
	}

	return ttl, nil
}

// SetAppliedTTL writes the TTL value to Redis.
// The key is stored without expiration (the final 0 argument) and is simply
// overwritten on each apply.
func (s *RedisPolicyStore) SetAppliedTTL(ctx context.Context, ttlDays int) error {
	return s.client.Set(ctx, s.redisKey(), strconv.Itoa(ttlDays), 0).Err()
}
Loading
Loading