mirror of
https://github.com/project-zot/zot.git
synced 2026-06-18 05:28:07 +08:00
9dfa7c3ae6
Replace MakeTempFile usage with MakeTempFilePath and MakeTempFileWithContent helpers that automatically handle file lifecycle. This prevents resource leaks by ensuring temporary files are properly closed. It should also make the tests easier to read. Signed-off-by: Andrei Aaron <andreifdaaron@gmail.com>
436 lines
13 KiB
Go
436 lines
13 KiB
Go
package scheduler_test
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"runtime"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
. "github.com/smartystreets/goconvey/convey"
|
|
|
|
"zotregistry.dev/zot/v2/pkg/api/config"
|
|
"zotregistry.dev/zot/v2/pkg/extensions/monitoring"
|
|
"zotregistry.dev/zot/v2/pkg/log"
|
|
"zotregistry.dev/zot/v2/pkg/scheduler"
|
|
test "zotregistry.dev/zot/v2/pkg/test/common"
|
|
)
|
|
|
|
type task struct {
|
|
log log.Logger
|
|
msg string
|
|
err bool
|
|
delay time.Duration
|
|
}
|
|
|
|
// errInternal is the sentinel error returned by the test task and the test
// generator when they simulate a failure.
var (
	errInternal = errors.New("task: internal error")
)
|
|
|
|
func (t *task) DoWork(ctx context.Context) error {
|
|
if t.err {
|
|
return errInternal
|
|
}
|
|
|
|
for range 5 {
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
|
|
time.Sleep(t.delay)
|
|
}
|
|
|
|
t.log.Info().Msg(t.msg)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *task) String() string {
|
|
return t.Name()
|
|
}
|
|
|
|
func (t *task) Name() string {
|
|
return "TestTask"
|
|
}
|
|
|
|
type generator struct {
|
|
log log.Logger
|
|
priority string
|
|
done bool
|
|
index int
|
|
step int
|
|
limit int
|
|
taskDelay time.Duration
|
|
}
|
|
|
|
func (g *generator) Name() string {
|
|
return "TestGenerator"
|
|
}
|
|
|
|
func (g *generator) Next() (scheduler.Task, error) {
|
|
if g.step > g.limit {
|
|
g.done = true
|
|
}
|
|
|
|
g.step++
|
|
g.index++
|
|
|
|
if g.step%11 == 0 {
|
|
return nil, nil //nolint:nilnil
|
|
}
|
|
|
|
if g.step%13 == 0 {
|
|
return nil, errInternal
|
|
}
|
|
|
|
return &task{
|
|
log: g.log,
|
|
msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index),
|
|
err: false,
|
|
delay: g.taskDelay,
|
|
}, nil
|
|
}
|
|
|
|
func (g *generator) IsDone() bool {
|
|
return g.done
|
|
}
|
|
|
|
func (g *generator) IsReady() bool {
|
|
return true
|
|
}
|
|
|
|
func (g *generator) Reset() {
|
|
g.done = false
|
|
g.step = 0
|
|
}
|
|
|
|
type shortGenerator struct {
|
|
log log.Logger
|
|
priority string
|
|
done bool
|
|
index int
|
|
step int
|
|
}
|
|
|
|
func (g *shortGenerator) Name() string {
|
|
return "ShortTestGenerator"
|
|
}
|
|
|
|
func (g *shortGenerator) Next() (scheduler.Task, error) {
|
|
g.done = true
|
|
|
|
return &task{log: g.log, msg: fmt.Sprintf("executing %s task; index: %d", g.priority, g.index), err: false}, nil
|
|
}
|
|
|
|
func (g *shortGenerator) IsDone() bool {
|
|
return g.done
|
|
}
|
|
|
|
func (g *shortGenerator) IsReady() bool {
|
|
return true
|
|
}
|
|
|
|
func (g *shortGenerator) Reset() {
|
|
g.done = true
|
|
g.step = 0
|
|
}
|
|
|
|
func TestScheduler(t *testing.T) {
|
|
Convey("Test active to waiting periodic generator", t, func() {
|
|
logPath := test.MakeTempFilePath(t, "zot-log.txt")
|
|
|
|
logger := log.NewLogger("debug", logPath)
|
|
metrics := monitoring.NewMetricsServer(true, logger)
|
|
sch := scheduler.NewScheduler(config.New(), metrics, logger)
|
|
|
|
genH := &shortGenerator{log: logger, priority: "high priority"}
|
|
// interval has to be higher than throttle value to simulate
|
|
sch.SubmitGenerator(genH, 1*time.Second, scheduler.HighPriority)
|
|
|
|
sch.RunScheduler()
|
|
time.Sleep(2 * time.Second)
|
|
sch.Shutdown()
|
|
|
|
data, err := os.ReadFile(logPath)
|
|
So(err, ShouldBeNil)
|
|
So(string(data), ShouldContainSubstring, "waiting generator is ready, pushing to ready generators")
|
|
})
|
|
|
|
Convey("Test order of generators in queue", t, func() {
|
|
logPath := test.MakeTempFilePath(t, "zot-log.txt")
|
|
|
|
logger := log.NewLogger("debug", logPath)
|
|
cfg := config.New()
|
|
cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3}
|
|
metrics := monitoring.NewMetricsServer(true, logger)
|
|
sch := scheduler.NewScheduler(cfg, metrics, logger)
|
|
sch.RateLimit = 5 * time.Second
|
|
|
|
genL := &generator{log: logger, priority: "low priority", limit: 100, taskDelay: 100 * time.Millisecond}
|
|
sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority)
|
|
|
|
genM := &generator{log: logger, priority: "medium priority", limit: 100, taskDelay: 100 * time.Millisecond}
|
|
sch.SubmitGenerator(genM, time.Duration(0), scheduler.MediumPriority)
|
|
|
|
genH := &generator{log: logger, priority: "high priority", limit: 100, taskDelay: 100 * time.Millisecond}
|
|
sch.SubmitGenerator(genH, time.Duration(0), scheduler.HighPriority)
|
|
|
|
sch.RunScheduler()
|
|
time.Sleep(4 * time.Second)
|
|
sch.Shutdown()
|
|
|
|
data, err := os.ReadFile(logPath)
|
|
So(err, ShouldBeNil)
|
|
|
|
So(string(data), ShouldContainSubstring, "executing high priority task; index: 1")
|
|
So(string(data), ShouldContainSubstring, "executing high priority task; index: 2")
|
|
So(string(data), ShouldNotContainSubstring, "executing medium priority task; index: 1")
|
|
So(string(data), ShouldNotContainSubstring, "failed to execute task")
|
|
})
|
|
|
|
Convey("Test reordering of generators in queue", t, func() {
|
|
logPath := test.MakeTempFilePath(t, "zot-log.txt")
|
|
|
|
logger := log.NewLogger("debug", logPath)
|
|
cfg := config.New()
|
|
cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3}
|
|
metrics := monitoring.NewMetricsServer(true, logger)
|
|
sch := scheduler.NewScheduler(cfg, metrics, logger)
|
|
sch.RateLimit = 1 * time.Nanosecond
|
|
|
|
// Testing repordering of generators using the same medium priority, as well as reordering with
|
|
// a low priority generator
|
|
|
|
genL := &generator{log: logger, priority: "low priority", limit: 110, taskDelay: time.Nanosecond}
|
|
sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority)
|
|
|
|
genM := &generator{log: logger, priority: "medium 1 priority", limit: 110, taskDelay: time.Nanosecond}
|
|
sch.SubmitGenerator(genM, time.Duration(0), scheduler.MediumPriority)
|
|
|
|
genH := &generator{log: logger, priority: "medium 2 priority", limit: 110, taskDelay: time.Nanosecond}
|
|
sch.SubmitGenerator(genH, time.Duration(0), scheduler.MediumPriority)
|
|
|
|
sch.RunScheduler()
|
|
time.Sleep(2 * time.Second) // Increased from 1 second to 2 seconds for stability
|
|
sch.Shutdown()
|
|
|
|
data, err := os.ReadFile(logPath)
|
|
So(err, ShouldBeNil)
|
|
|
|
// Check all tasks show up in the logs
|
|
for i := 1; i < 110; i++ {
|
|
if i%11 == 0 || i%13 == 0 {
|
|
continue
|
|
}
|
|
|
|
So(string(data), ShouldContainSubstring, fmt.Sprintf("executing medium 1 priority task; index: %d", i))
|
|
So(string(data), ShouldContainSubstring, fmt.Sprintf("executing medium 2 priority task; index: %d", i))
|
|
So(string(data), ShouldContainSubstring, fmt.Sprintf("executing low priority task; index: %d", i))
|
|
}
|
|
|
|
taskCounter := 0
|
|
priorityFlippedCounter := 0
|
|
samePriorityFlippedCounter := 0
|
|
lastPriority := "medium"
|
|
lastMediumGenerator := "1"
|
|
|
|
for line := range strings.SplitSeq(strings.TrimSuffix(string(data), "\n"), "\n") {
|
|
if !strings.Contains(line, "priority task; index: ") {
|
|
continue
|
|
}
|
|
|
|
taskCounter++
|
|
|
|
// low priority tasks start executing later
|
|
// medium priority generators are prioritized until the rank 100/9 (8 generated tasks)
|
|
// starting with 100/10, a low priority generator could potentially be prioritized instead
|
|
// there will be at least 8 * 2 medium priority tasks executed before low priority tasks are pushed
|
|
if taskCounter < 17 {
|
|
So(line, ShouldContainSubstring, "executing medium")
|
|
}
|
|
|
|
// medium priority 2*110 medium priority tasks should have been generated,
|
|
// medium priority generators should be done
|
|
// add around 10 low priority tasks to the counter
|
|
// and an additional margin of 5 to make sure the test is stable
|
|
if taskCounter > 225 {
|
|
So(line, ShouldContainSubstring, "executing low priority")
|
|
}
|
|
|
|
if strings.Contains(line, "executing medium") {
|
|
if !strings.Contains(line, "executing medium "+lastMediumGenerator) {
|
|
samePriorityFlippedCounter++
|
|
|
|
if lastMediumGenerator == "1" {
|
|
lastMediumGenerator = "2"
|
|
} else {
|
|
lastMediumGenerator = "1"
|
|
}
|
|
}
|
|
}
|
|
|
|
if !strings.Contains(line, "executing "+lastPriority) {
|
|
priorityFlippedCounter++
|
|
|
|
if lastPriority == "low" {
|
|
lastPriority = "medium"
|
|
} else {
|
|
lastPriority = "low"
|
|
}
|
|
}
|
|
}
|
|
|
|
// fairness: make sure none of the medium priority generators is favored by the algorithm
|
|
So(samePriorityFlippedCounter, ShouldBeGreaterThanOrEqualTo, 50) // Lowered from 60 to 50 for stability
|
|
t.Logf("Switched between medium priority generators %d times", samePriorityFlippedCounter)
|
|
// fairness: make sure the algorithm alternates between generator priorities
|
|
So(priorityFlippedCounter, ShouldBeGreaterThanOrEqualTo, 10)
|
|
t.Logf("Switched between generator priorities %d times", priorityFlippedCounter)
|
|
})
|
|
|
|
Convey("Test task returning an error", t, func() {
|
|
logPath := test.MakeTempFilePath(t, "zot-log.txt")
|
|
|
|
logger := log.NewLogger("debug", logPath)
|
|
metrics := monitoring.NewMetricsServer(true, logger)
|
|
sch := scheduler.NewScheduler(config.New(), metrics, logger)
|
|
|
|
t := &task{log: logger, msg: "", err: true}
|
|
sch.SubmitTask(t, scheduler.MediumPriority)
|
|
|
|
sch.RunScheduler()
|
|
time.Sleep(500 * time.Millisecond)
|
|
sch.Shutdown()
|
|
|
|
data, err := os.ReadFile(logPath)
|
|
So(err, ShouldBeNil)
|
|
So(string(data), ShouldContainSubstring, "adding a new task")
|
|
So(string(data), ShouldContainSubstring, "failed to execute task")
|
|
})
|
|
|
|
Convey("Test resubmit generator", t, func() {
|
|
logPath := test.MakeTempFilePath(t, "zot-log.txt")
|
|
|
|
logger := log.NewLogger("debug", logPath)
|
|
metrics := monitoring.NewMetricsServer(true, logger)
|
|
sch := scheduler.NewScheduler(config.New(), metrics, logger)
|
|
|
|
genL := &generator{log: logger, priority: "low priority", limit: 100, taskDelay: 100 * time.Millisecond}
|
|
sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority)
|
|
|
|
sch.RunScheduler()
|
|
time.Sleep(1 * time.Second)
|
|
sch.Shutdown()
|
|
|
|
data, err := os.ReadFile(logPath)
|
|
So(err, ShouldBeNil)
|
|
So(string(data), ShouldContainSubstring, "executing low priority task; index: 1")
|
|
So(string(data), ShouldContainSubstring, "executing low priority task; index: 2")
|
|
})
|
|
|
|
Convey("Try to add a task with wrong priority", t, func() {
|
|
logPath := test.MakeTempFilePath(t, "zot-log.txt")
|
|
|
|
logger := log.NewLogger("debug", logPath)
|
|
metrics := monitoring.NewMetricsServer(true, logger)
|
|
sch := scheduler.NewScheduler(config.New(), metrics, logger)
|
|
|
|
t := &task{log: logger, msg: "", err: false}
|
|
sch.SubmitTask(t, -1)
|
|
|
|
data, err := os.ReadFile(logPath)
|
|
So(err, ShouldBeNil)
|
|
So(string(data), ShouldNotContainSubstring, "adding a new task")
|
|
})
|
|
|
|
Convey("Test adding a new task when context is done", t, func() {
|
|
logPath := test.MakeTempFilePath(t, "zot-log.txt")
|
|
|
|
logger := log.NewLogger("debug", logPath)
|
|
metrics := monitoring.NewMetricsServer(true, logger)
|
|
sch := scheduler.NewScheduler(config.New(), metrics, logger)
|
|
|
|
sch.RunScheduler()
|
|
sch.Shutdown()
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
t := &task{log: logger, msg: "", err: false}
|
|
sch.SubmitTask(t, scheduler.LowPriority)
|
|
|
|
data, err := os.ReadFile(logPath)
|
|
So(err, ShouldBeNil)
|
|
So(string(data), ShouldNotContainSubstring, "adding a new task")
|
|
})
|
|
|
|
Convey("Test stopping scheduler by calling Shutdown()", t, func() {
|
|
logPath := test.MakeTempFilePath(t, "zot-log.txt")
|
|
|
|
logger := log.NewLogger("debug", logPath)
|
|
metrics := monitoring.NewMetricsServer(true, logger)
|
|
sch := scheduler.NewScheduler(config.New(), metrics, logger)
|
|
|
|
genL := &generator{log: logger, priority: "medium priority", limit: 100, taskDelay: 100 * time.Millisecond}
|
|
sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.MediumPriority)
|
|
|
|
sch.RunScheduler()
|
|
time.Sleep(1 * time.Second)
|
|
sch.Shutdown()
|
|
|
|
data, err := os.ReadFile(logPath)
|
|
So(err, ShouldBeNil)
|
|
So(string(data), ShouldContainSubstring, "executing medium priority task; index: 1")
|
|
So(string(data), ShouldContainSubstring, "executing medium priority task; index: 2")
|
|
So(string(data), ShouldContainSubstring, "received stop signal, gracefully shutting down...")
|
|
})
|
|
|
|
Convey("Test scheduler Priority.String() method", t, func() {
|
|
var p scheduler.Priority //nolint: varnamelen
|
|
// test invalid priority
|
|
p = 6238734
|
|
So(p.String(), ShouldEqual, "invalid")
|
|
p = scheduler.LowPriority
|
|
So(p.String(), ShouldEqual, "low")
|
|
p = scheduler.MediumPriority
|
|
So(p.String(), ShouldEqual, "medium")
|
|
p = scheduler.HighPriority
|
|
So(p.String(), ShouldEqual, "high")
|
|
})
|
|
|
|
Convey("Test scheduler State.String() method", t, func() {
|
|
var s scheduler.State //nolint: varnamelen
|
|
// test invalid state
|
|
s = -67
|
|
So(s.String(), ShouldEqual, "invalid")
|
|
s = scheduler.Ready
|
|
So(s.String(), ShouldEqual, "ready")
|
|
s = scheduler.Waiting
|
|
So(s.String(), ShouldEqual, "waiting")
|
|
s = scheduler.Done
|
|
So(s.String(), ShouldEqual, "done")
|
|
})
|
|
}
|
|
|
|
func TestGetNumWorkers(t *testing.T) {
|
|
Convey("Test setting the number of workers - default value", t, func() {
|
|
logPath := test.MakeTempFilePath(t, "zot-log.txt")
|
|
logger := log.NewLogger("debug", logPath)
|
|
|
|
metrics := monitoring.NewMetricsServer(true, logger)
|
|
sch := scheduler.NewScheduler(config.New(), metrics, logger)
|
|
|
|
So(sch.NumWorkers, ShouldEqual, runtime.NumCPU()*4)
|
|
})
|
|
|
|
Convey("Test setting the number of workers - getting the value from config", t, func() {
|
|
cfg := config.New()
|
|
cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3}
|
|
logPath := test.MakeTempFilePath(t, "zot-log.txt")
|
|
logger := log.NewLogger("debug", logPath)
|
|
metrics := monitoring.NewMetricsServer(true, logger)
|
|
sch := scheduler.NewScheduler(cfg, metrics, logger)
|
|
|
|
So(sch.NumWorkers, ShouldEqual, 3)
|
|
})
|
|
}
|