mirror of
https://github.com/project-zot/zot.git
synced 2026-06-17 21:17:58 +08:00
Multipart download enhancements (#4021)
* fix(api): set blob response Content-Type from OCI descriptor

Blob HEAD responses had no Content-Type and GET responses echoed the request's Accept header verbatim, which produced missing or malformed media types and left multipart/byteranges parts without a per-part Content-Type. This breaks OCI distribution-spec conformance and consumers like stargz-snapshotter that need a well-formed layer media type.

Add a resolveBlobResponseMediaType helper that resolves the descriptor's MediaType via GetBlobDescriptorFromRepo and falls back to application/octet-stream. Use it in CheckBlob (HEAD), GetBlob full (200), GetBlob single-range (206), and per-part in writeMultipartRanges (206 multipart). Lookup is deferred until after the blob is known to exist.

Cover the new behaviour with mock-based unit tests in routes_test.go and end-to-end assertions in TestPullRange.

Signed-off-by: Andrei Aaron <andreifdaaron@gmail.com>

* perf(api): stream multipart blob ranges lazily with precomputed Content-Length

writeMultipartRanges previously opened every range reader up front and emitted no Content-Length, so an N-range request held N concurrent storage readers (and their fds / read buffers) per response window and forced chunked encoding on HTTP/1.1 — friendly neither to proxies nor to fan-out scenarios like stargz lazy pulls.

Signed-off-by: Andrei Aaron <andreifdaaron@gmail.com>

---------

Signed-off-by: Andrei Aaron <andreifdaaron@gmail.com>
This commit is contained in:
@@ -11247,6 +11247,10 @@ func TestPullRange(t *testing.T) {
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.Header().Get("Accept-Ranges"), ShouldEqual, "bytes")
|
||||
So(resp.StatusCode(), ShouldEqual, http.StatusOK)
|
||||
// HEAD on an unreferenced blob (no manifest/index) must still
|
||||
// advertise a valid Content-Type. The descriptor lookup fails
|
||||
// here, so we expect the application/octet-stream fallback.
|
||||
So(resp.Header().Get("Content-Type"), ShouldEqual, "application/octet-stream")
|
||||
})
|
||||
|
||||
Convey("Get a range of bytes", func() {
|
||||
@@ -11255,6 +11259,11 @@ func TestPullRange(t *testing.T) {
|
||||
So(resp.StatusCode(), ShouldEqual, http.StatusPartialContent)
|
||||
So(resp.Header().Get("Content-Length"), ShouldEqual, strconv.Itoa(len(content)))
|
||||
So(resp.Body(), ShouldResemble, content)
|
||||
// Single-range 206: descriptor lookup fails (blob is not
|
||||
// referenced by any manifest), so Content-Type falls back to
|
||||
// application/octet-stream rather than echoing the request
|
||||
// Accept header.
|
||||
So(resp.Header().Get("Content-Type"), ShouldEqual, "application/octet-stream")
|
||||
|
||||
resp, err = resty.R().SetHeader("Range", "bytes=0-100").Get(blobLoc)
|
||||
So(err, ShouldBeNil)
|
||||
@@ -11318,6 +11327,11 @@ func TestPullRange(t *testing.T) {
|
||||
part, err := multipartReader.NextPart()
|
||||
So(err, ShouldBeNil)
|
||||
So(part.Header.Get("Content-Range"), ShouldEqual, "bytes 0-1/10")
|
||||
// Per-part Content-Type reflects the descriptor-aware
|
||||
// resolution. For an unreferenced blob this is the
|
||||
// application/octet-stream fallback; for a real OCI layer it
|
||||
// would carry the manifest's layer media type.
|
||||
So(part.Header.Get("Content-Type"), ShouldEqual, "application/octet-stream")
|
||||
|
||||
partBody, err := io.ReadAll(part)
|
||||
So(err, ShouldBeNil)
|
||||
@@ -11326,6 +11340,7 @@ func TestPullRange(t *testing.T) {
|
||||
part, err = multipartReader.NextPart()
|
||||
So(err, ShouldBeNil)
|
||||
So(part.Header.Get("Content-Range"), ShouldEqual, "bytes 4-6/10")
|
||||
So(part.Header.Get("Content-Type"), ShouldEqual, "application/octet-stream")
|
||||
|
||||
partBody, err = io.ReadAll(part)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
+218
-65
@@ -13,6 +13,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/textproto"
|
||||
@@ -1025,6 +1026,36 @@ func canMount(userAc *reqCtx.UserAccessControl, imgStore storageTypes.ImageStore
|
||||
return canMount, nil
|
||||
}
|
||||
|
||||
// resolveBlobResponseMediaType resolves the OCI media type to advertise for a blob via
|
||||
// the repo's index/manifests. If the descriptor lookup fails (or the descriptor
|
||||
// has no media type), it falls back to application/octet-stream.
|
||||
//
|
||||
// Use this for Content-Type on HEAD/GET blob responses to satisfy OCI
|
||||
// distribution-spec conformance and consumers like stargz-snapshotter that
|
||||
// require a non-empty, well-formed media type.
|
||||
func resolveBlobResponseMediaType(
|
||||
imgStore storageTypes.ImageStore,
|
||||
repo string,
|
||||
digest godigest.Digest,
|
||||
logger log.Logger,
|
||||
) string {
|
||||
desc, err := storageCommon.GetBlobDescriptorFromRepo(imgStore, repo, digest, logger)
|
||||
if err == nil && desc.MediaType != "" {
|
||||
// Descriptor media types originate from manifest JSON and are not
|
||||
// necessarily validated. Ensure we only emit a header-safe, parseable
|
||||
// media type; otherwise fall back to application/octet-stream.
|
||||
//
|
||||
// ParseMediaType also strips parameters so we only propagate the base
|
||||
// type (e.g. "application/vnd.oci.image.layer.v1.tar+gzip").
|
||||
mediaType, _, parseErr := mime.ParseMediaType(desc.MediaType)
|
||||
if parseErr == nil && mediaType != "" {
|
||||
return mediaType
|
||||
}
|
||||
}
|
||||
|
||||
return constants.BinaryMediaType
|
||||
}
|
||||
|
||||
// CheckBlob godoc
|
||||
// @Summary Check image blob/layer
|
||||
// @Description Check an image's blob/layer given a digest
|
||||
@@ -1118,6 +1149,7 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re
|
||||
|
||||
response.Header().Set("Content-Length", strconv.FormatInt(blen, 10))
|
||||
response.Header().Set("Accept-Ranges", "bytes")
|
||||
response.Header().Set("Content-Type", resolveBlobResponseMediaType(imgStore, name, digest, rh.c.Log))
|
||||
response.Header().Set(constants.DistContentDigestKey, digest.String())
|
||||
response.WriteHeader(http.StatusOK)
|
||||
}
|
||||
@@ -1133,18 +1165,6 @@ func (r httpRange) length() int64 {
|
||||
return r.end - r.start + 1
|
||||
}
|
||||
|
||||
type blobRangeReader struct {
|
||||
httpRange
|
||||
|
||||
reader io.ReadCloser
|
||||
}
|
||||
|
||||
func closeRangeReaders(rangeReaders []blobRangeReader) {
|
||||
for _, rangeReader := range rangeReaders {
|
||||
_ = rangeReader.reader.Close()
|
||||
}
|
||||
}
|
||||
|
||||
/* parseRangeHeader validates the "Range" HTTP header and returns normalized byte ranges. */
|
||||
func parseRangeHeader(contentRange string, size int64) ([]httpRange, error) {
|
||||
if size <= 0 || !strings.HasPrefix(contentRange, "bytes=") {
|
||||
@@ -1250,35 +1270,152 @@ func coalesceRanges(ranges []httpRange) []httpRange {
|
||||
return coalesced
|
||||
}
|
||||
|
||||
func writeMultipartRanges(response http.ResponseWriter, ranges []blobRangeReader, bsize int64,
|
||||
// openRangeFunc lazily opens one range reader at a time for the
|
||||
// multipart streaming goroutine. The reader must supply at least
|
||||
// r.length() bytes from the start of r; writeMultipartRanges copies
|
||||
// exactly that many per part and ignores any surplus. A short reader
|
||||
// truncates the already-flushed 206 response.
|
||||
type openRangeFunc func(r httpRange) (io.ReadCloser, error)
|
||||
|
||||
// multipartByterangesContentType is the media type of the response body
|
||||
// produced by writeMultipartRanges, modulo the boundary parameter.
|
||||
const multipartByterangesContentType = "multipart/byteranges"
|
||||
|
||||
// computeMultipartBodyLength returns the exact wire length of the
|
||||
// multipart/byteranges body we will emit for these ranges, used to
|
||||
// advertise Content-Length on the response. We run a real
|
||||
// mime/multipart Writer against a counting sink rather than
|
||||
// hard-coding the wire format, so the answer stays correct if the
|
||||
// stdlib ever tweaks header ordering or whitespace.
|
||||
func computeMultipartBodyLength(ranges []httpRange, mediaType, boundary string, size int64) int64 {
|
||||
var counter byteCountingWriter
|
||||
|
||||
writer := multipart.NewWriter(&counter)
|
||||
if err := writer.SetBoundary(boundary); err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
for _, rng := range ranges {
|
||||
partHeader := buildMultipartPartHeader(rng, mediaType, size)
|
||||
if _, err := writer.CreatePart(partHeader); err != nil {
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
if err := writer.Close(); err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
total := counter.n
|
||||
for _, rng := range ranges {
|
||||
total += rng.length()
|
||||
}
|
||||
|
||||
return total
|
||||
}
|
||||
|
||||
func buildMultipartPartHeader(rng httpRange, mediaType string, size int64) textproto.MIMEHeader {
|
||||
partHeader := textproto.MIMEHeader{}
|
||||
partHeader.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", rng.start, rng.end, size))
|
||||
|
||||
// RFC 9110 §15.3.7 lets us omit per-part Content-Type, but when the
|
||||
// caller has resolved a real media type (descriptor lookup succeeded
|
||||
// or fell back to application/octet-stream) we propagate it so OCI
|
||||
// clients don't have to re-derive it from the index for every part.
|
||||
if mediaType != "" {
|
||||
partHeader.Set("Content-Type", mediaType)
|
||||
}
|
||||
|
||||
return partHeader
|
||||
}
|
||||
|
||||
// byteCountingWriter is an io.Writer that discards everything written to it
// and keeps a running total of the number of bytes it was given.
type byteCountingWriter struct{ n int64 }

// Write adds len(p) to the running total and reports the whole slice as
// written. It never returns an error.
func (c *byteCountingWriter) Write(p []byte) (int, error) {
	c.n += int64(len(p))

	return len(p), nil
}
|
||||
|
||||
// writeMultipartRanges streams a multipart/byteranges 206 response.
|
||||
//
|
||||
// The response advertises a precomputed Content-Length and opens range
|
||||
// readers lazily — one at a time, inside a producer goroutine writing
|
||||
// to an io.Pipe. Compared with the previous fan-out version, this
|
||||
// halves the worst-case file-descriptor / read-buffer footprint when
|
||||
// multiple ranges of a large blob are requested.
|
||||
//
|
||||
// Trade-off: lazy opening means the response status (206) and headers
|
||||
// have already been flushed by the time we attempt to read range bodies.
|
||||
// Per-range read errors mid-stream therefore manifest as a hard-closed
|
||||
// connection (via pipe.CloseWithError) and a truncated body — they
|
||||
// cannot be turned into a 5xx. Errors that originate from the storage
|
||||
// layer's metadata path (e.g. a deleted blob) would historically have
|
||||
// produced a 4xx; under this design they too truncate. The 16-range
|
||||
// cap and coalesceRanges already bound the worst case, and the eager
|
||||
// CheckBlob earlier in GetBlob still rejects the obvious "blob does
|
||||
// not exist" case before we get here.
|
||||
func writeMultipartRanges(
|
||||
response http.ResponseWriter,
|
||||
ranges []httpRange,
|
||||
bsize int64,
|
||||
mediaType string,
|
||||
openRange openRangeFunc,
|
||||
logger log.Logger,
|
||||
) {
|
||||
writer := multipart.NewWriter(response)
|
||||
defer func() {
|
||||
if err := writer.Close(); err != nil {
|
||||
logger.Error().Err(err).Msg("failed to close multipart range response")
|
||||
}
|
||||
}()
|
||||
pipeReader, pipeWriter := io.Pipe()
|
||||
defer func() { _ = pipeReader.Close() }()
|
||||
|
||||
response.Header().Set("Content-Type", "multipart/byteranges; boundary="+writer.Boundary())
|
||||
writer := multipart.NewWriter(pipeWriter)
|
||||
|
||||
contentLength := computeMultipartBodyLength(ranges, mediaType, writer.Boundary(), bsize)
|
||||
|
||||
response.Header().Set("Content-Type", multipartByterangesContentType+"; boundary="+writer.Boundary())
|
||||
response.Header().Set("Content-Length", strconv.FormatInt(contentLength, 10))
|
||||
response.WriteHeader(http.StatusPartialContent)
|
||||
|
||||
for _, rangeReader := range ranges {
|
||||
partHeader := textproto.MIMEHeader{}
|
||||
partHeader.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", rangeReader.start, rangeReader.end, bsize))
|
||||
go func() {
|
||||
// CloseWithError(nil) is documented to be equivalent to Close(),
|
||||
// so the success path ends the pipe cleanly with EOF.
|
||||
var pipeErr error
|
||||
defer func() { _ = pipeWriter.CloseWithError(pipeErr) }()
|
||||
|
||||
part, err := writer.CreatePart(partHeader)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Msg("failed to create multipart range response")
|
||||
for _, rng := range ranges {
|
||||
var part io.Writer
|
||||
|
||||
return
|
||||
part, pipeErr = writer.CreatePart(buildMultipartPartHeader(rng, mediaType, bsize))
|
||||
if pipeErr != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var reader io.ReadCloser
|
||||
|
||||
reader, pipeErr = openRange(rng)
|
||||
if pipeErr != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, copyErr := io.CopyN(part, reader, rng.length())
|
||||
closeErr := reader.Close()
|
||||
|
||||
if copyErr != nil {
|
||||
pipeErr = copyErr
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if closeErr != nil {
|
||||
pipeErr = closeErr
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := io.Copy(part, rangeReader.reader); err != nil {
|
||||
logger.Error().Err(err).Msg("failed to copy range into multipart response")
|
||||
pipeErr = writer.Close()
|
||||
}()
|
||||
|
||||
return
|
||||
}
|
||||
if _, err := io.CopyN(response, pipeReader, contentLength); err != nil {
|
||||
logger.Error().Err(err).Msg("failed to copy multipart range data into http response")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1314,8 +1451,6 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ
|
||||
|
||||
digest := godigest.Digest(digestStr)
|
||||
|
||||
mediaType := request.Header.Get("Accept")
|
||||
|
||||
contentRange := request.Header.Get("Range")
|
||||
_, rangeHeaderPresent := request.Header["Range"]
|
||||
|
||||
@@ -1354,6 +1489,10 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ
|
||||
return
|
||||
}
|
||||
|
||||
// Resolve the response Content-Type from the blob's OCI descriptor (if
|
||||
// any), with a fallback to application/octet-stream.
|
||||
mediaType := resolveBlobResponseMediaType(imgStore, name, digest, rh.c.Log)
|
||||
|
||||
ranges, err := parseRangeHeader(contentRange, bsize)
|
||||
if err != nil {
|
||||
response.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", bsize))
|
||||
@@ -1362,44 +1501,53 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ
|
||||
return
|
||||
}
|
||||
|
||||
rangeReaders := make([]blobRangeReader, 0, len(ranges))
|
||||
defer func() { closeRangeReaders(rangeReaders) }()
|
||||
if len(ranges) > 1 {
|
||||
response.Header().Set(constants.DistContentDigestKey, digest.String())
|
||||
|
||||
for _, httpRange := range ranges {
|
||||
repo, blen, _, err := imgStore.GetBlobPartial(name, digest, mediaType, httpRange.start, httpRange.end)
|
||||
if err != nil {
|
||||
writeBlobError(err)
|
||||
// Multipart: lazy opener invoked one range at a time inside the
|
||||
// streaming goroutine. We do not pre-verify the per-range length
|
||||
// here; once the 206 headers are flushed there's no way to turn
|
||||
// a mismatch into a 5xx, so writeMultipartRanges relies on
|
||||
// io.CopyN to enforce it on the wire.
|
||||
writeMultipartRanges(response, ranges, bsize, mediaType,
|
||||
func(rng httpRange) (io.ReadCloser, error) {
|
||||
reader, _, _, err := imgStore.GetBlobPartial(name, digest, mediaType, rng.start, rng.end)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if blen != httpRange.length() {
|
||||
_ = repo.Close()
|
||||
|
||||
rh.c.Log.Error().
|
||||
Int64("expected", httpRange.length()).
|
||||
Int64("actual", blen).
|
||||
Msg("unexpected partial blob length")
|
||||
response.WriteHeader(http.StatusInternalServerError)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
rangeReaders = append(rangeReaders, blobRangeReader{httpRange: httpRange, reader: repo})
|
||||
}
|
||||
|
||||
response.Header().Set(constants.DistContentDigestKey, digest.String())
|
||||
|
||||
if len(rangeReaders) > 1 {
|
||||
writeMultipartRanges(response, rangeReaders, bsize, rh.c.Log)
|
||||
return reader, err
|
||||
},
|
||||
rh.c.Log,
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
rangeReader := rangeReaders[0]
|
||||
response.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", rangeReader.start, rangeReader.end, bsize))
|
||||
// Single range: eager open + length sanity check + stream. Headers
|
||||
// haven't been flushed yet so we can still return a 5xx if the
|
||||
// storage layer hands us a reader with the wrong size.
|
||||
rng := ranges[0]
|
||||
|
||||
reader, blen, _, err := imgStore.GetBlobPartial(name, digest, mediaType, rng.start, rng.end)
|
||||
if err != nil {
|
||||
writeBlobError(err)
|
||||
|
||||
return
|
||||
}
|
||||
defer func() { _ = reader.Close() }()
|
||||
|
||||
if blen != rng.length() {
|
||||
rh.c.Log.Error().
|
||||
Int64("expected", rng.length()).
|
||||
Int64("actual", blen).
|
||||
Msg("unexpected partial blob length")
|
||||
response.WriteHeader(http.StatusInternalServerError)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
response.Header().Set(constants.DistContentDigestKey, digest.String())
|
||||
response.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", rng.start, rng.end, bsize))
|
||||
WriteDataFromReader(
|
||||
response, http.StatusPartialContent, rangeReader.length(), mediaType, rangeReader.reader, rh.c.Log,
|
||||
response, http.StatusPartialContent, rng.length(), mediaType, reader, rh.c.Log,
|
||||
)
|
||||
|
||||
return
|
||||
@@ -1409,6 +1557,12 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ
|
||||
|
||||
var blen int64
|
||||
|
||||
// Resolve the response Content-Type from the blob's OCI descriptor
|
||||
// (if any), with a fallback to application/octet-stream. This lookup
|
||||
// may require an additional repo index/manifest walk before we read
|
||||
// the blob, but preserves a more specific Content-Type when available.
|
||||
mediaType := resolveBlobResponseMediaType(imgStore, name, digest, rh.c.Log)
|
||||
|
||||
repo, blen, err := imgStore.GetBlob(name, digest, mediaType)
|
||||
if err != nil {
|
||||
writeBlobError(err)
|
||||
@@ -1421,7 +1575,6 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ
|
||||
response.Header().Set("Content-Length", strconv.FormatInt(blen, 10))
|
||||
response.Header().Set(constants.DistContentDigestKey, digest.String())
|
||||
|
||||
// return the blob data
|
||||
WriteDataFromReader(response, http.StatusOK, blen, mediaType, repo, rh.c.Log)
|
||||
}
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user