From 28dd032ac159a5ee351a6657b1a7dbfd9fe99612 Mon Sep 17 00:00:00 2001 From: Chris Ricketts Date: Thu, 28 May 2026 13:03:40 +0100 Subject: [PATCH 1/2] refactor: decouple solver runtime & api services --- README.md | 17 +- cmd/solverd/main.go | 267 +++++++++++------- internal/core/application/preimage_service.go | 132 +-------- internal/core/application/swap_service.go | 61 +--- internal/interface/grpc/e2e_test.go | 22 +- internal/interface/grpc/handler.go | 3 +- internal/interface/grpc/server.go | 43 ++- pkg/banco/README.md | 6 +- pkg/preimage/README.md | 8 +- pkg/solver/README.md | 31 +- pkg/solver/solver.go | 14 +- pkg/solver/solver_test.go | 20 +- test/e2e/main_test.go | 89 ++++-- 13 files changed, 342 insertions(+), 371 deletions(-) diff --git a/README.md b/README.md index 7cb965e..3b784d0 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ them atomically via an emulator-signed Arkade transaction. ``` arkd tx stream ─► Solver ─► Plugin.Match(tx) ─► intent ─► Plugin.Solve(intent) │ - └── fans every tx out to all registered plugins + └── runs enabled plugins in one runtime ``` A solver bot is a small runtime that subscribes to arkd's transaction stream and @@ -41,8 +41,10 @@ two plugins ship with the daemon: - **`pkg/preimage`** — preimage-gated claim solver. Decrypts an ECIES payload attached to the funding tx and claims the VTXO when the arkade-script matches. -Each enabled plugin owns its own `Solver` and arkd subscription, so adding a -new protocol means writing a new `Plugin` and wiring it in `cmd/solverd`. See +`solverd` composes enabled plugins into one `Solver` runtime. The runtime may +still use per-plugin arkd subscriptions internally so server-side filters can +drop unrelated txs before they reach the bot. Adding a new protocol means +writing a new `Plugin` and wiring it in `cmd/solverd`. See [`pkg/solver/README.md`](pkg/solver/README.md) for the plugin authoring guide. ## Packages @@ -76,9 +78,8 @@ dispatches each one to its registered plugins. - `Plugin` interface — `Match(ctx, *psbt.Packet) (intent any, ok bool)` decides whether a tx is interesting; `Solve(ctx, intent)` reacts to a match. - `Solver` / `New(plugins ...Plugin)` — runtime wrapping one or more plugins. -- `Run(ctx, <-chan *psbt.Packet) error` — drains the channel sequentially, - fans matches out to `Solve` goroutines. Returns `ctx.Err()` on cancel, - `nil` when the channel closes. +- `Run(ctx, source) error` — subscribes plugins, fans matches out to `Solve` + goroutines, and returns `ctx.Err()` on cancel. ### `pkg/banco` @@ -143,8 +144,8 @@ web UI. Configured entirely through environment variables: | `SOLVER_BANCO_ENABLED` | | `true` | enable the swap plugin | | `SOLVER_PREIMAGE_ENABLED` | | `false` | enable the preimage-claim plugin | -At least one plugin must be enabled. Each enabled plugin owns its own solver -and arkd subscription. +At least one plugin must be enabled. The daemon registers all enabled plugins +in one solver runtime. ### `solver` diff --git a/cmd/solverd/main.go b/cmd/solverd/main.go index a6c101e..b88fbbc 100644 --- a/cmd/solverd/main.go +++ b/cmd/solverd/main.go @@ -28,7 +28,9 @@ import ( "github.com/arkade-os/solver/internal/infrastructure/pricefeed" grpcservice "github.com/arkade-os/solver/internal/interface/grpc" "github.com/arkade-os/solver/pkg/banco" + "github.com/arkade-os/solver/pkg/preimage" "github.com/arkade-os/solver/pkg/solver" + "github.com/arkade-os/solver/pkg/solver/arkdsource" ) // Version is injected at build time via -ldflags "-X main.Version=". @@ -36,16 +38,24 @@ import ( var Version = "dev" func main() { + log := logrus.New() + + if err := run(log); err != nil { + log.Error(err) + os.Exit(1) + } +} + +func run(log *logrus.Logger) error { cfg, err := config.LoadConfig() if err != nil { - logrus.WithError(err).Fatal("failed to load config") + return fmt.Errorf("failed to load config: %w", err) } - log := logrus.New() log.SetLevel(logrus.Level(cfg.LogLevel)) if err := os.MkdirAll(cfg.Datadir, 0750); err != nil { - log.WithError(err).Fatal("failed to create datadir") + return fmt.Errorf("failed to create datadir: %w", err) } emulatorConn, err := grpc.NewClient( @@ -53,109 +63,69 @@ func main() { grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { - log.WithError(err).Fatal("failed to connect to emulator") + return fmt.Errorf("failed to connect to emulator: %w", err) } // nolint:errcheck defer emulatorConn.Close() emulator := emulatorclient.NewGRPCClient(emulatorConn) ctx := context.Background() - identityStore, err := singlekeyfilestore.NewStore(cfg.Datadir) - if err != nil { - log.WithError(err).Fatal("failed to init identity store") - } - singleKeyIdentity, err := singlekey.NewIdentity(identityStore) - if err != nil { - log.WithError(err).Fatal("failed to init single-key identity") - } - walletOpts := []arksdk.WalletOption{arksdk.WithIdentity(singleKeyIdentity)} - arkClient, err := arksdk.LoadWallet(cfg.Datadir, walletOpts...) + arkClient, err := setupWallet(ctx, cfg) if err != nil { - // Fresh datadir surfaces as either go-sdk's or client-lib's - // ErrNotInitialized depending on which layer first noticed the - // missing config — both are valid "no wallet yet" signals. - if !errors.Is(err, arksdk.ErrNotInitialized) && - !errors.Is(err, arkdclient.ErrNotInitialized) { - log.WithError(err).Fatal("failed to load ark client") - } - // Fresh datadir — create and initialize the wallet. - arkClient, err = arksdk.NewWallet(cfg.Datadir, walletOpts...) - if err != nil { - log.WithError(err).Fatal("failed to create ark client") - } - if err := arkClient.Init(ctx, cfg.ArkURL, cfg.WalletSeed, cfg.WalletPassword); err != nil { - log.WithError(err).Fatal("failed to init ark client") - } - } - if err := arkClient.Unlock(ctx, cfg.WalletPassword); err != nil { - log.WithError(err).Fatal("failed to unlock ark client") + return fmt.Errorf("failed to setup wallet: %w", err) } defer arkClient.Stop() - var ( - takerSvc *application.TakerService - preimageSvc *application.PreimageService - srv *grpcservice.Server - db = optionalSqliteDB(cfg, log) - ) - if db != nil { + plugins := make([]solver.Plugin, 0) + serverOpts := make([]grpcservice.Option, 0) + + if cfg.BancoEnabled { + db, err := sqlitedb.OpenDB(cfg.Datadir) + if err != nil { + return fmt.Errorf("failed to open database: %w", err) + } // nolint:errcheck defer db.Close() - } - if cfg.BancoEnabled { - if db == nil { - log.Fatal("banco plugin requires sqlite datadir") + svc, plugin, err := setupBanco(db, arkClient, emulator, log) + if err != nil { + return fmt.Errorf("failed to setup banco: %w", err) } - pairRepo := sqlitedb.NewPairRepository(db) - tradeRepo := sqlitedb.NewTradeRepository(db) - priceFeed := pricefeed.NewCoinGecko() - tradeListener := application.NewTradeListener(tradeRepo, log) - - plugin := banco.NewPlugin(banco.Config{ - SolverClient: arkClient, - Emulator: emulator, - PairsRepository: pairRepo, - PriceFeed: priceFeed, - Listener: tradeListener, - Log: log, - }) - s := solver.New(plugin).WithLogger(log) - - takerSvc = application.NewTakerService(s, pairRepo, tradeRepo, arkClient, arkClient.Indexer(), log) - takerSvc.Start() - log.Info("banco plugin started") + plugins = append(plugins, plugin) + serverOpts = append(serverOpts, grpcservice.WithBancoService(svc)) + + log.Info("banco plugin enabled") } if cfg.PreimageEnabled { - solverPriv, err := deriveSolverPrivKey(cfg.WalletSeed) + svc, plugin, err := setupPreimage(ctx, cfg, arkClient, emulator, log) if err != nil { - log.WithError(err).Fatal("failed to derive preimage solver privkey") - } - preimageSvc, err = application.NewPreimageService(ctx, application.PreimageServiceConfig{ - ArkClient: arkClient, - Emulator: emulator, - SolverPrivKey: solverPriv, - Log: log, - }) - if err != nil { - log.WithError(err).Fatal("failed to create preimage service") - } - if err := preimageSvc.Start(); err != nil { - log.WithError(err).Fatal("failed to start preimage service") + return fmt.Errorf("failed to setup preimage: %w", err) } - log.Info("preimage plugin started") + plugins = append(plugins, plugin) + serverOpts = append(serverOpts, grpcservice.WithPreimageService(svc)) + log.Info("preimage plugin enabled") } // One gRPC + HTTP server hosts whichever services are enabled. - if cfg.BancoEnabled || cfg.PreimageEnabled { - srv = grpcservice.NewServer(takerSvc, cfg.GRPCPort, cfg.HTTPPort, log). - WithPreimageService(preimageSvc) + if len(serverOpts) > 0 { + srv := grpcservice.NewServer(cfg.GRPCPort, cfg.HTTPPort, log, serverOpts...) if err := srv.Start(); err != nil { - log.WithError(err).Fatal("failed to start server") + return fmt.Errorf("failed to start server: %w", err) } + defer srv.Stop() } + solverCtx, cancelSolver := context.WithCancel(ctx) + defer cancelSolver() + + solverDone := make(chan error, 1) + s := solver.New(plugins...).WithLogger(log) + src := arkdsource.New(arkClient.Client(), log) + go func() { + solverDone <- s.Run(solverCtx, src) + }() + log.WithField("version", Version). WithField("banco", cfg.BancoEnabled). WithField("preimage", cfg.PreimageEnabled). @@ -163,31 +133,136 @@ func main() { sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - <-sigCh - log.Info("shutting down...") - if preimageSvc != nil { - preimageSvc.Stop() + select { + case <-sigCh: + log.Info("shutting down...") + cancelSolver() + signal.Stop(sigCh) + <-solverDone + case err := <-solverDone: + if !errors.Is(err, context.Canceled) { + return fmt.Errorf("solver runtime exited unexpectedly: %w", err) + } } - if takerSvc != nil { - takerSvc.Stop() + + return nil +} + +func setupWallet(ctx context.Context, cfg *config.Config) (arksdk.Wallet, error) { + identityStore, err := singlekeyfilestore.NewStore(cfg.Datadir) + if err != nil { + return nil, fmt.Errorf("init identity store: %w", err) + } + singleKeyIdentity, err := singlekey.NewIdentity(identityStore) + if err != nil { + return nil, fmt.Errorf("init single-key identity: %w", err) } - if srv != nil { - srv.Stop() + + walletOpts := []arksdk.WalletOption{arksdk.WithIdentity(singleKeyIdentity)} + arkClient, err := arksdk.LoadWallet(cfg.Datadir, walletOpts...) + if err != nil { + // Fresh datadir surfaces as either go-sdk's or client-lib's + // ErrNotInitialized depending on which layer first noticed the + // missing config — both are valid "no wallet yet" signals. + if !errors.Is(err, arksdk.ErrNotInitialized) && + !errors.Is(err, arkdclient.ErrNotInitialized) { + return nil, fmt.Errorf("load ark client: %w", err) + } + + arkClient, err = arksdk.NewWallet(cfg.Datadir, walletOpts...) + if err != nil { + return nil, fmt.Errorf("create ark client: %w", err) + } + if err := arkClient.Init(ctx, cfg.ArkURL, cfg.WalletSeed, cfg.WalletPassword); err != nil { + return nil, fmt.Errorf("init ark client: %w", err) + } } - log.Info("solverd stopped") + if err := arkClient.Unlock(ctx, cfg.WalletPassword); err != nil { + return nil, fmt.Errorf("unlock ark client: %w", err) + } + + return arkClient, nil } -// optionalSqliteDB opens the sqlite DB iff banco plugin is enabled -func optionalSqliteDB(cfg *config.Config, log logrus.FieldLogger) *sql.DB { - if !cfg.BancoEnabled { - return nil +func setupBanco( + db *sql.DB, + arkClient arksdk.Wallet, + emulator emulatorclient.TransportClient, + log logrus.FieldLogger, +) (*application.TakerService, solver.Plugin, error) { + pairRepo := sqlitedb.NewPairRepository(db) + tradeRepo := sqlitedb.NewTradeRepository(db) + priceFeed := pricefeed.NewCoinGecko() + tradeListener := application.NewTradeListener(tradeRepo, log) + + plugin := banco.NewPlugin(banco.Config{ + SolverClient: arkClient, + Emulator: emulator, + PairsRepository: pairRepo, + PriceFeed: priceFeed, + Listener: tradeListener, + Log: log, + }) + takerSvc := application.NewTakerService(pairRepo, tradeRepo, arkClient, arkClient.Indexer(), log) + + return takerSvc, plugin, nil +} + +func setupPreimage( + ctx context.Context, + cfg *config.Config, + arkClient arksdk.Wallet, + emulator emulatorclient.TransportClient, + log logrus.FieldLogger, +) (*application.PreimageService, solver.Plugin, error) { + solverPriv, err := deriveSolverPrivKey(cfg.WalletSeed) + if err != nil { + return nil, nil, fmt.Errorf("derive preimage solver privkey: %w", err) } - db, err := sqlitedb.OpenDB(cfg.Datadir) + info, err := emulator.GetInfo(ctx) if err != nil { - log.WithError(err).Fatal("failed to open database") + return nil, nil, fmt.Errorf("get emulator info: %w", err) } - return db + rawEmulatorPub, err := hex.DecodeString(info.SignerPublicKey) + if err != nil { + return nil, nil, fmt.Errorf("decode emulator pubkey: %w", err) + } + emulatorPub, err := btcec.ParsePubKey(rawEmulatorPub) + if err != nil { + return nil, nil, fmt.Errorf("parse emulator pubkey: %w", err) + } + configData, err := arkClient.GetConfigData(ctx) + if err != nil { + return nil, nil, fmt.Errorf("get ark config: %w", err) + } + checkpointBytes, err := hex.DecodeString(configData.CheckpointTapscript) + if err != nil { + return nil, nil, fmt.Errorf("decode checkpoint tapscript: %w", err) + } + preimageSvc, err := application.NewPreimageService(application.PreimageServiceConfig{ + SolverPrivKey: solverPriv, + EmulatorPubKey: emulatorPub, + Log: log, + }) + if err != nil { + return nil, nil, fmt.Errorf("create preimage service: %w", err) + } + plugin, err := preimage.NewPlugin(ctx, preimage.Config{ + ArkClient: arkClient, + Emulator: emulator, + SolverPrivKey: solverPriv, + EmulatorPubKey: emulatorPub, + ServerPubKey: configData.SignerPubKey, + CheckpointTapscript: checkpointBytes, + Network: configData.Network, + Log: log, + }) + if err != nil { + return nil, nil, fmt.Errorf("build preimage plugin: %w", err) + } + + return preimageSvc, plugin, nil } const preimageKeyDomain = "solverd/preimage-plugin/v1" diff --git a/internal/core/application/preimage_service.go b/internal/core/application/preimage_service.go index 20f8b47..8190234 100644 --- a/internal/core/application/preimage_service.go +++ b/internal/core/application/preimage_service.go @@ -1,144 +1,36 @@ package application import ( - "context" - "encoding/hex" - "errors" "fmt" - "sync" - arklib "github.com/arkade-os/arkd/pkg/ark-lib" - emulatorclient "github.com/arkade-os/emulator/pkg/client" - arksdk "github.com/arkade-os/go-sdk" "github.com/btcsuite/btcd/btcec/v2" "github.com/sirupsen/logrus" - - "github.com/arkade-os/solver/pkg/preimage" - "github.com/arkade-os/solver/pkg/solver" - "github.com/arkade-os/solver/pkg/solver/arkdsource" ) type PreimageServiceConfig struct { - ArkClient arksdk.Wallet - Emulator emulatorclient.TransportClient - SolverPrivKey *btcec.PrivateKey // ECIES decryption key (derived from wallet seed) - Log logrus.FieldLogger + SolverPrivKey *btcec.PrivateKey // ECIES decryption key (derived from wallet seed) + EmulatorPubKey *btcec.PublicKey + Log logrus.FieldLogger } -// PreimageService runs the preimage solver loop and exposes the solver pubkey -// for clients to encrypt against. +// PreimageService exposes preimage plugin metadata for API clients. type PreimageService struct { - cfg PreimageServiceConfig - emulatorPubkey *btcec.PublicKey - serverPubkey *btcec.PublicKey - checkpointScript []byte - network arklib.Network - log logrus.FieldLogger - - cancel context.CancelFunc - done chan struct{} - mu sync.RWMutex - running bool + cfg PreimageServiceConfig + log logrus.FieldLogger } -func NewPreimageService(ctx context.Context, cfg PreimageServiceConfig) (*PreimageService, error) { - if cfg.ArkClient == nil { - return nil, fmt.Errorf("PreimageServiceConfig.ArkClient must not be nil") - } - if cfg.Emulator == nil { - return nil, fmt.Errorf("PreimageServiceConfig.Emulator must not be nil") - } +func NewPreimageService(cfg PreimageServiceConfig) (*PreimageService, error) { if cfg.SolverPrivKey == nil { return nil, fmt.Errorf("PreimageServiceConfig.SolverPrivKey must not be nil") } + if cfg.EmulatorPubKey == nil { + return nil, fmt.Errorf("PreimageServiceConfig.EmulatorPubKey must not be nil") + } if cfg.Log == nil { cfg.Log = logrus.StandardLogger() } - info, err := cfg.Emulator.GetInfo(ctx) - if err != nil { - return nil, fmt.Errorf("get emulator info: %w", err) - } - rawIntro, err := hex.DecodeString(info.SignerPublicKey) - if err != nil { - return nil, fmt.Errorf("decode emulator pubkey: %w", err) - } - emulatorPub, err := btcec.ParsePubKey(rawIntro) - if err != nil { - return nil, fmt.Errorf("parse emulator pubkey: %w", err) - } - - configData, err := cfg.ArkClient.GetConfigData(ctx) - if err != nil { - return nil, fmt.Errorf("get ark config: %w", err) - } - checkpointBytes, err := hex.DecodeString(configData.CheckpointTapscript) - if err != nil { - return nil, fmt.Errorf("decode checkpoint tapscript: %w", err) - } - - return &PreimageService{ - cfg: cfg, - emulatorPubkey: emulatorPub, - serverPubkey: configData.SignerPubKey, - checkpointScript: checkpointBytes, - network: configData.Network, - log: cfg.Log, - }, nil -} - -func (svc *PreimageService) Start() error { - ctx, cancel := context.WithCancel(context.Background()) - - plugin, err := preimage.NewPlugin(ctx, preimage.Config{ - ArkClient: svc.cfg.ArkClient, - Emulator: svc.cfg.Emulator, - SolverPrivKey: svc.cfg.SolverPrivKey, - EmulatorPubKey: svc.emulatorPubkey, - ServerPubKey: svc.serverPubkey, - CheckpointTapscript: svc.checkpointScript, - Network: svc.network, - Log: svc.log, - }) - if err != nil { - cancel() - return fmt.Errorf("build preimage plugin: %w", err) - } - - s := solver.New(plugin).WithLogger(svc.log) - src := arkdsource.New(svc.cfg.ArkClient.Client(), svc.log) - - svc.cancel = cancel - svc.done = make(chan struct{}) - svc.setRunning(true) - - go func() { - defer close(svc.done) - defer svc.setRunning(false) - if err := s.Run(ctx, src); err != nil && !errors.Is(err, context.Canceled) { - svc.log.WithError(err).Error("preimage solver run exited") - } - }() - return nil -} - -func (svc *PreimageService) Stop() { - if svc.cancel != nil { - svc.cancel() - <-svc.done - } -} - -func (svc *PreimageService) Status() Status { - svc.mu.RLock() - defer svc.mu.RUnlock() - return Status{Running: svc.running} -} - -func (svc *PreimageService) setRunning(v bool) { - svc.mu.Lock() - svc.running = v - svc.mu.Unlock() + return &PreimageService{cfg: cfg, log: cfg.Log}, nil } // SolverPubKey returns the encryption pubkey clients must use to ECIES-encrypt @@ -150,5 +42,5 @@ func (svc *PreimageService) SolverPubKey() *btcec.PublicKey { // EmulatorPubKey returns the bot's configured emulator pubkey, // fetched at service construction time via Emulator.GetInfo(). func (svc *PreimageService) EmulatorPubKey() *btcec.PublicKey { - return svc.emulatorPubkey + return svc.cfg.EmulatorPubKey } diff --git a/internal/core/application/swap_service.go b/internal/core/application/swap_service.go index 1928f6d..2a4bea0 100644 --- a/internal/core/application/swap_service.go +++ b/internal/core/application/swap_service.go @@ -2,11 +2,9 @@ package application import ( "context" - "errors" "fmt" "strconv" "strings" - "sync" "github.com/arkade-os/arkd/pkg/client-lib/indexer" arksdk "github.com/arkade-os/go-sdk" @@ -14,37 +12,21 @@ import ( "github.com/arkade-os/solver/internal/core/ports" "github.com/arkade-os/solver/pkg/banco" - "github.com/arkade-os/solver/pkg/solver" - "github.com/arkade-os/solver/pkg/solver/arkdsource" ) const btcDecimals = 8 -// Status reports whether the solver run goroutine is currently active. -type Status struct { - Running bool -} - -// TakerService is the application-level service that owns the solver run -// loop and provides CRUD for trading pairs plus wallet operations. +// TakerService provides CRUD for trading pairs plus wallet operations. type TakerService struct { - solver *solver.Solver pairRepo ports.PairRepository tradeRepo ports.TradeRepository arkClient arksdk.Wallet indexer indexer.Indexer log logrus.FieldLogger - - cancel context.CancelFunc - done chan struct{} - mu sync.RWMutex - running bool } -// NewTakerService creates a new TakerService. The caller is responsible -// for constructing the solver from a banco.Plugin. +// NewTakerService creates a new TakerService. func NewTakerService( - s *solver.Solver, pairRepo ports.PairRepository, tradeRepo ports.TradeRepository, arkClient arksdk.Wallet, @@ -55,7 +37,6 @@ func NewTakerService( log = logrus.StandardLogger() } return &TakerService{ - solver: s, pairRepo: pairRepo, tradeRepo: tradeRepo, arkClient: arkClient, @@ -73,44 +54,6 @@ func (svc *TakerService) ListTrades(ctx context.Context, limit int) ([]ports.Tra return svc.tradeRepo.List(ctx, limit) } -// Start spawns the solver run goroutine, subscribed to the arkd tx stream. -func (svc *TakerService) Start() { - ctx, cancel := context.WithCancel(context.Background()) - svc.cancel = cancel - svc.done = make(chan struct{}) - - src := arkdsource.New(svc.arkClient.Client(), svc.log) - svc.setRunning(true) - go func() { - defer close(svc.done) - defer svc.setRunning(false) - if err := svc.solver.Run(ctx, src); err != nil && !errors.Is(err, context.Canceled) { - svc.log.WithError(err).Error("solver run exited") - } - }() -} - -// Stop signals the run goroutine to exit and waits for it. -func (svc *TakerService) Stop() { - if svc.cancel != nil { - svc.cancel() - <-svc.done - } -} - -// Status reports whether Run is active. -func (svc *TakerService) Status() Status { - svc.mu.RLock() - defer svc.mu.RUnlock() - return Status{Running: svc.running} -} - -func (svc *TakerService) setRunning(v bool) { - svc.mu.Lock() - svc.running = v - svc.mu.Unlock() -} - // AddPair validates, resolves decimals from the indexer, and adds a new pair. func (svc *TakerService) AddPair(ctx context.Context, pair banco.Pair) error { resolved, err := svc.resolveDecimals(ctx, pair) diff --git a/internal/interface/grpc/e2e_test.go b/internal/interface/grpc/e2e_test.go index 266ca29..04a7031 100644 --- a/internal/interface/grpc/e2e_test.go +++ b/internal/interface/grpc/e2e_test.go @@ -3,7 +3,6 @@ package grpcservice import ( "context" "testing" - "time" "github.com/arkade-os/arkd/pkg/ark-lib/asset" "github.com/arkade-os/arkd/pkg/client-lib/indexer" @@ -15,8 +14,6 @@ import ( bancov1 "github.com/arkade-os/solver/api-spec/protobuf/gen/go/solverd/v1" "github.com/arkade-os/solver/internal/core/application" sqlitedb "github.com/arkade-os/solver/internal/infrastructure/db/sqlite" - "github.com/arkade-os/solver/pkg/banco" - "github.com/arkade-os/solver/pkg/solver" ) // mockArkClient implements arksdk.Wallet with stub methods for balance and address. @@ -77,18 +74,13 @@ func setupHandler(t *testing.T) bancov1.BancoServiceServer { pairRepo := sqlitedb.NewPairRepository(db) - plugin := banco.NewPlugin(banco.Config{ - PriceCacheTTL: 5 * time.Minute, - }) - s := solver.New(plugin) - idx := &mockIndexer{decimals: map[string]string{ "USDT": "6", "ETH": "18", "LTC": "8", }} tradeRepo := sqlitedb.NewTradeRepository(db) - svc := application.NewTakerService(s, pairRepo, tradeRepo, &mockArkClient{}, idx, nil) + svc := application.NewTakerService(pairRepo, tradeRepo, &mockArkClient{}, idx, nil) return newHandler(svc) } @@ -291,18 +283,6 @@ func TestAddPair_Duplicate(t *testing.T) { assert.Error(t, err, "adding duplicate pair should fail with PRIMARY KEY conflict") } -// TestGetStatus verifies that GetStatus returns a valid response. -func TestGetStatus(t *testing.T) { - h := setupHandler(t) - ctx := context.Background() - - resp, err := h.GetStatus(ctx, &bancov1.GetStatusRequest{}) - require.NoError(t, err) - - // The solver has not been Started, so Running should be false. - assert.False(t, resp.Running) -} - // TestGetBalance verifies that GetBalance returns expected values from the mock client. func TestGetBalance(t *testing.T) { h := setupHandler(t) diff --git a/internal/interface/grpc/handler.go b/internal/interface/grpc/handler.go index 761dd51..e4e6204 100644 --- a/internal/interface/grpc/handler.go +++ b/internal/interface/grpc/handler.go @@ -82,8 +82,7 @@ func (h *handler) ListPairs( func (h *handler) GetStatus( ctx context.Context, _ *bancov1.GetStatusRequest, ) (*bancov1.GetStatusResponse, error) { - status := h.svc.Status() - return &bancov1.GetStatusResponse{Running: status.Running}, nil + return &bancov1.GetStatusResponse{Running: true}, nil } func (h *handler) GetBalance( diff --git a/internal/interface/grpc/server.go b/internal/interface/grpc/server.go index e8ebea6..c2a5b99 100644 --- a/internal/interface/grpc/server.go +++ b/internal/interface/grpc/server.go @@ -36,33 +36,46 @@ type Server struct { log logrus.FieldLogger } -// NewServer creates a new Server that serves both gRPC and HTTP. svc may be -// nil when the banco plugin is disabled. +// Option configures the services exposed by Server. +type Option func(*Server) + +// NewServer creates a new Server that serves both gRPC and HTTP. func NewServer( - svc *application.TakerService, grpcPort, httpPort int, log logrus.FieldLogger, + opts ...Option, ) *Server { s := &Server{ grpcPort: grpcPort, httpPort: httpPort, log: log, } - if svc != nil { - s.handler = newHandler(svc) - s.bancoSvc = svc + for _, opt := range opts { + opt(s) } return s } -// WithPreimageService attaches the preimage handler so its RPCs are exposed -// alongside the banco handler. Pass nil to leave it disabled. -func (s *Server) WithPreimageService(svc *application.PreimageService) *Server { - if svc != nil { - s.preimageHandler = newPreimageHandler(svc) - s.preimageSvc = svc +// WithBancoService exposes the banco RPC and HTTP routes. Pass nil to leave it +// disabled. +func WithBancoService(svc *application.TakerService) Option { + return func(s *Server) { + if svc != nil { + s.handler = newHandler(svc) + s.bancoSvc = svc + } + } +} + +// WithPreimageService exposes the preimage RPC and HTTP routes. Pass nil to +// leave it disabled. +func WithPreimageService(svc *application.PreimageService) Option { + return func(s *Server) { + if svc != nil { + s.preimageHandler = newPreimageHandler(svc) + s.preimageSvc = svc + } } - return s } // Start starts both the gRPC server and the HTTP gateway. @@ -283,11 +296,11 @@ func (s *Server) pluginsHandler() http.Handler { resp := pluginsResponse{} if s.bancoSvc != nil { resp.Banco.Enabled = true - resp.Banco.Running = s.bancoSvc.Status().Running + resp.Banco.Running = true } if s.preimageSvc != nil { resp.Preimage.Enabled = true - resp.Preimage.Running = s.preimageSvc.Status().Running + resp.Preimage.Running = true } jsonResponse(w, resp) }) diff --git a/pkg/banco/README.md b/pkg/banco/README.md index 763144c..582c188 100644 --- a/pkg/banco/README.md +++ b/pkg/banco/README.md @@ -68,12 +68,12 @@ plugin := banco.NewPlugin(banco.Config{ Listener: tradeListener,// persists FulfillmentEvent → trades table Log: log, }) -s := solver.New(plugin).WithLogger(log) +plugins = append(plugins, plugin) ``` The application-level service (`internal/core/application/swap_service.go`) -owns the run loop and exposes pair/trade CRUD; `pkg/banco` only owns -match/validate/solve. +exposes pair/trade CRUD; `cmd/solverd` wires the plugin into the shared solver +runtime. `pkg/banco` only owns match/validate/solve. ## Files quick-reference diff --git a/pkg/preimage/README.md b/pkg/preimage/README.md index 0122fec..3cb7fd6 100644 --- a/pkg/preimage/README.md +++ b/pkg/preimage/README.md @@ -67,13 +67,13 @@ plugin, err := preimage.NewPlugin(ctx, preimage.Config{ Network: configData.Network, Log: log, }) -s := solver.New(plugin).WithLogger(log) +plugins = append(plugins, plugin) ``` The application-level service (`internal/core/application/preimage_service.go`) -fetches the emulator/server pubkeys at startup, owns the run loop, -and exposes `SolverPubKey()` / `EmulatorPubKey()` so makers can encrypt -preimages against the right keys. +exposes `SolverPubKey()` / `EmulatorPubKey()` so makers can encrypt preimages +against the right keys. `cmd/solverd` fetches ark/emulator metadata, constructs +the plugin, and wires it into the shared solver runtime. ## Files quick-reference diff --git a/pkg/solver/README.md b/pkg/solver/README.md index f362e37..c3317b2 100644 --- a/pkg/solver/README.md +++ b/pkg/solver/README.md @@ -7,7 +7,7 @@ A solver bot subscribes to arkd's transaction stream and reacts to txs that match a protocol it cares about. This package provides: 1. A tx source (`arkdsource`) that turns the arkd gRPC stream into `<-chan *psbt.Packet`. -2. A runtime (`solver.Solver`) that fans incoming txs out to one or more plugins, recovers panics, and waits for in-flight work on shutdown. +2. A runtime (`solver.Solver`) that subscribes one or more plugins to a tx source, recovers panics, and waits for in-flight work on shutdown. 3. A typed plugin-authoring toolkit (`builder`) that hides the OP_RETURN parse and exposes a stage-based pipeline (Filter → Decode → Validate → Solve) for protocol-specific code. 4. A predicate library (`txmatch`) for cheap structural filters over `*psbt.Packet`. @@ -51,24 +51,25 @@ type Solver struct { /* ... */ } func New(plugins ...Plugin) *Solver func (s *Solver) WithLogger(log logrus.FieldLogger) *Solver -func (s *Solver) Run(ctx context.Context, txs <-chan *psbt.Packet) error +func (s *Solver) Run(ctx context.Context, source Source) error ``` `Run` semantics: -- Reads txs sequentially. -- For every tx, calls each plugin's `Match` in registration order. +- Subscribes each plugin with its own filter. +- For every tx from a plugin subscription, calls that plugin's `Match`. - Every accepted tx spawns `go p.Solve(ctx, intent)` — Solves run concurrently, but Match calls are sequential. - Panics in `Match` and `Solve` are recovered and reported via the configured logger (Error level with stack). The solver does NOT crash on plugin panics. - `Run` only returns after all in-flight Solve goroutines finish. Two exit conditions: - - `txs` channel closes → returns `nil` (after drain) + - all plugin streams close → returns `solver.ErrAllStreamsClosed` (after drain) - `ctx` cancels → returns `ctx.Err()` (after drain) - `WithLogger(nil)` disables panic logging entirely (panics still recovered, just silent). Default is `logrus.StandardLogger()`. -### `arkdsource.Subscribe` +### `arkdsource.Source` ```go -func Subscribe(ctx context.Context, c arksdk.ArkClient, log logrus.FieldLogger) <-chan *psbt.Packet +src := arkdsource.New(arkClient, log) +txs, err := src.Subscribe(ctx, filter) ``` Returns a channel that closes when: @@ -238,14 +239,16 @@ Reference shape — every solver bot looks like this: arkClient := /* arksdk client */ emulator := /* emulator client */ -plugin := banco.NewPlugin(banco.Config{ ... }) // returns solver.Plugin -s := solver.New(plugin).WithLogger(log) +plugin1 := banco.NewPlugin(banco.Config{ ... }) // returns solver.Plugin +plugin2, err := preimage.NewPlugin(ctx, preimage.Config{ ... }) +if err != nil { /* handle */ } +s := solver.New(plugin1, plugin2).WithLogger(log) ctx, cancel := context.WithCancel(...) -txs := arkdsource.Subscribe(ctx, arkClient, log) // <-chan *psbt.Packet +src := arkdsource.New(arkClient, log) go func() { - if err := s.Run(ctx, txs); err != nil && !errors.Is(err, context.Canceled) { + if err := s.Run(ctx, src); err != nil && !errors.Is(err, context.Canceled) { log.WithError(err).Error("solver run exited") } }() @@ -253,13 +256,13 @@ go func() { // later: cancel() and wait. Run drains in-flight Solves before returning. ``` -`solver.Solver` accepts multiple plugins (`solver.New(p1, p2, p3)`); every plugin sees every tx, in registration order. +`solver.Solver` accepts multiple plugins (`solver.New(p1, p2, p3)`); each plugin gets its own source subscription, so source-side filters can reduce traffic before `Match` runs. --- ## Invariants the runtime guarantees -1. **`Match` is called sequentially** for each tx, in plugin-registration order. You can rely on Match not being called concurrently for the same plugin. +1. **`Match` is called sequentially per plugin subscription.** You can rely on Match not being called concurrently for the same plugin. 2. **`Solve` is called concurrently** with other Solves (potentially across plugins). If your Solve mutates shared state, you own the synchronization. 3. **Panic recovery is unconditional** — `Match` or `Solve` panicking will not crash the bot. The panic is logged with stack and the tx is dropped. 4. **Graceful shutdown drains in-flight Solves** — `Solver.Run` does not return until every Solve goroutine it dispatched has completed. Cancel ctx and wait. @@ -291,7 +294,7 @@ The `builder` pipeline is a degenerate case of **Railway-Oriented Programming** ## Files quick-reference - `solver.go` — `Plugin`, `Solver`, `Run`, panic recovery, graceful shutdown. -- `arkdsource/arkdsource.go` — `Subscribe(ctx, ArkClient, log) <-chan *psbt.Packet`. +- `arkdsource/arkdsource.go` — `New(ArkClient, log)` source with `Subscribe(ctx, filter)`. - `txmatch/txmatch.go` — `Predicate` + `All/Any/Not` + `HasOutput/HasInput/Has*` + functional options. - `builder/builder.go` — `Builder[T]`, stages, `ErrSkip`, `Build()`. - `builder/extension.go` — `ForExtension[T]`, `HasArkExtension()`. diff --git a/pkg/solver/solver.go b/pkg/solver/solver.go index bd557ea..4b5dda4 100644 --- a/pkg/solver/solver.go +++ b/pkg/solver/solver.go @@ -2,6 +2,7 @@ package solver import ( "context" + "errors" "runtime/debug" "sync" @@ -9,6 +10,10 @@ import ( "github.com/sirupsen/logrus" ) +// ErrAllStreamsClosed reports that every plugin stream ended before the +// runtime context was canceled. +var ErrAllStreamsClosed = errors.New("solver: all plugin streams closed") + // Plugin is the protocol-specific contract a Solver runs. type Plugin interface { // Filter returns the CEL expression applied server-side to the upstream @@ -61,8 +66,8 @@ func (s *Solver) WithLogger(log logrus.FieldLogger) *Solver { // one buggy plugin can't crash the bot. // // - returns ctx.Err() when ctx is canceled (after in-flight Solves drain) -// - returns nil when every per-plugin stream has closed (or no plugin -// subscribed successfully) +// - returns ErrAllStreamsClosed when every per-plugin stream has closed +// (or no plugin subscribed successfully) // // A Subscribe error for a single plugin is logged and that plugin is // skipped; the rest continue. @@ -92,7 +97,10 @@ func (s *Solver) Run(ctx context.Context, source Source) error { <-done return ctx.Err() case <-done: - return nil + if err := ctx.Err(); err != nil { + return err + } + return ErrAllStreamsClosed } } diff --git a/pkg/solver/solver_test.go b/pkg/solver/solver_test.go index ee9acc5..817d0f1 100644 --- a/pkg/solver/solver_test.go +++ b/pkg/solver/solver_test.go @@ -134,7 +134,7 @@ func singleSource(ch chan *psbt.Packet) Source { return newFakeSource().withFallback(ch) } -func TestRun_ReturnsNilOnChannelClose(t *testing.T) { +func TestRun_ReturnsErrAllStreamsClosedOnChannelClose(t *testing.T) { plugin := &fakePlugin{} s := New(plugin) ctx := context.Background() @@ -148,7 +148,7 @@ func TestRun_ReturnsNilOnChannelClose(t *testing.T) { case <-time.After(time.Second): t.Fatal("Run did not return within 1s of channel close") } - require.NoError(t, *errp) + require.ErrorIs(t, *errp, ErrAllStreamsClosed) } func TestRun_ReturnsCtxErrOnCancel(t *testing.T) { @@ -181,7 +181,7 @@ func TestRun_MatchOkFalseDoesNotCallSolve(t *testing.T) { done, errp := runEngine(t, s, ctx, singleSource(ch)) <-done - require.NoError(t, *errp) + require.ErrorIs(t, *errp, ErrAllStreamsClosed) require.Empty(t, plugin.solved) } @@ -200,7 +200,7 @@ func TestRun_MatchOkTrueCallsSolve(t *testing.T) { done, errp := runEngine(t, s, ctx, singleSource(ch)) <-done plugin.waitSolves(t) - require.NoError(t, *errp) + require.ErrorIs(t, *errp, ErrAllStreamsClosed) require.Equal(t, []any{"intent-1"}, plugin.solved) } @@ -216,7 +216,7 @@ func TestRun_NilTxIsSkipped(t *testing.T) { done, errp := runEngine(t, s, ctx, singleSource(ch)) <-done - require.NoError(t, *errp) + require.ErrorIs(t, *errp, ErrAllStreamsClosed) // Only the non-nil tx should have been Matched. require.Equal(t, 1, plugin.matched) } @@ -255,7 +255,7 @@ func TestRun_PerPluginFilteredSubscription(t *testing.T) { <-done p1.waitSolves(t) p2.waitSolves(t) - require.NoError(t, *errp) + require.ErrorIs(t, *errp, ErrAllStreamsClosed) require.ElementsMatch(t, []string{"tag == 'a'", "tag == 'b'"}, src.seenFilters()) // Each plugin saw exactly its own tx, not the other's. require.Equal(t, 1, p1.matched) @@ -289,7 +289,7 @@ func TestRun_SubscribeErrorSkipsPlugin(t *testing.T) { done, errp := runEngine(t, s, ctx, src) <-done working.waitSolves(t) - require.NoError(t, *errp) + require.ErrorIs(t, *errp, ErrAllStreamsClosed) require.Equal(t, 0, failing.matched) require.Equal(t, []any{"ok"}, working.solved) } @@ -331,7 +331,7 @@ func TestRun_WaitsForInflightSolvesOnChannelClose(t *testing.T) { t.Fatal("Run did not return within 1s after Solve unblocked") } plugin.waitSolves(t) - require.NoError(t, *errp) + require.ErrorIs(t, *errp, ErrAllStreamsClosed) require.Equal(t, int32(1), atomic.LoadInt32(&solveDone)) } @@ -394,7 +394,7 @@ func TestRun_RecoversMatchPanic(t *testing.T) { case <-time.After(time.Second): t.Fatal("Run did not return within 1s after Match panic") } - require.NoError(t, *errp) + require.ErrorIs(t, *errp, ErrAllStreamsClosed) require.Empty(t, plugin.solved) } @@ -420,5 +420,5 @@ func TestRun_RecoversSolvePanic(t *testing.T) { t.Fatal("Run did not return within 1s after Solve panic") } plugin.waitSolves(t) - require.NoError(t, *errp) + require.ErrorIs(t, *errp, ErrAllStreamsClosed) } diff --git a/test/e2e/main_test.go b/test/e2e/main_test.go index 331efd2..538e2c8 100644 --- a/test/e2e/main_test.go +++ b/test/e2e/main_test.go @@ -2,6 +2,8 @@ package e2e_test import ( "context" + "encoding/hex" + "errors" "fmt" "os" "regexp" @@ -23,7 +25,9 @@ import ( sqlitedb "github.com/arkade-os/solver/internal/infrastructure/db/sqlite" grpcservice "github.com/arkade-os/solver/internal/interface/grpc" "github.com/arkade-os/solver/pkg/banco" + "github.com/arkade-os/solver/pkg/preimage" "github.com/arkade-os/solver/pkg/solver" + "github.com/arkade-os/solver/pkg/solver/arkdsource" ) const ( @@ -117,19 +121,19 @@ func runTests(m *testing.M) int { } emulatorClient := emulatorclient.NewGRPCClient(emulatorConn) - // Build solver - plugin := banco.NewPlugin(banco.Config{ + var plugins []solver.Plugin + + // Build banco plugin + bancoPlugin := banco.NewPlugin(banco.Config{ SolverClient: takerClient, Emulator: emulatorClient, PairsRepository: pairRepo, PriceFeed: &mockPriceFeed{}, Log: log.StandardLogger(), }) - s := solver.New(plugin) + plugins = append(plugins, bancoPlugin) - takerSvc = application.NewTakerService(s, pairRepo, tradeRepo, takerClient, takerClient.Indexer(), log.StandardLogger()) - takerSvc.Start() - defer takerSvc.Stop() + takerSvc = application.NewTakerService(pairRepo, tradeRepo, takerClient, takerClient.Indexer(), log.StandardLogger()) // Preimage service: stateless — the solver privkey is generated fresh for // the test and the preimage plugin recovers credentials from the tx stream @@ -139,27 +143,80 @@ func runTests(m *testing.M) int { log.Errorf("failed to generate preimage privkey: %s", err) return 1 } - preimageSvc, err = application.NewPreimageService(ctx, application.PreimageServiceConfig{ - ArkClient: takerClient, - Emulator: emulatorClient, - SolverPrivKey: preimagePriv, - Log: log.StandardLogger(), + info, err := emulatorClient.GetInfo(ctx) + if err != nil { + log.Errorf("failed to get emulator info: %s", err) + return 1 + } + rawIntro, err := hex.DecodeString(info.SignerPublicKey) + if err != nil { + log.Errorf("failed to decode emulator pubkey: %s", err) + return 1 + } + emulatorPub, err := btcec.ParsePubKey(rawIntro) + if err != nil { + log.Errorf("failed to parse emulator pubkey: %s", err) + return 1 + } + configData, err := takerClient.GetConfigData(ctx) + if err != nil { + log.Errorf("failed to get ark config: %s", err) + return 1 + } + checkpointBytes, err := hex.DecodeString(configData.CheckpointTapscript) + if err != nil { + log.Errorf("failed to decode checkpoint tapscript: %s", err) + return 1 + } + preimageSvc, err = application.NewPreimageService(application.PreimageServiceConfig{ + SolverPrivKey: preimagePriv, + EmulatorPubKey: emulatorPub, + Log: log.StandardLogger(), }) if err != nil { log.Errorf("failed to create preimage service: %s", err) return 1 } - if err := preimageSvc.Start(); err != nil { - log.Errorf("failed to start preimage service: %s", err) + preimagePlugin, err := preimage.NewPlugin(ctx, preimage.Config{ + ArkClient: takerClient, + Emulator: emulatorClient, + SolverPrivKey: preimagePriv, + EmulatorPubKey: emulatorPub, + ServerPubKey: configData.SignerPubKey, + CheckpointTapscript: checkpointBytes, + Network: configData.Network, + Log: log.StandardLogger(), + }) + if err != nil { + log.Errorf("failed to build preimage plugin: %s", err) return 1 } - defer preimageSvc.Stop() + plugins = append(plugins, preimagePlugin) + + solverCtx, solverCancel := context.WithCancel(ctx) + solverDone := make(chan error, 1) + s := solver.New(plugins...).WithLogger(log.StandardLogger()) + src := arkdsource.New(takerClient.Client(), log.StandardLogger()) + go func() { + solverDone <- s.Run(solverCtx, src) + }() + defer func() { + solverCancel() + if err := <-solverDone; err != nil && !errors.Is(err, context.Canceled) { + log.Errorf("solver exited during shutdown: %s", err) + } + }() // Start the real gRPC + HTTP gateway server hosting both takerSvc and // preimageSvc. e2e tests dial this server as a real client would, rather // than calling application services directly. - srv := grpcservice.NewServer(takerSvc, e2eGRPCPort, e2eHTTPPort, log.StandardLogger()). - WithPreimageService(preimageSvc) + srv := grpcservice.NewServer( + e2eGRPCPort, + e2eHTTPPort, + log.StandardLogger(), + grpcservice.WithBancoService(takerSvc), + grpcservice.WithPreimageService(preimageSvc), + ) if err := srv.Start(); err != nil { log.Errorf("failed to start grpc server: %s", err) return 1 From f616bcdcb518c8119d0e429edb1faa9f58b4bcc9 Mon Sep 17 00:00:00 2001 From: Chris Ricketts Date: Thu, 28 May 2026 16:27:57 +0100 Subject: [PATCH 2/2] refactor: extract solverd runtime to internal package for reuse in e2e test setup --- cmd/solverd/main.go | 252 ++----------------------------- internal/solverd/runtime.go | 290 ++++++++++++++++++++++++++++++++++++ test/e2e/banco_test.go | 106 +++++++------ test/e2e/main_test.go | 250 +++++++++++-------------------- 4 files changed, 450 insertions(+), 448 deletions(-) create mode 100644 internal/solverd/runtime.go diff --git a/cmd/solverd/main.go b/cmd/solverd/main.go index b88fbbc..175b310 100644 --- a/cmd/solverd/main.go +++ b/cmd/solverd/main.go @@ -2,35 +2,14 @@ package main import ( "context" - "crypto/hmac" - "crypto/sha256" - "database/sql" - "encoding/hex" - "errors" - "fmt" "os" "os/signal" "syscall" - arkdclient "github.com/arkade-os/arkd/pkg/client-lib" - singlekey "github.com/arkade-os/arkd/pkg/client-lib/identity/singlekey" - singlekeyfilestore "github.com/arkade-os/arkd/pkg/client-lib/identity/singlekey/store/file" - emulatorclient "github.com/arkade-os/emulator/pkg/client" - arksdk "github.com/arkade-os/go-sdk" - "github.com/btcsuite/btcd/btcec/v2" "github.com/sirupsen/logrus" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "github.com/arkade-os/solver/internal/config" - "github.com/arkade-os/solver/internal/core/application" - sqlitedb "github.com/arkade-os/solver/internal/infrastructure/db/sqlite" - "github.com/arkade-os/solver/internal/infrastructure/pricefeed" - grpcservice "github.com/arkade-os/solver/internal/interface/grpc" - "github.com/arkade-os/solver/pkg/banco" - "github.com/arkade-os/solver/pkg/preimage" - "github.com/arkade-os/solver/pkg/solver" - "github.com/arkade-os/solver/pkg/solver/arkdsource" + "github.com/arkade-os/solver/internal/solverd" ) // Version is injected at build time via -ldflags "-X main.Version=". @@ -49,234 +28,27 @@ func main() { func run(log *logrus.Logger) error { cfg, err := config.LoadConfig() if err != nil { - return fmt.Errorf("failed to load config: %w", err) + return err } - log.SetLevel(logrus.Level(cfg.LogLevel)) - - if err := os.MkdirAll(cfg.Datadir, 0750); err != nil { - return fmt.Errorf("failed to create datadir: %w", err) - } - - emulatorConn, err := grpc.NewClient( - cfg.EmulatorURL, - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - if err != nil { - return fmt.Errorf("failed to connect to emulator: %w", err) - } - // nolint:errcheck - defer emulatorConn.Close() - emulator := emulatorclient.NewGRPCClient(emulatorConn) - - ctx := context.Background() - arkClient, err := setupWallet(ctx, cfg) - if err != nil { - return fmt.Errorf("failed to setup wallet: %w", err) - } - defer arkClient.Stop() - - plugins := make([]solver.Plugin, 0) - serverOpts := make([]grpcservice.Option, 0) - - if cfg.BancoEnabled { - db, err := sqlitedb.OpenDB(cfg.Datadir) - if err != nil { - return fmt.Errorf("failed to open database: %w", err) - } - // nolint:errcheck - defer db.Close() - - svc, plugin, err := setupBanco(db, arkClient, emulator, log) - if err != nil { - return fmt.Errorf("failed to setup banco: %w", err) - } - plugins = append(plugins, plugin) - serverOpts = append(serverOpts, grpcservice.WithBancoService(svc)) - - log.Info("banco plugin enabled") - } - - if cfg.PreimageEnabled { - svc, plugin, err := setupPreimage(ctx, cfg, arkClient, emulator, log) - if err != nil { - return fmt.Errorf("failed to setup preimage: %w", err) - } - plugins = append(plugins, plugin) - serverOpts = append(serverOpts, grpcservice.WithPreimageService(svc)) - log.Info("preimage plugin enabled") - } - - // One gRPC + HTTP server hosts whichever services are enabled. - if len(serverOpts) > 0 { - srv := grpcservice.NewServer(cfg.GRPCPort, cfg.HTTPPort, log, serverOpts...) - if err := srv.Start(); err != nil { - return fmt.Errorf("failed to start server: %w", err) - } - defer srv.Stop() - } - - solverCtx, cancelSolver := context.WithCancel(ctx) - defer cancelSolver() - - solverDone := make(chan error, 1) - s := solver.New(plugins...).WithLogger(log) - src := arkdsource.New(arkClient.Client(), log) - go func() { - solverDone <- s.Run(solverCtx, src) - }() + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() log.WithField("version", Version). WithField("banco", cfg.BancoEnabled). WithField("preimage", cfg.PreimageEnabled). - Info("solverd started") + Info("starting solverd") - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - - select { - case <-sigCh: - log.Info("shutting down...") - cancelSolver() - signal.Stop(sigCh) - <-solverDone - case err := <-solverDone: - if !errors.Is(err, context.Canceled) { - return fmt.Errorf("solver runtime exited unexpectedly: %w", err) - } - } - - return nil -} - -func setupWallet(ctx context.Context, cfg *config.Config) (arksdk.Wallet, error) { - identityStore, err := singlekeyfilestore.NewStore(cfg.Datadir) - if err != nil { - return nil, fmt.Errorf("init identity store: %w", err) - } - singleKeyIdentity, err := singlekey.NewIdentity(identityStore) + wallet, err := solverd.SetupWallet(ctx, cfg) if err != nil { - return nil, fmt.Errorf("init single-key identity: %w", err) + return err } + defer wallet.Stop() - walletOpts := []arksdk.WalletOption{arksdk.WithIdentity(singleKeyIdentity)} - arkClient, err := arksdk.LoadWallet(cfg.Datadir, walletOpts...) - if err != nil { - // Fresh datadir surfaces as either go-sdk's or client-lib's - // ErrNotInitialized depending on which layer first noticed the - // missing config — both are valid "no wallet yet" signals. - if !errors.Is(err, arksdk.ErrNotInitialized) && - !errors.Is(err, arkdclient.ErrNotInitialized) { - return nil, fmt.Errorf("load ark client: %w", err) - } - - arkClient, err = arksdk.NewWallet(cfg.Datadir, walletOpts...) - if err != nil { - return nil, fmt.Errorf("create ark client: %w", err) - } - if err := arkClient.Init(ctx, cfg.ArkURL, cfg.WalletSeed, cfg.WalletPassword); err != nil { - return nil, fmt.Errorf("init ark client: %w", err) - } - } - if err := arkClient.Unlock(ctx, cfg.WalletPassword); err != nil { - return nil, fmt.Errorf("unlock ark client: %w", err) + if err := solverd.Run(ctx, cfg, log, wallet); err != nil { + return err } + log.Info("solverd stopped") - return arkClient, nil -} - -func setupBanco( - db *sql.DB, - arkClient arksdk.Wallet, - emulator emulatorclient.TransportClient, - log logrus.FieldLogger, -) (*application.TakerService, solver.Plugin, error) { - pairRepo := sqlitedb.NewPairRepository(db) - tradeRepo := sqlitedb.NewTradeRepository(db) - priceFeed := pricefeed.NewCoinGecko() - tradeListener := application.NewTradeListener(tradeRepo, log) - - plugin := banco.NewPlugin(banco.Config{ - SolverClient: arkClient, - Emulator: emulator, - PairsRepository: pairRepo, - PriceFeed: priceFeed, - Listener: tradeListener, - Log: log, - }) - takerSvc := application.NewTakerService(pairRepo, tradeRepo, arkClient, arkClient.Indexer(), log) - - return takerSvc, plugin, nil -} - -func setupPreimage( - ctx context.Context, - cfg *config.Config, - arkClient arksdk.Wallet, - emulator emulatorclient.TransportClient, - log logrus.FieldLogger, -) (*application.PreimageService, solver.Plugin, error) { - solverPriv, err := deriveSolverPrivKey(cfg.WalletSeed) - if err != nil { - return nil, nil, fmt.Errorf("derive preimage solver privkey: %w", err) - } - info, err := emulator.GetInfo(ctx) - if err != nil { - return nil, nil, fmt.Errorf("get emulator info: %w", err) - } - rawEmulatorPub, err := hex.DecodeString(info.SignerPublicKey) - if err != nil { - return nil, nil, fmt.Errorf("decode emulator pubkey: %w", err) - } - emulatorPub, err := btcec.ParsePubKey(rawEmulatorPub) - if err != nil { - return nil, nil, fmt.Errorf("parse emulator pubkey: %w", err) - } - configData, err := arkClient.GetConfigData(ctx) - if err != nil { - return nil, nil, fmt.Errorf("get ark config: %w", err) - } - checkpointBytes, err := hex.DecodeString(configData.CheckpointTapscript) - if err != nil { - return nil, nil, fmt.Errorf("decode checkpoint tapscript: %w", err) - } - preimageSvc, err := application.NewPreimageService(application.PreimageServiceConfig{ - SolverPrivKey: solverPriv, - EmulatorPubKey: emulatorPub, - Log: log, - }) - if err != nil { - return nil, nil, fmt.Errorf("create preimage service: %w", err) - } - plugin, err := preimage.NewPlugin(ctx, preimage.Config{ - ArkClient: arkClient, - Emulator: emulator, - SolverPrivKey: solverPriv, - EmulatorPubKey: emulatorPub, - ServerPubKey: configData.SignerPubKey, - CheckpointTapscript: checkpointBytes, - Network: configData.Network, - Log: log, - }) - if err != nil { - return nil, nil, fmt.Errorf("build preimage plugin: %w", err) - } - - return preimageSvc, plugin, nil -} - -const preimageKeyDomain = "solverd/preimage-plugin/v1" - -// deriveSolverPrivKey derives the preimage-plugin's encryption privkey from -// the wallet seed via HMAC-SHA256(seed, domain). Stable across restarts as -// long as the seed is unchanged. -func deriveSolverPrivKey(seedHex string) (*btcec.PrivateKey, error) { - seed, err := hex.DecodeString(seedHex) - if err != nil { - return nil, fmt.Errorf("decode wallet seed: %w", err) - } - mac := hmac.New(sha256.New, seed) - mac.Write([]byte(preimageKeyDomain)) - priv, _ := btcec.PrivKeyFromBytes(mac.Sum(nil)) - return priv, nil + return nil } diff --git a/internal/solverd/runtime.go b/internal/solverd/runtime.go new file mode 100644 index 0000000..2bbf615 --- /dev/null +++ b/internal/solverd/runtime.go @@ -0,0 +1,290 @@ +package solverd + +import ( + "context" + "crypto/hmac" + "crypto/sha256" + "database/sql" + "encoding/hex" + "errors" + "fmt" + "os" + + arkdclient "github.com/arkade-os/arkd/pkg/client-lib" + singlekey "github.com/arkade-os/arkd/pkg/client-lib/identity/singlekey" + singlekeyfilestore "github.com/arkade-os/arkd/pkg/client-lib/identity/singlekey/store/file" + emulatorclient "github.com/arkade-os/emulator/pkg/client" + arksdk "github.com/arkade-os/go-sdk" + "github.com/btcsuite/btcd/btcec/v2" + "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/arkade-os/solver/internal/config" + "github.com/arkade-os/solver/internal/core/application" + sqlitedb "github.com/arkade-os/solver/internal/infrastructure/db/sqlite" + "github.com/arkade-os/solver/internal/infrastructure/pricefeed" + grpcservice "github.com/arkade-os/solver/internal/interface/grpc" + "github.com/arkade-os/solver/pkg/banco" + "github.com/arkade-os/solver/pkg/preimage" + "github.com/arkade-os/solver/pkg/solver" + "github.com/arkade-os/solver/pkg/solver/arkdsource" +) + +type options struct { + bancoPriceFeed banco.PriceFeed +} + +// Option customizes the solverd runtime wiring. +type Option func(*options) + +// WithBancoPriceFeed overrides banco's default production price feed. +func WithBancoPriceFeed(feed banco.PriceFeed) Option { + return func(o *options) { + o.bancoPriceFeed = feed + } +} + +// Run wires solverd from config and blocks until ctx is canceled or the solver +// exits unexpectedly. +func Run(ctx context.Context, cfg *config.Config, log *logrus.Logger, wallet arksdk.Wallet, opts ...Option) error { + if cfg == nil { + return fmt.Errorf("config must not be nil") + } + if wallet == nil { + return fmt.Errorf("wallet must not be nil") + } + + if log == nil { + log = logrus.StandardLogger() + } + + log.SetLevel(logrus.Level(cfg.LogLevel)) + + if cfg.PreimageEnabled && cfg.WalletSeed == "" { + return fmt.Errorf("wallet seed is required when preimage plugin is enabled") + } + + var runtimeOpts options + for _, opt := range opts { + opt(&runtimeOpts) + } + + if err := os.MkdirAll(cfg.Datadir, 0750); err != nil { + return fmt.Errorf("failed to create datadir: %w", err) + } + + emulatorConn, err := grpc.NewClient( + cfg.EmulatorURL, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return fmt.Errorf("failed to connect to emulator: %w", err) + } + // nolint:errcheck + defer emulatorConn.Close() + + emulator := emulatorclient.NewGRPCClient(emulatorConn) + + plugins := make([]solver.Plugin, 0) + serverOpts := make([]grpcservice.Option, 0) + + if cfg.BancoEnabled { + db, err := sqlitedb.OpenDB(cfg.Datadir) + if err != nil { + return fmt.Errorf("failed to open database: %w", err) + } + // nolint:errcheck + defer db.Close() + + svc, plugin, err := setupBanco(db, wallet, emulator, runtimeOpts.bancoPriceFeed, log) + if err != nil { + return fmt.Errorf("failed to setup banco: %w", err) + } + plugins = append(plugins, plugin) + serverOpts = append(serverOpts, grpcservice.WithBancoService(svc)) + + log.Info("banco plugin enabled") + } + + if cfg.PreimageEnabled { + svc, plugin, err := setupPreimage(ctx, cfg, wallet, emulator, log) + if err != nil { + return fmt.Errorf("failed to setup preimage: %w", err) + } + plugins = append(plugins, plugin) + serverOpts = append(serverOpts, grpcservice.WithPreimageService(svc)) + log.Info("preimage plugin enabled") + } + + if len(plugins) == 0 { + return fmt.Errorf("at least one plugin must be enabled") + } + + // One gRPC + HTTP server hosts whichever services are enabled. + if len(serverOpts) > 0 { + srv := grpcservice.NewServer(cfg.GRPCPort, cfg.HTTPPort, log, serverOpts...) + if err := srv.Start(); err != nil { + return fmt.Errorf("failed to start server: %w", err) + } + defer srv.Stop() + } + + runtimeCtx, cancel := context.WithCancel(ctx) + defer cancel() + + done := make(chan error, 1) + s := solver.New(plugins...).WithLogger(log) + src := arkdsource.New(wallet.Client(), log) + go func() { + done <- s.Run(runtimeCtx, src) + }() + + select { + case <-ctx.Done(): + cancel() + <-done + case err := <-done: + if err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("solver runtime exited unexpectedly: %w", err) + } + } + + return nil +} + +// SetupWallet loads or initializes and unlocks the solverd wallet. +func SetupWallet(ctx context.Context, cfg *config.Config, extraOpts ...arksdk.WalletOption) (arksdk.Wallet, error) { + identityStore, err := singlekeyfilestore.NewStore(cfg.Datadir) + if err != nil { + return nil, fmt.Errorf("init identity store: %w", err) + } + singleKeyIdentity, err := singlekey.NewIdentity(identityStore) + if err != nil { + return nil, fmt.Errorf("init single-key identity: %w", err) + } + + walletOpts := append([]arksdk.WalletOption{arksdk.WithIdentity(singleKeyIdentity)}, extraOpts...) + arkClient, err := arksdk.LoadWallet(cfg.Datadir, walletOpts...) + if err != nil { + // Fresh datadir surfaces as either go-sdk's or client-lib's + // ErrNotInitialized depending on which layer first noticed the + // missing config — both are valid "no wallet yet" signals. + if !errors.Is(err, arksdk.ErrNotInitialized) && + !errors.Is(err, arkdclient.ErrNotInitialized) { + return nil, fmt.Errorf("load ark client: %w", err) + } + + arkClient, err = arksdk.NewWallet(cfg.Datadir, walletOpts...) + if err != nil { + return nil, fmt.Errorf("create ark client: %w", err) + } + if err := arkClient.Init(ctx, cfg.ArkURL, cfg.WalletSeed, cfg.WalletPassword); err != nil { + return nil, fmt.Errorf("init ark client: %w", err) + } + } + if err := arkClient.Unlock(ctx, cfg.WalletPassword); err != nil { + return nil, fmt.Errorf("unlock ark client: %w", err) + } + + return arkClient, nil +} + +func setupBanco( + db *sql.DB, + arkClient arksdk.Wallet, + emulator emulatorclient.TransportClient, + feed banco.PriceFeed, + log logrus.FieldLogger, +) (*application.TakerService, solver.Plugin, error) { + pairRepo := sqlitedb.NewPairRepository(db) + tradeRepo := sqlitedb.NewTradeRepository(db) + if feed == nil { + feed = pricefeed.NewCoinGecko() + } + tradeListener := application.NewTradeListener(tradeRepo, log) + + plugin := banco.NewPlugin(banco.Config{ + SolverClient: arkClient, + Emulator: emulator, + PairsRepository: pairRepo, + PriceFeed: feed, + Listener: tradeListener, + Log: log, + }) + takerSvc := application.NewTakerService(pairRepo, tradeRepo, arkClient, arkClient.Indexer(), log) + + return takerSvc, plugin, nil +} + +func setupPreimage( + ctx context.Context, + cfg *config.Config, + arkClient arksdk.Wallet, + emulator emulatorclient.TransportClient, + log logrus.FieldLogger, +) (*application.PreimageService, solver.Plugin, error) { + solverPriv, err := deriveSolverPrivKey(cfg.WalletSeed) + if err != nil { + return nil, nil, fmt.Errorf("derive preimage solver privkey: %w", err) + } + info, err := emulator.GetInfo(ctx) + if err != nil { + return nil, nil, fmt.Errorf("get emulator info: %w", err) + } + rawEmulatorPub, err := hex.DecodeString(info.SignerPublicKey) + if err != nil { + return nil, nil, fmt.Errorf("decode emulator pubkey: %w", err) + } + emulatorPub, err := btcec.ParsePubKey(rawEmulatorPub) + if err != nil { + return nil, nil, fmt.Errorf("parse emulator pubkey: %w", err) + } + configData, err := arkClient.GetConfigData(ctx) + if err != nil { + return nil, nil, fmt.Errorf("get ark config: %w", err) + } + checkpointBytes, err := hex.DecodeString(configData.CheckpointTapscript) + if err != nil { + return nil, nil, fmt.Errorf("decode checkpoint tapscript: %w", err) + } + preimageSvc, err := application.NewPreimageService(application.PreimageServiceConfig{ + SolverPrivKey: solverPriv, + EmulatorPubKey: emulatorPub, + Log: log, + }) + if err != nil { + return nil, nil, fmt.Errorf("create preimage service: %w", err) + } + plugin, err := preimage.NewPlugin(ctx, preimage.Config{ + ArkClient: arkClient, + Emulator: emulator, + SolverPrivKey: solverPriv, + EmulatorPubKey: emulatorPub, + ServerPubKey: configData.SignerPubKey, + CheckpointTapscript: checkpointBytes, + Network: configData.Network, + Log: log, + }) + if err != nil { + return nil, nil, fmt.Errorf("build preimage plugin: %w", err) + } + + return preimageSvc, plugin, nil +} + +const preimageKeyDomain = "solverd/preimage-plugin/v1" + +// deriveSolverPrivKey derives the preimage-plugin's encryption privkey from +// the wallet seed via HMAC-SHA256(seed, domain). Stable across restarts as +// long as the seed is unchanged. +func deriveSolverPrivKey(seedHex string) (*btcec.PrivateKey, error) { + seed, err := hex.DecodeString(seedHex) + if err != nil { + return nil, fmt.Errorf("decode wallet seed: %w", err) + } + mac := hmac.New(sha256.New, seed) + mac.Write([]byte(preimageKeyDomain)) + priv, _ := btcec.PrivKeyFromBytes(mac.Sum(nil)) + return priv, nil +} diff --git a/test/e2e/banco_test.go b/test/e2e/banco_test.go index dde006d..d7f34e3 100644 --- a/test/e2e/banco_test.go +++ b/test/e2e/banco_test.go @@ -9,17 +9,21 @@ import ( clientTypes "github.com/arkade-os/arkd/pkg/client-lib/types" sdktypes "github.com/arkade-os/go-sdk/types" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + bancov1 "github.com/arkade-os/solver/api-spec/protobuf/gen/go/solverd/v1" "github.com/arkade-os/solver/pkg/banco" "github.com/arkade-os/solver/pkg/banco/contract" ) -const mockPriceFeedURL = "http://mock-price-feed" +const ( + mockAssetBTCPriceFeed = "mock://asset-btc" + mockBTCAssetPriceFeed = "mock://btc-asset" + mockAssetAssetPriceFeed = "mock://asset-asset" +) // TestBancoAssetToBTC: maker deposits asset, wants BTC. -// Mock price feed returns 1.0. With BaseDecimals=0, QuoteDecimals=0: -// -// price = depositAmount/wantAmount = 500/500 = 1.0 ✓ func TestBancoAssetToBTC(t *testing.T) { ctx := t.Context() @@ -28,21 +32,13 @@ func TestBancoAssetToBTC(t *testing.T) { faucetOffchain(t, maker, 0.0005) assetID := issueAsset(t, maker, 500) - // Configure taker pair: asset/BTC. We write directly to pairRepo - // (instead of going through takerSvc.AddPair) so we can pin - // BaseDecimals=QuoteDecimals=0; AddPair would resolve BTC's quote - // decimals to 8 and the mock price feed (1.0) would no longer match - // the 500/500 offer ratio. pair := banco.Pair{ - Pair: assetID + "/BTC", - MinAmount: 1, - MaxAmount: 100000000, - BaseDecimals: 0, - QuoteDecimals: 0, - PriceFeed: mockPriceFeedURL, + Pair: assetID + "/BTC", + MinAmount: 1, + MaxAmount: 100000000, + PriceFeed: mockAssetBTCPriceFeed, } - require.NoError(t, pairRepo.Add(ctx, pair)) - t.Cleanup(func() { _ = pairRepo.Remove(ctx, pair.Pair) }) + addPair(t, pair) // Maker creates offer: deposit asset, want 500 sats BTC. emulator := newEmulatorClient(t) @@ -72,9 +68,6 @@ func TestBancoAssetToBTC(t *testing.T) { } // TestBancoBTCToAsset: maker deposits BTC, wants asset. -// Mock price feed returns 1.0. With BaseDecimals=0, QuoteDecimals=0: -// -// price = depositAmount/wantAmount = 500/500 = 1.0 ✓ func TestBancoBTCToAsset(t *testing.T) { ctx := t.Context() @@ -84,11 +77,10 @@ func TestBancoBTCToAsset(t *testing.T) { faucetOffchain(t, tempClient, 0.001) assetID := issueAsset(t, tempClient, 1000) - takerAddr, err := takerSvc.GetAddress(ctx) - require.NoError(t, err) + takerAddr := getAddress(t) takerVtxoCh := takerClient.GetVtxoEventChannel(ctx) - _, err = tempClient.SendOffChain(ctx, []clientTypes.Receiver{{ + _, err := tempClient.SendOffChain(ctx, []clientTypes.Receiver{{ To: takerAddr.OffchainAddress, Amount: 1000, Assets: []clientTypes.Asset{{AssetId: assetID, Amount: 1000}}, @@ -97,17 +89,13 @@ func TestBancoBTCToAsset(t *testing.T) { // Wait for the taker wallet to see the incoming asset VTXO. waitForVtxoAdded(t, ctx, takerVtxoCh, 30*time.Second) - // Configure pair: BTC/asset with both decimals=0. pair := banco.Pair{ - Pair: "BTC/" + assetID, - MinAmount: 1, - MaxAmount: 100000000, - BaseDecimals: 0, - QuoteDecimals: 0, - PriceFeed: mockPriceFeedURL, + Pair: "BTC/" + assetID, + MinAmount: 1, + MaxAmount: 100000000, + PriceFeed: mockBTCAssetPriceFeed, } - require.NoError(t, pairRepo.Add(ctx, pair)) - t.Cleanup(func() { _ = pairRepo.Remove(ctx, pair.Pair) }) + addPair(t, pair) // Maker creates offer: deposit BTC, want 500 units of asset. maker := setupArkClient(t) @@ -125,8 +113,7 @@ func TestBancoBTCToAsset(t *testing.T) { // Subscribe to maker vtxo events before funding the swap. makerVtxoCh := maker.GetVtxoEventChannel(ctx) - // Fund swap address with 500 sats BTC + offer packet. Deposit must - // equal WantAmount for price=1.0 with decimals=0. + // Fund swap address with 500 sats BTC + offer packet. sendOffChainWithExtension(t, maker, clientTypes.Receiver{ To: offerResult.SwapAddress, Amount: 500, @@ -138,9 +125,6 @@ func TestBancoBTCToAsset(t *testing.T) { } // TestBancoAssetToAsset: maker deposits assetA, wants assetB. -// Mock price feed returns 1.0. With BaseDecimals=0, QuoteDecimals=0: -// -// price = depositAmount/wantAmount = 500/500 = 1.0 ✓ func TestBancoAssetToAsset(t *testing.T) { ctx := t.Context() @@ -153,11 +137,10 @@ func TestBancoAssetToAsset(t *testing.T) { faucetOffchain(t, tempClient, 0.001) assetB := issueAsset(t, tempClient, 1000) - takerAddr, err := takerSvc.GetAddress(ctx) - require.NoError(t, err) + takerAddr := getAddress(t) takerVtxoCh := takerClient.GetVtxoEventChannel(ctx) - _, err = tempClient.SendOffChain(ctx, []clientTypes.Receiver{{ + _, err := tempClient.SendOffChain(ctx, []clientTypes.Receiver{{ To: takerAddr.OffchainAddress, Amount: 1000, Assets: []clientTypes.Asset{{AssetId: assetB, Amount: 1000}}, @@ -166,15 +149,12 @@ func TestBancoAssetToAsset(t *testing.T) { waitForVtxoAdded(t, ctx, takerVtxoCh, 30*time.Second) pair := banco.Pair{ - Pair: assetA + "/" + assetB, - MinAmount: 1, - MaxAmount: 100000000, - BaseDecimals: 0, - QuoteDecimals: 0, - PriceFeed: mockPriceFeedURL, + Pair: assetA + "/" + assetB, + MinAmount: 1, + MaxAmount: 100000000, + PriceFeed: mockAssetAssetPriceFeed, } - require.NoError(t, pairRepo.Add(ctx, pair)) - t.Cleanup(func() { _ = pairRepo.Remove(ctx, pair.Pair) }) + addPair(t, pair) emulator := newEmulatorClient(t) wantAssetID, err := asset.NewAssetIdFromString(assetB) @@ -196,6 +176,36 @@ func TestBancoAssetToAsset(t *testing.T) { requireAssetFulfillment(t, ctx, makerVtxoCh, assetB, 60*time.Second) } +func dialBancoClient(t *testing.T) bancov1.BancoServiceClient { + t.Helper() + conn, err := grpc.NewClient(e2eGRPCAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + t.Cleanup(func() { _ = conn.Close() }) + return bancov1.NewBancoServiceClient(conn) +} + +func addPair(t *testing.T, pair banco.Pair) { + t.Helper() + _, err := dialBancoClient(t).AddPair(t.Context(), &bancov1.AddPairRequest{ + Pair: &bancov1.PairInfo{ + Pair: pair.Pair, + MinAmount: pair.MinAmount, + MaxAmount: pair.MaxAmount, + PriceFeed: pair.PriceFeed, + InvertPrice: pair.InvertPrice, + }, + }) + require.NoError(t, err) +} + +func getAddress(t *testing.T) *bancov1.GetAddressResponse { + t.Helper() + resp, err := dialBancoClient(t).GetAddress(t.Context(), &bancov1.GetAddressRequest{}) + require.NoError(t, err) + require.NotEmpty(t, resp.OffchainAddress) + return resp +} + // waitForVtxoAdded blocks until vtxoCh delivers a VtxosAdded event or the // timeout expires. func waitForVtxoAdded( diff --git a/test/e2e/main_test.go b/test/e2e/main_test.go index 538e2c8..a089d11 100644 --- a/test/e2e/main_test.go +++ b/test/e2e/main_test.go @@ -2,6 +2,7 @@ package e2e_test import ( "context" + "crypto/rand" "encoding/hex" "errors" "fmt" @@ -12,22 +13,15 @@ import ( "testing" "time" - emulatorclient "github.com/arkade-os/emulator/pkg/client" arksdk "github.com/arkade-os/go-sdk" sdktypes "github.com/arkade-os/go-sdk/types" - "github.com/btcsuite/btcd/btcec/v2" log "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/arkade-os/solver/internal/core/application" - "github.com/arkade-os/solver/internal/core/ports" - sqlitedb "github.com/arkade-os/solver/internal/infrastructure/db/sqlite" - grpcservice "github.com/arkade-os/solver/internal/interface/grpc" - "github.com/arkade-os/solver/pkg/banco" - "github.com/arkade-os/solver/pkg/preimage" - "github.com/arkade-os/solver/pkg/solver" - "github.com/arkade-os/solver/pkg/solver/arkdsource" + bancov1 "github.com/arkade-os/solver/api-spec/protobuf/gen/go/solverd/v1" + "github.com/arkade-os/solver/internal/config" + "github.com/arkade-os/solver/internal/solverd" ) const ( @@ -42,18 +36,18 @@ const ( // gRPC API. var e2eGRPCAddr = fmt.Sprintf("localhost:%d", e2eGRPCPort) -// mockPriceFeed always returns a fixed price of 1.0. -// This makes any offer with roughly 1:1 ratio pass the 1% margin check. -type mockPriceFeed struct{} +// mockPriceFeed returns deterministic prices keyed by the pair's feed URL. +type mockPriceFeed map[string]float64 -func (m *mockPriceFeed) Fetch(_ context.Context, _ string) (float64, error) { - return 1.0, nil +func (m mockPriceFeed) Fetch(_ context.Context, feedURL string) (float64, error) { + price, ok := m[feedURL] + if !ok { + return 0, fmt.Errorf("mock price feed missing price for %s", feedURL) + } + return price, nil } var ( - takerSvc *application.TakerService - preimageSvc *application.PreimageService - pairRepo ports.PairRepository takerClient arksdk.Wallet ) @@ -73,7 +67,6 @@ func runTests(m *testing.M) int { return 1 } - // Create taker's ArkClient takerDatadir, err := os.MkdirTemp("", "solverd-e2e-taker-*") if err != nil { log.Errorf("failed to create taker datadir: %s", err) @@ -81,177 +74,114 @@ func runTests(m *testing.M) int { } // nolint:errcheck defer os.RemoveAll(takerDatadir) - takerClient, err = setupTakerClient(ctx, takerDatadir) + walletSeed, err := randomSeedHex() if err != nil { - log.Errorf("failed to setup taker client: %s", err) + log.Errorf("failed to generate wallet seed: %s", err) return 1 } - - // Fund taker with offchain BTC - if err := fundTaker(ctx, takerClient); err != nil { - log.Errorf("failed to fund taker: %s", err) - return 1 + cfg := &config.Config{ + Datadir: takerDatadir, + ArkURL: arkdURL, + WalletSeed: walletSeed, + WalletPassword: password, + EmulatorURL: emulatorAddr, + GRPCPort: e2eGRPCPort, + HTTPPort: e2eHTTPPort, + LogLevel: int(log.DebugLevel), + BancoEnabled: true, + PreimageEnabled: true, } - - // SQLite pair repo in temp dir - tmpDir, err := os.MkdirTemp("", "solverd-e2e-*") + takerClient, err = solverd.SetupWallet(ctx, cfg, arksdk.WithoutAutoSettle()) if err != nil { - log.Errorf("failed to create temp dir: %s", err) + log.Errorf("failed to setup taker client: %s", err) return 1 } - // nolint:errcheck - defer os.RemoveAll(tmpDir) + defer takerClient.Stop() - db, err := sqlitedb.OpenDB(tmpDir) - if err != nil { - log.Errorf("failed to open db: %s", err) + runCtx, cancelSolver := context.WithCancel(ctx) + done := make(chan error, 1) + go func() { + done <- solverd.Run(runCtx, cfg, log.StandardLogger(), takerClient, + solverd.WithBancoPriceFeed(mockPriceFeed{ + mockAssetBTCPriceFeed: 100_000_000, + mockBTCAssetPriceFeed: 0.00000001, + mockAssetAssetPriceFeed: 1, + }), + ) + }() + defer func() { + cancelSolver() + if err := <-done; err != nil && !errors.Is(err, context.Canceled) { + log.Errorf("solverd exited during shutdown: %s", err) + } + }() + + if err := waitBancoReady(ctx); err != nil { + log.Errorf("failed waiting for solverd readiness: %s", err) return 1 } - // nolint:errcheck - defer db.Close() - pairRepo = sqlitedb.NewPairRepository(db) - tradeRepo := sqlitedb.NewTradeRepository(db) + if err := waitWalletSynced(ctx, takerClient); err != nil { + log.Errorf("failed to sync taker client: %s", err) + return 1 + } - // Emulator client - emulatorConn, err := grpc.NewClient(emulatorAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - log.Errorf("failed to connect to emulator: %s", err) + // Fund taker with offchain BTC. + if err := fundTaker(ctx, takerClient); err != nil { + log.Errorf("failed to fund taker: %s", err) return 1 } - emulatorClient := emulatorclient.NewGRPCClient(emulatorConn) - var plugins []solver.Plugin + return m.Run() +} - // Build banco plugin - bancoPlugin := banco.NewPlugin(banco.Config{ - SolverClient: takerClient, - Emulator: emulatorClient, - PairsRepository: pairRepo, - PriceFeed: &mockPriceFeed{}, - Log: log.StandardLogger(), - }) - plugins = append(plugins, bancoPlugin) +func randomSeedHex() (string, error) { + seed := make([]byte, 32) + if _, err := rand.Read(seed); err != nil { + return "", err + } + return hex.EncodeToString(seed), nil +} - takerSvc = application.NewTakerService(pairRepo, tradeRepo, takerClient, takerClient.Indexer(), log.StandardLogger()) +func waitBancoReady(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() - // Preimage service: stateless — the solver privkey is generated fresh for - // the test and the preimage plugin recovers credentials from the tx stream - // (no DB). - preimagePriv, err := btcec.NewPrivateKey() - if err != nil { - log.Errorf("failed to generate preimage privkey: %s", err) - return 1 - } - info, err := emulatorClient.GetInfo(ctx) - if err != nil { - log.Errorf("failed to get emulator info: %s", err) - return 1 - } - rawIntro, err := hex.DecodeString(info.SignerPublicKey) + conn, err := grpc.NewClient(e2eGRPCAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - log.Errorf("failed to decode emulator pubkey: %s", err) - return 1 - } - emulatorPub, err := btcec.ParsePubKey(rawIntro) - if err != nil { - log.Errorf("failed to parse emulator pubkey: %s", err) - return 1 - } - configData, err := takerClient.GetConfigData(ctx) - if err != nil { - log.Errorf("failed to get ark config: %s", err) - return 1 - } - checkpointBytes, err := hex.DecodeString(configData.CheckpointTapscript) - if err != nil { - log.Errorf("failed to decode checkpoint tapscript: %s", err) - return 1 - } - preimageSvc, err = application.NewPreimageService(application.PreimageServiceConfig{ - SolverPrivKey: preimagePriv, - EmulatorPubKey: emulatorPub, - Log: log.StandardLogger(), - }) - if err != nil { - log.Errorf("failed to create preimage service: %s", err) - return 1 - } - preimagePlugin, err := preimage.NewPlugin(ctx, preimage.Config{ - ArkClient: takerClient, - Emulator: emulatorClient, - SolverPrivKey: preimagePriv, - EmulatorPubKey: emulatorPub, - ServerPubKey: configData.SignerPubKey, - CheckpointTapscript: checkpointBytes, - Network: configData.Network, - Log: log.StandardLogger(), - }) - if err != nil { - log.Errorf("failed to build preimage plugin: %s", err) - return 1 + return err } - plugins = append(plugins, preimagePlugin) - - solverCtx, solverCancel := context.WithCancel(ctx) - solverDone := make(chan error, 1) - s := solver.New(plugins...).WithLogger(log.StandardLogger()) - src := arkdsource.New(takerClient.Client(), log.StandardLogger()) - go func() { - solverDone <- s.Run(solverCtx, src) - }() - defer func() { - solverCancel() - if err := <-solverDone; err != nil && !errors.Is(err, context.Canceled) { - log.Errorf("solver exited during shutdown: %s", err) + // nolint:errcheck + defer conn.Close() + client := bancov1.NewBancoServiceClient(conn) + + for { + callCtx, callCancel := context.WithTimeout(ctx, time.Second) + resp, err := client.GetStatus(callCtx, &bancov1.GetStatusRequest{}) + callCancel() + if err == nil && resp.GetRunning() { + return nil } - }() - // Start the real gRPC + HTTP gateway server hosting both takerSvc and - // preimageSvc. e2e tests dial this server as a real client would, rather - // than calling application services directly. - srv := grpcservice.NewServer( - e2eGRPCPort, - e2eHTTPPort, - log.StandardLogger(), - grpcservice.WithBancoService(takerSvc), - grpcservice.WithPreimageService(preimageSvc), - ) - if err := srv.Start(); err != nil { - log.Errorf("failed to start grpc server: %s", err) - return 1 + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } } - defer srv.Stop() - // Give the listeners a moment to come up before tests dial. - time.Sleep(500 * time.Millisecond) - - return m.Run() } -// setupTakerClient builds, inits, and unlocks the bot's wallet. Same flow as -// utils_test.go setupArkClient but adapted for TestMain (no *testing.T) and -// with a caller-managed datadir whose lifetime spans the whole test run. -func setupTakerClient(ctx context.Context, datadir string) (arksdk.Wallet, error) { - // Auto-settle disabled: the bot spends its VTXOs to fulfill offers, - // which races the scheduler and floods logs with VTXO_ALREADY_SPENT. - client, err := arksdk.NewWallet(datadir, arksdk.WithoutAutoSettle()) - if err != nil { - return nil, err - } - if err := client.Init(ctx, arkdURL, "", password); err != nil { - return nil, err - } - if err := client.Unlock(ctx, password); err != nil { - return nil, err - } +func waitWalletSynced(ctx context.Context, client arksdk.Wallet) error { synced := <-client.IsSynced(ctx) if synced.Err != nil { - return nil, fmt.Errorf("taker client sync: %w", synced.Err) + return fmt.Errorf("taker client sync: %w", synced.Err) } if !synced.Synced { - return nil, fmt.Errorf("taker client failed to sync") + return fmt.Errorf("taker client failed to sync") } - return client, nil + return nil } // fundTaker tops up the bot's offchain balance via an admin-issued note, using