Skip to content

Commit e7e73e7

Browse files
committed
refactor: consolidate query options into execution/execopts package
- Move query.Options to execopts.Options - Rename engine.QueryOpts to engine.QueryOptions - Simplify QueryOptions (remove promql.QueryOpts interface) - Delete the query package Signed-off-by: Michael Hoffmann <[email protected]>
1 parent 460a2db commit e7e73e7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+238
-240
lines changed

engine/engine.go

Lines changed: 56 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ import (
1414
"time"
1515

1616
"github.com/thanos-io/promql-engine/execution"
17+
"github.com/thanos-io/promql-engine/execution/execopts"
1718
"github.com/thanos-io/promql-engine/execution/model"
1819
"github.com/thanos-io/promql-engine/execution/parse"
1920
"github.com/thanos-io/promql-engine/execution/telemetry"
2021
"github.com/thanos-io/promql-engine/extlabels"
2122
"github.com/thanos-io/promql-engine/logicalplan"
22-
"github.com/thanos-io/promql-engine/query"
2323
engstorage "github.com/thanos-io/promql-engine/storage"
2424
promstorage "github.com/thanos-io/promql-engine/storage/prometheus"
2525
"github.com/thanos-io/promql-engine/warnings"
@@ -83,33 +83,31 @@ type Opts struct {
8383
DisableDuplicateLabelChecks bool
8484
}
8585

86-
// QueryOpts implements promql.QueryOpts but allows to override more engine default options.
87-
type QueryOpts struct {
88-
// These values are used to implement promql.QueryOpts, they have weird "Param" suffix because
89-
// they are accessed by methods of the same name.
90-
LookbackDeltaParam time.Duration
91-
EnablePerStepStatsParam bool
86+
// QueryOptions allows overriding engine default options on a per-query basis.
87+
type QueryOptions struct {
88+
// LookbackDelta overrides the engine's default lookback delta for this query.
89+
LookbackDelta time.Duration
9290

93-
// DecodingConcurrency can be used to override the DecodingConcurrency engine setting.
91+
// EnablePerStepStats enables per-step statistics for this query.
92+
EnablePerStepStats bool
93+
94+
// DecodingConcurrency overrides the engine's default decoding concurrency for this query.
9495
DecodingConcurrency int
9596

96-
// SelectorBatchSize can be used to override the SelectorBatchSize engine setting.
97+
// SelectorBatchSize overrides the engine's default selector batch size for this query.
9798
SelectorBatchSize int64
9899

99-
// LogicalOptimizers can be used to override the LogicalOptimizers engine setting.
100+
// LogicalOptimizers overrides the engine's default logical optimizers for this query.
100101
LogicalOptimizers []logicalplan.Optimizer
101102
}
102103

103-
func (opts QueryOpts) LookbackDelta() time.Duration { return opts.LookbackDeltaParam }
104-
func (opts QueryOpts) EnablePerStepStats() bool { return opts.EnablePerStepStatsParam }
105-
106-
func fromPromQLOpts(opts promql.QueryOpts) *QueryOpts {
104+
func fromPromQLOpts(opts promql.QueryOpts) *QueryOptions {
107105
if opts == nil {
108-
return &QueryOpts{}
106+
return &QueryOptions{}
109107
}
110-
return &QueryOpts{
111-
LookbackDeltaParam: opts.LookbackDelta(),
112-
EnablePerStepStatsParam: opts.EnablePerStepStats(),
108+
return &QueryOptions{
109+
LookbackDelta: opts.LookbackDelta(),
110+
EnablePerStepStats: opts.EnablePerStepStats(),
113111
}
114112
}
115113

@@ -229,7 +227,7 @@ type Engine struct {
229227
noStepSubqueryIntervalFn func(time.Duration) time.Duration
230228
}
231229

232-
func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
230+
func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOptions, qs string, ts time.Time) (promql.Query, error) {
233231
idx, err := e.activeQueryTracker.Insert(ctx, qs)
234232
if err != nil {
235233
return nil, err
@@ -245,15 +243,15 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
245243
// the presentation layer and not when computing the results.
246244
resultSort := newResultSort(expr)
247245

248-
qOpts := e.makeQueryOpts(ts, ts, 0, opts)
249-
if qOpts.StepsBatch > 64 {
246+
execOpts := e.makeExecutionOpts(ts, ts, 0, opts)
247+
if execOpts.StepsBatch > 64 {
250248
return nil, ErrStepsBatchTooLarge
251249
}
252250

253251
planOpts := logicalplan.PlanOptions{
254252
DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks,
255253
}
256-
initialPlan, err := logicalplan.NewFromAST(expr, qOpts, planOpts)
254+
initialPlan, err := logicalplan.NewFromAST(expr, execOpts, planOpts)
257255
if err != nil {
258256
return nil, errors.Wrap(err, "creating plan")
259257
}
@@ -262,18 +260,18 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
262260
ctx = warnings.NewContext(ctx)
263261
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
264262

265-
scanners, err := e.storageScanners(q, qOpts, optimizedPlan)
263+
scanners, err := e.storageScanners(q, execOpts, optimizedPlan)
266264
if err != nil {
267265
return nil, errors.Wrap(err, "creating storage scanners")
268266
}
269267

270-
exec, err := execution.New(ctx, optimizedPlan.Root(), scanners, qOpts)
268+
exec, err := execution.New(ctx, optimizedPlan.Root(), scanners, execOpts)
271269
if err != nil {
272270
return nil, err
273271
}
274272
e.metrics.totalQueries.Inc()
275273
return &compatibilityQuery{
276-
Query: &Query{exec: exec, opts: qOpts},
274+
Query: &Query{exec: exec, opts: execOpts},
277275
engine: e,
278276
plan: optimizedPlan,
279277
warns: warns,
@@ -284,38 +282,38 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
284282
}, nil
285283
}
286284

287-
func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, root logicalplan.Node, ts time.Time) (promql.Query, error) {
285+
func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOptions, root logicalplan.Node, ts time.Time) (promql.Query, error) {
288286
idx, err := e.activeQueryTracker.Insert(ctx, root.String())
289287
if err != nil {
290288
return nil, err
291289
}
292290
defer e.activeQueryTracker.Delete(idx)
293291

294-
qOpts := e.makeQueryOpts(ts, ts, 0, opts)
295-
if qOpts.StepsBatch > 64 {
292+
execOpts := e.makeExecutionOpts(ts, ts, 0, opts)
293+
if execOpts.StepsBatch > 64 {
296294
return nil, ErrStepsBatchTooLarge
297295
}
298296
planOpts := logicalplan.PlanOptions{
299297
DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks,
300298
}
301-
lplan, warns := logicalplan.New(root, qOpts, planOpts).Optimize(e.getLogicalOptimizers(opts))
299+
lplan, warns := logicalplan.New(root, execOpts, planOpts).Optimize(e.getLogicalOptimizers(opts))
302300

303301
ctx = warnings.NewContext(ctx)
304302
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
305303

306-
scnrs, err := e.storageScanners(q, qOpts, lplan)
304+
scnrs, err := e.storageScanners(q, execOpts, lplan)
307305
if err != nil {
308306
return nil, errors.Wrap(err, "creating storage scanners")
309307
}
310308

311-
exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
309+
exec, err := execution.New(ctx, lplan.Root(), scnrs, execOpts)
312310
if err != nil {
313311
return nil, err
314312
}
315313
e.metrics.totalQueries.Inc()
316314

317315
return &compatibilityQuery{
318-
Query: &Query{exec: exec, opts: qOpts},
316+
Query: &Query{exec: exec, opts: execOpts},
319317
engine: e,
320318
plan: lplan,
321319
warns: warns,
@@ -327,7 +325,7 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab
327325
}, nil
328326
}
329327

330-
func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
328+
func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOptions, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
331329
idx, err := e.activeQueryTracker.Insert(ctx, qs)
332330
if err != nil {
333331
return nil, err
@@ -343,15 +341,15 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *
343341
if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar {
344342
return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
345343
}
346-
qOpts := e.makeQueryOpts(start, end, step, opts)
347-
if qOpts.StepsBatch > 64 {
344+
execOpts := e.makeExecutionOpts(start, end, step, opts)
345+
if execOpts.StepsBatch > 64 {
348346
return nil, ErrStepsBatchTooLarge
349347
}
350348
planOpts := logicalplan.PlanOptions{
351349
DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks,
352350
}
353351

354-
initialPlan, err := logicalplan.NewFromAST(expr, qOpts, planOpts)
352+
initialPlan, err := logicalplan.NewFromAST(expr, execOpts, planOpts)
355353
if err != nil {
356354
return nil, errors.Wrap(err, "creating plan")
357355
}
@@ -360,19 +358,19 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *
360358
ctx = warnings.NewContext(ctx)
361359
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
362360

363-
scnrs, err := e.storageScanners(q, qOpts, optimizedPlan)
361+
scnrs, err := e.storageScanners(q, execOpts, optimizedPlan)
364362
if err != nil {
365363
return nil, errors.Wrap(err, "creating storage scanners")
366364
}
367365

368-
exec, err := execution.New(ctx, optimizedPlan.Root(), scnrs, qOpts)
366+
exec, err := execution.New(ctx, optimizedPlan.Root(), scnrs, execOpts)
369367
if err != nil {
370368
return nil, err
371369
}
372370
e.metrics.totalQueries.Inc()
373371

374372
return &compatibilityQuery{
375-
Query: &Query{exec: exec, opts: qOpts},
373+
Query: &Query{exec: exec, opts: execOpts},
376374
engine: e,
377375
plan: optimizedPlan,
378376
warns: warns,
@@ -381,37 +379,37 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *
381379
}, nil
382380
}
383381

384-
func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) {
382+
func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOptions, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) {
385383
idx, err := e.activeQueryTracker.Insert(ctx, root.String())
386384
if err != nil {
387385
return nil, err
388386
}
389387
defer e.activeQueryTracker.Delete(idx)
390388

391-
qOpts := e.makeQueryOpts(start, end, step, opts)
392-
if qOpts.StepsBatch > 64 {
389+
execOpts := e.makeExecutionOpts(start, end, step, opts)
390+
if execOpts.StepsBatch > 64 {
393391
return nil, ErrStepsBatchTooLarge
394392
}
395393
planOpts := logicalplan.PlanOptions{
396394
DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks,
397395
}
398-
lplan, warns := logicalplan.New(root, qOpts, planOpts).Optimize(e.getLogicalOptimizers(opts))
396+
lplan, warns := logicalplan.New(root, execOpts, planOpts).Optimize(e.getLogicalOptimizers(opts))
399397

400-
scnrs, err := e.storageScanners(q, qOpts, lplan)
398+
scnrs, err := e.storageScanners(q, execOpts, lplan)
401399
if err != nil {
402400
return nil, errors.Wrap(err, "creating storage scanners")
403401
}
404402

405403
ctx = warnings.NewContext(ctx)
406404
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
407-
exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
405+
exec, err := execution.New(ctx, lplan.Root(), scnrs, execOpts)
408406
if err != nil {
409407
return nil, err
410408
}
411409
e.metrics.totalQueries.Inc()
412410

413411
return &compatibilityQuery{
414-
Query: &Query{exec: exec, opts: qOpts},
412+
Query: &Query{exec: exec, opts: execOpts},
415413
engine: e,
416414
plan: lplan,
417415
warns: warns,
@@ -432,8 +430,8 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr
432430
return e.MakeRangeQuery(ctx, q, fromPromQLOpts(opts), qs, start, end, step)
433431
}
434432

435-
func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duration, opts *QueryOpts) *query.Options {
436-
res := &query.Options{
433+
func (e *Engine) makeExecutionOpts(start time.Time, end time.Time, step time.Duration, opts *QueryOptions) *execopts.Options {
434+
res := &execopts.Options{
437435
Start: start,
438436
End: end,
439437
Step: step,
@@ -449,11 +447,11 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
449447
return res
450448
}
451449

452-
if opts.LookbackDelta() > 0 {
453-
res.LookbackDelta = opts.LookbackDelta()
450+
if opts.LookbackDelta > 0 {
451+
res.LookbackDelta = opts.LookbackDelta
454452
}
455-
if opts.EnablePerStepStats() {
456-
res.EnablePerStepStats = opts.EnablePerStepStats()
453+
if opts.EnablePerStepStats {
454+
res.EnablePerStepStats = opts.EnablePerStepStats
457455
}
458456

459457
if opts.DecodingConcurrency != 0 {
@@ -463,30 +461,30 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
463461
return res
464462
}
465463

466-
func (e *Engine) getLogicalOptimizers(opts *QueryOpts) []logicalplan.Optimizer {
464+
func (e *Engine) getLogicalOptimizers(opts *QueryOptions) []logicalplan.Optimizer {
467465
var optimizers []logicalplan.Optimizer
468-
if len(opts.LogicalOptimizers) != 0 {
466+
if opts != nil && len(opts.LogicalOptimizers) != 0 {
469467
optimizers = slices.Clone(opts.LogicalOptimizers)
470468
} else {
471469
optimizers = slices.Clone(e.logicalOptimizers)
472470
}
473471
selectorBatchSize := e.selectorBatchSize
474-
if opts.SelectorBatchSize != 0 {
472+
if opts != nil && opts.SelectorBatchSize != 0 {
475473
selectorBatchSize = opts.SelectorBatchSize
476474
}
477475
return append(optimizers, logicalplan.SelectorBatchSize{Size: selectorBatchSize})
478476
}
479477

480-
func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (engstorage.Scanners, error) {
478+
func (e *Engine) storageScanners(queryable storage.Queryable, execOpts *execopts.Options, lplan logicalplan.Plan) (engstorage.Scanners, error) {
481479
if e.scanners == nil {
482-
return promstorage.NewPrometheusScanners(queryable, qOpts, lplan)
480+
return promstorage.NewPrometheusScanners(queryable, execOpts, lplan)
483481
}
484482
return e.scanners, nil
485483
}
486484

487485
type Query struct {
488486
exec model.VectorOperator
489-
opts *query.Options
487+
opts *execopts.Options
490488
}
491489

492490
// Explain returns human-readable explanation of the created executor.

engine/engine_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ import (
2222
"time"
2323

2424
"github.com/thanos-io/promql-engine/engine"
25+
"github.com/thanos-io/promql-engine/execution/execopts"
2526
"github.com/thanos-io/promql-engine/execution/model"
2627
"github.com/thanos-io/promql-engine/extlabels"
2728
"github.com/thanos-io/promql-engine/logicalplan"
28-
"github.com/thanos-io/promql-engine/query"
2929
"github.com/thanos-io/promql-engine/storage/prometheus"
3030
"github.com/thanos-io/promql-engine/warnings"
3131

@@ -2482,7 +2482,7 @@ type scannersWithWarns struct {
24822482
promScanners *prometheus.Scanners
24832483
}
24842484

2485-
func newScannersWithWarns(warn error, qOpts *query.Options, lplan logicalplan.Plan) (*scannersWithWarns, error) {
2485+
func newScannersWithWarns(warn error, qOpts *execopts.Options, lplan logicalplan.Plan) (*scannersWithWarns, error) {
24862486
scanners, err := prometheus.NewPrometheusScanners(&storage.MockQueryable{
24872487
MockQuerier: storage.NoopQuerier(),
24882488
}, qOpts, lplan)
@@ -2497,12 +2497,12 @@ func newScannersWithWarns(warn error, qOpts *query.Options, lplan logicalplan.Pl
24972497

24982498
func (s *scannersWithWarns) Close() error { return nil }
24992499

2500-
func (s scannersWithWarns) NewVectorSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error) {
2500+
func (s scannersWithWarns) NewVectorSelector(ctx context.Context, opts *execopts.Options, hints storage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error) {
25012501
warnings.AddToContext(s.warn, ctx)
25022502
return s.promScanners.NewVectorSelector(ctx, opts, hints, selector)
25032503
}
25042504

2505-
func (s scannersWithWarns) NewMatrixSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.MatrixSelector, call logicalplan.FunctionCall) (model.VectorOperator, error) {
2505+
func (s scannersWithWarns) NewMatrixSelector(ctx context.Context, opts *execopts.Options, hints storage.SelectHints, selector logicalplan.MatrixSelector, call logicalplan.FunctionCall) (model.VectorOperator, error) {
25062506
warnings.AddToContext(s.warn, ctx)
25072507
return s.promScanners.NewMatrixSelector(ctx, opts, hints, selector, call)
25082508
}
@@ -2513,7 +2513,7 @@ func TestWarningsPlanCreation(t *testing.T) {
25132513
expectedWarn = errors.New("test warning")
25142514
)
25152515

2516-
scnrs, err := newScannersWithWarns(expectedWarn, &query.Options{}, nil)
2516+
scnrs, err := newScannersWithWarns(expectedWarn, &execopts.Options{}, nil)
25172517
testutil.Ok(t, err)
25182518
newEngine := engine.NewWithScanners(opts, scnrs)
25192519
q1, err := newEngine.NewRangeQuery(context.Background(), nil, nil, "http_requests_total", time.UnixMilli(0), time.UnixMilli(600), 30*time.Second)

engine/projection_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func TestProjectionWithFuzz(t *testing.T) {
242242
Queryable: storage,
243243
}
244244

245-
normalQuery, err := normalEngine.NewInstantQuery(ctx, storage, &engine.QueryOpts{}, query, queryTime)
245+
normalQuery, err := normalEngine.NewInstantQuery(ctx, storage, nil, query, queryTime)
246246
testutil.Ok(t, err)
247247
defer normalQuery.Close()
248248
normalResult := normalQuery.Exec(ctx)
@@ -252,7 +252,7 @@ func TestProjectionWithFuzz(t *testing.T) {
252252
}
253253
testutil.Ok(t, normalResult.Err, "query: %s", query)
254254

255-
projectionQuery, err := projectionEngine.MakeInstantQuery(ctx, projectionStorage, &engine.QueryOpts{}, query, queryTime)
255+
projectionQuery, err := projectionEngine.MakeInstantQuery(ctx, projectionStorage, nil, query, queryTime)
256256
testutil.Ok(t, err)
257257

258258
defer projectionQuery.Close()

engine/propagate_selector_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func TestPropagateMatchers(t *testing.T) {
120120
}
121121

122122
t.Run(fmt.Sprintf("Query_%d", i), func(t *testing.T) {
123-
normalQuery, err := normalEngine.NewInstantQuery(ctx, storage, &engine.QueryOpts{}, query, queryTime)
123+
normalQuery, err := normalEngine.NewInstantQuery(ctx, storage, nil, query, queryTime)
124124
testutil.Ok(t, err)
125125
defer normalQuery.Close()
126126
normalResult := normalQuery.Exec(ctx)
@@ -130,7 +130,7 @@ func TestPropagateMatchers(t *testing.T) {
130130
}
131131
testutil.Ok(t, normalResult.Err, "query: %s", query)
132132

133-
optimizedQuery, err := optimizedEngine.MakeInstantQuery(ctx, storage, &engine.QueryOpts{}, query, queryTime)
133+
optimizedQuery, err := optimizedEngine.MakeInstantQuery(ctx, storage, nil, query, queryTime)
134134
testutil.Ok(t, err)
135135

136136
defer optimizedQuery.Close()

0 commit comments

Comments
 (0)