feat(scheduler): gracefully shutdown (#1951)

wait for workers to finish before exiting

should fix tests reporting they couldn't remove rootDir because it's being
written by tasks

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
peusebiu
2023-11-24 10:40:10 +02:00
committed by GitHub
parent 92837c2bcb
commit 6222dae1f0
49 changed files with 710 additions and 379 deletions
+11 -11
View File
@@ -219,7 +219,7 @@ func (p *requestsPool) doJob(ctx context.Context, job *httpJob) {
header, err := makeHEADRequest(ctx, job.url, job.username, job.password, job.config.VerifyTLS,
job.config.Debug)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
p.outputCh <- stringResult{"", err}
@@ -231,7 +231,7 @@ func (p *requestsPool) doJob(ctx context.Context, job *httpJob) {
case ispec.MediaTypeImageManifest:
image, err := fetchImageManifestStruct(ctx, job)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
p.outputCh <- stringResult{"", err}
@@ -242,7 +242,7 @@ func (p *requestsPool) doJob(ctx context.Context, job *httpJob) {
str, err := image.string(job.config.OutputFormat, len(job.imageName), len(job.tagName), len(platformStr), verbose)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
p.outputCh <- stringResult{"", err}
@@ -250,7 +250,7 @@ func (p *requestsPool) doJob(ctx context.Context, job *httpJob) {
return
}
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
@@ -258,7 +258,7 @@ func (p *requestsPool) doJob(ctx context.Context, job *httpJob) {
case ispec.MediaTypeImageIndex:
image, err := fetchImageIndexStruct(ctx, job)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
p.outputCh <- stringResult{"", err}
@@ -270,7 +270,7 @@ func (p *requestsPool) doJob(ctx context.Context, job *httpJob) {
str, err := image.string(job.config.OutputFormat, len(job.imageName), len(job.tagName), len(platformStr), verbose)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
p.outputCh <- stringResult{"", err}
@@ -278,7 +278,7 @@ func (p *requestsPool) doJob(ctx context.Context, job *httpJob) {
return
}
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
@@ -294,7 +294,7 @@ func fetchImageIndexStruct(ctx context.Context, job *httpJob) (*imageStruct, err
header, err := makeGETRequest(ctx, job.url, job.username, job.password,
job.config.VerifyTLS, job.config.Debug, &indexContent, job.config.ResultWriter)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return nil, context.Canceled
}
@@ -385,7 +385,7 @@ func fetchManifestStruct(ctx context.Context, repo, manifestReference string, se
header, err := makeGETRequest(ctx, URL, username, password,
searchConf.VerifyTLS, searchConf.Debug, &manifestResp, searchConf.ResultWriter)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return common.ManifestSummary{}, context.Canceled
}
@@ -397,7 +397,7 @@ func fetchManifestStruct(ctx context.Context, repo, manifestReference string, se
configContent, err := fetchConfig(ctx, repo, configDigest, searchConf, username, password)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return common.ManifestSummary{}, context.Canceled
}
@@ -474,7 +474,7 @@ func fetchConfig(ctx context.Context, repo, configDigest string, searchConf Sear
_, err := makeGETRequest(ctx, URL, username, password,
searchConf.VerifyTLS, searchConf.Debug, &configContent, searchConf.ResultWriter)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return ispec.Image{}, context.Canceled
}
+2 -2
View File
@@ -583,7 +583,7 @@ func TestCVESort(t *testing.T) {
}
ctlr.CveScanner = mocks.CveScannerMock{
ScanImageFn: func(image string) (map[string]cvemodel.CVE, error) {
ScanImageFn: func(ctx context.Context, image string) (map[string]cvemodel.CVE, error) {
return map[string]cvemodel.CVE{
"CVE-2023-1255": {
ID: "CVE-2023-1255",
@@ -687,7 +687,7 @@ func getMockCveScanner(metaDB mTypes.MetaDB) cveinfo.Scanner {
// MetaDB loaded with initial data now mock the scanner
// Setup test CVE data in mock scanner
scanner := mocks.CveScannerMock{
ScanImageFn: func(image string) (map[string]cvemodel.CVE, error) {
ScanImageFn: func(ctx context.Context, image string) (map[string]cvemodel.CVE, error) {
if strings.Contains(image, "zot-cve-test@sha256:db573b01") ||
image == "zot-cve-test:0.0.1" {
return map[string]cvemodel.CVE{
+13 -22
View File
@@ -415,7 +415,7 @@ func (service searchService) getReferrers(ctx context.Context, config SearchConf
referrersEndpoint, err := combineServerAndEndpointURL(config.ServURL,
fmt.Sprintf("/v2/%s/referrers/%s", repo, digest))
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return referrersResult{}, nil
}
@@ -427,7 +427,7 @@ func (service searchService) getReferrers(ctx context.Context, config SearchConf
config.Debug, &referrerResp, config.ResultWriter)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return referrersResult{}, nil
}
@@ -477,7 +477,7 @@ func (service searchService) getAllImages(ctx context.Context, config SearchConf
catalogEndPoint, err := combineServerAndEndpointURL(config.ServURL, fmt.Sprintf("%s%s",
constants.RoutePrefix, constants.ExtCatalogPrefix))
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
rch <- stringResult{"", err}
@@ -488,7 +488,7 @@ func (service searchService) getAllImages(ctx context.Context, config SearchConf
_, err = makeGETRequest(ctx, catalogEndPoint, username, password, config.VerifyTLS,
config.Debug, catalog, config.ResultWriter)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
rch <- stringResult{"", err}
@@ -522,7 +522,7 @@ func getImage(ctx context.Context, config SearchConfig, username, password, imag
tagListEndpoint, err := combineServerAndEndpointURL(config.ServURL, fmt.Sprintf("/v2/%s/tags/list", repo))
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
rch <- stringResult{"", err}
@@ -535,7 +535,7 @@ func getImage(ctx context.Context, config SearchConfig, username, password, imag
config.Debug, &tagList, config.ResultWriter)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
rch <- stringResult{"", err}
@@ -601,7 +601,7 @@ func (service searchService) getImagesByDigest(ctx context.Context, config Searc
err := service.makeGraphQLQuery(ctx, config, username, password, query, result)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
rch <- stringResult{"", err}
@@ -616,7 +616,7 @@ func (service searchService) getImagesByDigest(ctx context.Context, config Searc
fmt.Fprintln(&errBuilder, err.Message)
}
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
rch <- stringResult{"", errors.New(errBuilder.String())} //nolint: goerr113
@@ -640,15 +640,6 @@ func (service searchService) getImagesByDigest(ctx context.Context, config Searc
localWg.Wait()
}
func isContextDone(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
// Query using GQL, the query string is passed as a parameter
// errors are returned in the stringResult channel, the unmarshalled payload is in resultPtr.
func (service searchService) makeGraphQLQuery(ctx context.Context,
@@ -672,7 +663,7 @@ func (service searchService) makeGraphQLQuery(ctx context.Context,
func checkResultGraphQLQuery(ctx context.Context, err error, resultErrors []common.ErrorGQL,
) error {
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return nil //nolint:nilnil
}
@@ -686,7 +677,7 @@ func checkResultGraphQLQuery(ctx context.Context, err error, resultErrors []comm
fmt.Fprintln(&errBuilder, error.Message)
}
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return nil
}
@@ -705,7 +696,7 @@ func addManifestCallToPool(ctx context.Context, config SearchConfig, pool *reque
manifestEndpoint, err := combineServerAndEndpointURL(config.ServURL,
fmt.Sprintf("/v2/%s/manifests/%s", imageName, tagName))
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
rch <- stringResult{"", err}
@@ -1315,7 +1306,7 @@ func (service searchService) getRepos(ctx context.Context, config SearchConfig,
catalogEndPoint, err := combineServerAndEndpointURL(config.ServURL, fmt.Sprintf("%s%s",
constants.RoutePrefix, constants.ExtCatalogPrefix))
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
rch <- stringResult{"", err}
@@ -1326,7 +1317,7 @@ func (service searchService) getRepos(ctx context.Context, config SearchConfig,
_, err = makeGETRequest(ctx, catalogEndPoint, username, password, config.VerifyTLS,
config.Debug, catalog, config.ResultWriter)
if err != nil {
if isContextDone(ctx) {
if common.IsContextDone(ctx) {
return
}
rch <- stringResult{"", err}