Files
zot/pkg/extensions/lint/lint.go
T
2026-05-29 15:58:51 +00:00

291 lines
8.0 KiB
Go

//go:build lint
package lint
import (
	"encoding/json"
	"fmt"
	"sort"
	"strings"

	godigest "github.com/opencontainers/go-digest"
	ispec "github.com/opencontainers/image-spec/specs-go/v1"

	zerr "zotregistry.dev/zot/v2/errors"
	zcommon "zotregistry.dev/zot/v2/pkg/common"
	"zotregistry.dev/zot/v2/pkg/extensions/config"
	"zotregistry.dev/zot/v2/pkg/log"
	"zotregistry.dev/zot/v2/pkg/meta"
	mTypes "zotregistry.dev/zot/v2/pkg/meta/types"
	storageCommon "zotregistry.dev/zot/v2/pkg/storage/common"
	storageTypes "zotregistry.dev/zot/v2/pkg/storage/types"
)
// Linter enforces the image lint rules described by a LintConfig: mandatory
// annotations and mandatory signatures.
type Linter struct {
	// config holds the lint settings; a nil config makes every check pass.
	config *config.LintConfig
	// signatureVerifier is the trust store used to verify image signatures.
	// It stays nil until SetSignatureVerifier is called.
	signatureVerifier mTypes.ImageTrustStore
	// trustStoreReady is supplied together with signatureVerifier; the
	// mandatory-signature check refuses to pass while it is false.
	trustStoreReady bool
	// log is the logger used to report lint failures.
	log log.Logger
}
// NewLinter builds a Linter from the given lint configuration and logger.
// No signature verifier is attached; use SetSignatureVerifier for that.
func NewLinter(config *config.LintConfig, log log.Logger) *Linter {
	linter := new(Linter)
	linter.config = config
	linter.log = log

	return linter
}
// SetSignatureVerifier stores the trust store used for signature verification
// along with a flag telling whether that trust store is ready for use.
func (linter *Linter) SetSignatureVerifier(signatureVerifier mTypes.ImageTrustStore, trustStoreReady bool) {
	linter.signatureVerifier, linter.trustStoreReady = signatureVerifier, trustStoreReady
}
// isEnabled reports whether linting is active: a configuration must be present
// and its Enable flag must be either unset or true.
func (linter *Linter) isEnabled() bool {
	cfg := linter.config
	if cfg == nil {
		return false
	}

	// An unset Enable flag counts as enabled.
	if cfg.Enable == nil {
		return true
	}

	return *cfg.Enable
}
// CheckMandatoryAnnotations reports whether the image identified by
// manifestDigest in repo carries every annotation required by the lint
// configuration.
//
// Manifest annotations are consulted first; the image config blob is loaded
// and its labels checked only when some required annotations are still
// missing. The check passes trivially when linting is not configured, is
// disabled, or no mandatory annotations are listed.
//
// Failures to fetch or decode the manifest or config blob are logged and
// returned as-is with a false result. When annotations are missing from both
// places the result is false with an error built from
// zerr.ErrImageLintAnnotations.
func (linter *Linter) CheckMandatoryAnnotations(repo string, manifestDigest godigest.Digest,
	imgStore storageTypes.ImageStore,
) (bool, error) {
	// isEnabled also covers the nil-config case.
	if !linter.isEnabled() {
		return true, nil
	}

	required := linter.config.MandatoryAnnotations
	if len(required) == 0 {
		return true, nil
	}

	manifestBlob, err := imgStore.GetBlobContent(repo, manifestDigest)
	if err != nil {
		linter.log.Error().Err(err).Str("component", "linter").Msg("failed to get image manifest")

		return false, err
	}

	var manifest ispec.Manifest

	err = json.Unmarshal(manifestBlob, &manifest)
	if err != nil {
		linter.log.Error().Err(err).Str("component", "linter").Msg("failed to unmarshal manifest JSON")

		return false, err
	}

	// found maps each required annotation to whether it has been seen so far.
	found := make(map[string]bool, len(required))
	for _, name := range required {
		found[name] = false
	}

	// Only existing keys are updated while ranging, which is safe in Go.
	for name := range found {
		if _, present := manifest.Annotations[name]; present {
			found[name] = true
		}
	}

	if len(getMissingAnnotations(found)) == 0 {
		return true, nil
	}

	// Some mandatory annotations are absent from the manifest: fall back to
	// the labels stored in the image config.
	configDigest := manifest.Config.Digest

	configBlob, err := imgStore.GetBlobContent(repo, configDigest)
	if err != nil {
		linter.log.Error().Err(err).Str("component", "linter").Msg("failed to get config JSON " +
			configDigest.String())

		return false, err
	}

	var imageConfig ispec.Image

	err = json.Unmarshal(configBlob, &imageConfig)
	if err != nil {
		linter.log.Error().Err(err).Str("component", "linter").Msg("failed to unmarshal config JSON " +
			configDigest.String())

		return false, err
	}

	for name := range found {
		if _, present := imageConfig.Config.Labels[name]; present {
			found[name] = true
		}
	}

	stillMissing := getMissingAnnotations(found)
	if len(stillMissing) == 0 {
		return true, nil
	}

	msg := fmt.Sprintf("\nlinter: manifest %s\nor config %s\nis missing the next annotations: %s",
		string(manifestDigest), string(configDigest), stillMissing)
	linter.log.Error().Msg(msg)

	return false, zerr.NewError(zerr.ErrImageLintAnnotations).AddDetail("missingAnnotations", msg)
}
// CheckMandatorySignatures reports whether the image identified by
// manifestDigest in repo satisfies the mandatory-signature rule.
//
// The rule applies only when linting is enabled and the configured
// MandatorySignatures list contains "*", "**", or the exact repository name;
// otherwise the check passes. When the rule applies, a signature verifier must
// have been set and marked ready, and the manifest must have at least one
// trusted signature. Lint failures are reported as false plus an error built
// from zerr.ErrImageLintAnnotations with a "missingSignatures" detail; errors
// from the signature lookup are returned unchanged.
func (linter *Linter) CheckMandatorySignatures(repo string, manifestDigest godigest.Digest,
	imgStore storageTypes.ImageStore,
) (bool, error) {
	// isEnabled is false for a nil config, so the dereference below is safe.
	if !linter.isEnabled() {
		return true, nil
	}

	// An empty list leaves required false, i.e. the check passes.
	required := false

	for _, pattern := range linter.config.MandatorySignatures {
		if required = pattern == "*" || pattern == "**" || pattern == repo; required {
			break
		}
	}

	if !required {
		return true, nil
	}

	if !linter.trustStoreReady || linter.signatureVerifier == nil {
		msg := fmt.Sprintf("mandatory signatures lint for repository %q requires a configured trust store", repo)

		return false, zerr.NewError(zerr.ErrImageLintAnnotations).AddDetail("missingSignatures", msg)
	}

	trusted, err := linter.hasTrustedSignature(repo, manifestDigest, imgStore)
	if err != nil {
		return false, err
	}

	if trusted {
		return true, nil
	}

	msg := fmt.Sprintf("manifest %s in repository %s does not have a trusted signature", manifestDigest, repo)

	return false, zerr.NewError(zerr.ErrImageLintAnnotations).AddDetail("missingSignatures", msg)
}
// hasTrustedSignature walks the repository index looking for a signature of
// manifestDigest that the configured signature verifier reports as trusted.
//
// Only failures to load the repository index or the signed manifest itself are
// returned as errors. Problems with an individual index entry (unreadable or
// undecodable blob, not a signature of this manifest, layer lookup or
// verification failure) make the search skip that entry and move on. It
// returns true on the first trusted signature and false when none is found.
func (linter *Linter) hasTrustedSignature(repo string, manifestDigest godigest.Digest,
	imgStore storageTypes.ImageStore,
) (bool, error) {
	index, err := storageCommon.GetIndex(imgStore, repo, linter.log)
	if err != nil {
		return false, err
	}

	signedBlob, err := imgStore.GetBlobContent(repo, manifestDigest)
	if err != nil {
		return false, err
	}

	// Description of the signed image handed to the verifier.
	signedMeta := mTypes.ImageMeta{
		MediaType: ispec.MediaTypeImageManifest,
		Digest:    manifestDigest,
		Size:      int64(len(signedBlob)),
	}

	for idx := range index.Manifests {
		candidate := index.Manifests[idx]

		// The image cannot be its own signature.
		if candidate.Digest == manifestDigest {
			continue
		}

		candidateBlob, blobErr := imgStore.GetBlobContent(repo, candidate.Digest)
		if blobErr != nil {
			continue
		}

		var candidateManifest ispec.Manifest
		if json.Unmarshal(candidateBlob, &candidateManifest) != nil {
			continue
		}

		sigType, signsImage := getSignatureType(candidate, candidateManifest, manifestDigest)
		if !signsImage {
			continue
		}

		refName := candidate.Annotations[ispec.AnnotationRefName]

		layers, layersErr := meta.GetSignatureLayersInfo(repo, refName, candidate.Digest.String(), sigType,
			candidateBlob, imgStore, linter.log)
		if layersErr != nil {
			continue
		}

		for _, layer := range layers {
			_, _, trusted, verifyErr := linter.signatureVerifier.VerifySignature(sigType,
				layer.LayerContent, layer.SignatureKey, manifestDigest, signedMeta, repo)
			if verifyErr == nil && trusted {
				return true, nil
			}
		}
	}

	return false, nil
}
// getSignatureType decides whether the index entry described by descriptor and
// signatureManifest is a signature of manifestDigest, and of which kind.
//
// Two conventions are recognised, in this order:
//   - the manifest's Subject points at manifestDigest and its artifact type is
//     a cosign or notation one;
//   - the entry's ref-name annotation is a cosign signature tag whose embedded
//     digest equals manifestDigest.
//
// It returns the signature type and true on a match, or "" and false otherwise.
func getSignatureType(
	descriptor ispec.Descriptor, signatureManifest ispec.Manifest, manifestDigest godigest.Digest,
) (string, bool) {
	artifactType := zcommon.GetManifestArtifactType(signatureManifest)

	subject := signatureManifest.Subject
	if subject != nil && subject.Digest == manifestDigest {
		if zcommon.IsArtifactTypeCosign(artifactType) {
			return zcommon.CosignSignature, true
		}

		if artifactType == zcommon.ArtifactTypeNotation {
			return zcommon.NotationSignature, true
		}
		// Any other artifact type falls through to the tag-based check.
	}

	refName := descriptor.Annotations[ispec.AnnotationRefName]
	if !zcommon.IsCosignSignature(refName) {
		return "", false
	}

	signedDigest, err := getDigestFromCosignTag(refName)
	if err != nil || signedDigest != manifestDigest {
		return "", false
	}

	return zcommon.CosignSignature, true
}
// getDigestFromCosignTag extracts the signed image digest from a cosign
// signature tag of the form "sha256-<encoded>.sig".
//
// It returns zerr.ErrBadManifest when the tag lacks the expected prefix or
// suffix. The encoded part between them is passed on as-is; this function does
// no validation of its own.
func getDigestFromCosignTag(tag string) (godigest.Digest, error) {
	const (
		tagPrefix = "sha256-"
		tagSuffix = ".sig"
	)

	wellFormed := strings.HasPrefix(tag, tagPrefix) && strings.HasSuffix(tag, tagSuffix)
	if !wellFormed {
		return "", zerr.ErrBadManifest
	}

	encoded := strings.TrimPrefix(tag, tagPrefix)
	encoded = strings.TrimSuffix(encoded, tagSuffix)

	return godigest.NewDigestFromEncoded(godigest.SHA256, encoded), nil
}
// Lint runs every lint check against the image identified by manifestDigest in
// repo: mandatory annotations first, then mandatory signatures. The signature
// check is skipped when the annotation check fails or returns an error, and
// that first result is returned instead.
func (linter *Linter) Lint(repo string, manifestDigest godigest.Digest,
	imageStore storageTypes.ImageStore,
) (bool, error) {
	annotationsOK, err := linter.CheckMandatoryAnnotations(repo, manifestDigest, imageStore)
	if err == nil && annotationsOK {
		return linter.CheckMandatorySignatures(repo, manifestDigest, imageStore)
	}

	return annotationsOK, err
}
// getMissingAnnotations returns the keys of mandatoryAnnotationsMap whose value
// is false, i.e. the mandatory annotations that have not been found.
//
// The result is sorted alphabetically: Go map iteration order is randomized,
// and without sorting the same image would produce differently ordered lint
// error messages from one run to the next. A nil slice is returned when
// nothing is missing.
func getMissingAnnotations(mandatoryAnnotationsMap map[string]bool) []string {
	var missingAnnotations []string

	for annotation, found := range mandatoryAnnotationsMap {
		if !found {
			missingAnnotations = append(missingAnnotations, annotation)
		}
	}

	sort.Strings(missingAnnotations)

	return missingAnnotations
}