diff --git a/docs/image-stream/README.md b/docs/image-stream/README.md
index e80dd9c5..c3aaa0cf 100644
--- a/docs/image-stream/README.md
+++ b/docs/image-stream/README.md
@@ -47,26 +47,64 @@ This size is available in the manifest as shown below:
 }
 ```
 
+**Temporary storage and verification:**
+- Blobs are initially downloaded to a temporary location separate from the actual repository storage
+- This allows for streaming to clients while download is in progress
+- Once fully downloaded, the blob digest is verified against the expected digest
+- Only after successful verification is the blob copied/moved to the actual repository storage
+- This ensures that partial or corrupted blobs never pollute the repository storage
+
 Sparse storage of OCI images would be supported in zot temp store for storing manifests and partially copied blobs.
 
 ### Example blob download flow
 
 1. Client issues a request for a blob `GET https://zothub.io/v2/golang/blobs/sha256:8ec9f1fd1cf4f5152e86d09c28013ce076b8c09d3a9f5850591be40273ff877e`
-2. zot checks for the blob locally, but it is not present.
-3. Due to on-demand sync, zot creates a `ChunkedImageCopier` that copies the blob from a regclient `blob.Reader` to zot temp storage. regclient has a `BlobGet` method which returns a `blob.Reader` instance. [documentation](https://pkg.go.dev/github.com/regclient/regclient@v0.11.1#RegClient.BlobGet)
-4. The `ChunkedImageCopier` calculates the number of chunks and begins the download to zot temp storage.
-5. The client (currently open HTTP connection) has an associated `InFlightImageCopier` object that tracks at a per-object level, the number of chunks copied. It opens the temporary file where the image is being written to and registers a channel with the `ChunkedImageCopier` which announces over the go channel, the latest chunk number at the time of registration/subscription and every time a new chunk has been copied to disk.
+2. **zot checks if the blob already exists in its storage:**
+   - **2.1 If yes:** zot serves the blob directly from storage (normal flow, no streaming needed)
+   - **2.2 If no:** zot proceeds with streaming sync as follows
+3. Due to on-demand sync, zot creates a `BlobStreamer` (renamed from `ChunkedImageCopier`) that copies the blob from a regclient `blob.Reader` to a **temporary location** in zot temp storage. regclient has a `BlobGet` method which returns a `blob.Reader` instance. [documentation](https://pkg.go.dev/github.com/regclient/regclient@v0.11.1#RegClient.BlobGet)
+4. The `BlobStreamer` calculates the number of chunks and begins the download to the temporary location in zot temp storage.
+5. The client (currently open HTTP connection) has an associated `InFlightImageCopier` object that tracks at a per-object level, the number of chunks copied. It opens the temporary file where the image is being written to and registers a channel with the `BlobStreamer` which announces over the go channel, the latest chunk number at the time of registration/subscription and every time a new chunk has been copied to disk.
 6. The `InFlightImageCopier` receives the value from the channel and copies `(latestChunkNumber - numChunksCopied) * chunkSize` bytes from the open file descriptor to the connection's `io.Writer` implementation until all the chunks are copied.
-7. The `InFlightImageCopier` holds the connection and channel active until all the bytes are copied. If the client connection terminates during the copy, the channel is de-registered and closed.
+7. The `InFlightImageCopier` holds the connection and channel active until all the bytes are copied. **If the client connection terminates during the copy, the channel is de-registered and closed. The `BlobStreamer` continues downloading to temp storage for other clients.**
+8. **Once the blob is fully downloaded and verified (digest matches), the `BlobStreamer` copies it from the temporary location to the actual repository storage.** Future requests for this blob will be served directly from storage (step 2.1).
 
 Any new clients joining in during the copy will follow the same steps from 5 onwards. As many chunks as available would be copied from the disk. Once that is complete, the `InFlightImageCopier` will wait for announcements over the channel to continue copying bytes until all chunks are copied.
 
+#### Handling client disconnections
+
+When a client disconnects:
+- The `InFlightImageCopier` detects the error when writing to `io.Writer`
+- It unsubscribes from the `BlobStreamer` channel and closes its channel
+- The `BlobStreamer` continues downloading for remaining clients
+- If no clients remain, the download continues to completion to avoid wasting partial work
+
+#### Handling upstream timeouts
+
+When upstream connection times out:
+- The `BlobStreamer` detects the error from the `blob.Reader`
+- All subscribed clients are notified of the error via their channels
+- Clients receive an error response
+- The partial blob in temp storage can be cleaned up or retained for retry
+- On retry, if partial blob exists and upstream supports range requests, resume from last successful chunk
+
+#### Handling range requests
+
+HTTP range requests allow clients to request specific byte ranges of a blob:
+- When a client makes a range request (e.g., `Range: bytes=1000-2000`), the `InFlightImageCopier` must:
+  1. Calculate which chunks contain the requested byte range
+  2. Wait for those specific chunks to be available from the `BlobStreamer`
+  3. Seek to the correct offset in the temp file and read only the requested bytes
+  4. Respect the `Range` header and respond with status `206 Partial Content`
+- If the requested range is not yet available, wait for the `BlobStreamer` to download those chunks
+- Multiple clients can request different ranges of the same blob concurrently
+
 ### Scaling up to images
 
-For an image with multiple layers, zot can download multiple layers simultaneously and make available, one `ChunkedImageCopier` for each blob being downloaded.
+For an image with multiple layers, zot can download multiple layers simultaneously and make available, one `BlobStreamer` for each blob being downloaded.
 Clients are added on as they request.
-For completed blobs, the `ChunkedImageCopier` can announce the final chunk number upon registration.
+For completed blobs, the `BlobStreamer` can announce the final chunk number upon registration and serve from the actual repository storage.
 
 Manifests are not subject to this flow as they are a pre-requisite for streamed blob downloads. They would follow the usual flow where zot downloads first and then responds to the client.
 
@@ -79,10 +117,18 @@ Manifests are not subject to this flow as they are a pre-requisite for streamed
 
 1. Each client holds an open file descriptor to the temp file where the blob is being written to. If the number of clients are very high, it could result in a too many file descriptors open error.
 2. Download speeds for the client would be impacted by the configured chunk size.
-3. There are a lot of checks in regclient during image Copy which won't work if zot directly accesses the `blob.Reader`. This may need some discussion to ensure that access to completed image once all the blobs are streamed is sane.
+3. There are a lot of checks in regclient during image Copy which won't work if zot directly accesses the `blob.Reader`. This may need some discussion to ensure that access to completed image once all the blobs are streamed is sane.
+4. **Additional disk space required:** Blobs exist in both temporary storage (during download) and repository storage (after verification), temporarily doubling space requirements.
+5. **Complexity of error handling:** Need to handle upstream timeouts, client disconnections, and partial downloads gracefully across multiple concurrent clients.
+6. **Range request complexity:** Supporting HTTP range requests adds complexity to track which chunks are needed for specific byte ranges.
 
 ### Proof of concept
 
-The `main.go` file in this directory has a mock sample of a blob download where characters in a buffer go through a simulated download into a file called `ondiskblob.txt` which represents an OCI blob being written to disk. 2 sample clients are used - 1 writing to a text file `client1.txt` and another writing to stdout.
+The `main.go` file in this directory has a mock sample of a blob download where characters in a buffer go through a simulated download into a file called `ondiskblob.txt` which represents an OCI blob being written to a temporary disk location. 2 sample clients are used - 1 writing to a text file `client1.txt` and another writing to stdout.
 
-Running the program with `go run main.go` should result in lorem ipsum text being gradually written to 3 places - the 2 text files and stdout.
+The proof of concept demonstrates:
+- The `BlobStreamer` writing to a temp location
+- Multiple clients (`InFlightImageCopier`) reading from the temp file as chunks become available
+- Clients joining at different times during the download
+
+Running the program with `go run main.go` should result in lorem ipsum text being gradually written to 3 places - the ondiskblob.txt temp file, client1.txt, and stdout.
diff --git a/docs/image-stream/main.go b/docs/image-stream/main.go
index 67a10291..3601c127 100644
--- a/docs/image-stream/main.go
+++ b/docs/image-stream/main.go
@@ -52,14 +52,15 @@ func (snr *simulatedNetworkReader) Read(b []byte) (n int, err error) {
 // InFlightImageCopier represents a client that wants to stream an image while it is being written to disk.
 // The data is copied first from disk up to the latest chunk and further copies wait for an announcement
 // over a channel when a new chunk has been written to disk.
+// If the client connection is lost, the channel is de-registered but the BlobStreamer continues.
 type InFlightImageCopier struct {
 	numChunksCopied int
-	source          *ChunkedImageCopier
+	source          *BlobStreamer
 	dest            io.Writer
 	sync.Mutex
 }
 
-func NewInFlightImageCopier(source *ChunkedImageCopier, dest io.Writer) *InFlightImageCopier {
+func NewInFlightImageCopier(source *BlobStreamer, dest io.Writer) *InFlightImageCopier {
 	return &InFlightImageCopier{
 		numChunksCopied: 0,
 		source:          source,
@@ -111,10 +112,12 @@ func (ific *InFlightImageCopier) Copy() (err error) {
 	return nil
 }
 
-// ChunkedImageCopier is a helper that splits an image into chunks based on chunkSize
-// It then copies chunks to disk.
+// BlobStreamer (renamed from ChunkedImageCopier) is a writer to a temp location with many readers (clients).
+// It splits a blob into chunks based on chunkSize and copies chunks to a temporary disk location.
 // The latest chunk number is announced to channels of subscribers.
-type ChunkedImageCopier struct {
+// Multiple clients can read from the temp file as chunks become available.
+// Once fully downloaded and verified, the blob would be copied to actual repository storage.
+type BlobStreamer struct {
 	numChunksTotal  int
 	numChunksOnDisk int
 
@@ -125,8 +128,8 @@ type ChunkedImageCopier struct {
 	numClientsTotal int
 }
 
-func NewChunkedImageCopier(destFilePath string, r io.Reader, numChunksTotal int) *ChunkedImageCopier {
-	return &ChunkedImageCopier{
+func NewBlobStreamer(destFilePath string, r io.Reader, numChunksTotal int) *BlobStreamer {
+	return &BlobStreamer{
 		numChunksTotal: numChunksTotal,
 		onDiskPath:     destFilePath,
 		inFlightReader: r,
@@ -136,34 +139,35 @@ func NewChunkedImageCopier(destFilePath string, r io.Reader, numChunksTotal int)
 
 // Everytime a new client is interested in the current blob, the client would create a subscription
 // here with a channel where latest chunk info is sent.
-func (cic *ChunkedImageCopier) Subscribe(channel chan int) int {
-	cic.clientMu.Lock()
-	defer cic.clientMu.Unlock()
+func (bs *BlobStreamer) Subscribe(channel chan int) int {
+	bs.clientMu.Lock()
+	defer bs.clientMu.Unlock()
 
-	cic.clients[cic.numClientsTotal] = channel
-	chanId := cic.numClientsTotal
-	cic.numClientsTotal++
+	bs.clients[bs.numClientsTotal] = channel
+	chanId := bs.numClientsTotal
+	bs.numClientsTotal++
 
 	// Announce the current number of available chunks
 	// TODO: should probably use a mutex lock here.
 	go func() {
-		channel <- cic.numChunksOnDisk
+		channel <- bs.numChunksOnDisk
 	}()
 
 	return chanId
 }
 
-func (cic *ChunkedImageCopier) Unsubscribe(id int) {
-	cic.clientMu.Lock()
-	defer cic.clientMu.Unlock()
+func (bs *BlobStreamer) Unsubscribe(id int) {
+	bs.clientMu.Lock()
+	defer bs.clientMu.Unlock()
 
-	delete(cic.clients, id)
+	delete(bs.clients, id)
 }
 
 // Starts writing content from inFlightReader to disk while updating clients
-func (cic *ChunkedImageCopier) Transfer() {
+// Continues even if clients disconnect to avoid wasting partial work
+func (bs *BlobStreamer) Transfer() {
 	log.Println("starting writer")
-	outputFile, err := os.OpenFile(cic.onDiskPath, os.O_WRONLY|os.O_CREATE, 0o644)
+	outputFile, err := os.OpenFile(bs.onDiskPath, os.O_WRONLY|os.O_CREATE, 0o644)
 	if err != nil {
 		log.Printf("failed to open write file: %s\n", err.Error())
 		os.Exit(1)
@@ -172,9 +176,9 @@ func (cic *ChunkedImageCopier) Transfer() {
 
 	var wg sync.WaitGroup
 
-	for cic.numChunksOnDisk < cic.numChunksTotal {
+	for bs.numChunksOnDisk < bs.numChunksTotal {
 		// simulates writing network resp body into a blob file with delay
-		_, err = io.CopyN(outputFile, cic.inFlightReader, chunkSizeBytes)
+		_, err = io.CopyN(outputFile, bs.inFlightReader, chunkSizeBytes)
 		if err != nil {
 			if !errors.Is(err, io.EOF) {
 				log.Printf("failed to copy bytes: %s\n", err.Error())
@@ -182,22 +186,26 @@ func (cic *ChunkedImageCopier) Transfer() {
 			}
 		}
 
-		cic.numChunksOnDisk++
-		cic.clientMu.Lock()
+		bs.numChunksOnDisk++
+		bs.clientMu.Lock()
 		// Update all clients about the new chunk
 		// Clients always read the chunk from disk
-		for _, c := range cic.clients {
+		for _, c := range bs.clients {
 			wg.Go(func() {
-				c <- cic.numChunksOnDisk
+				c <- bs.numChunksOnDisk
 			})
 		}
-		cic.clientMu.Unlock()
+		bs.clientMu.Unlock()
 	}
 
 	wg.Wait()
 	log.Println("closing writer")
+	// In actual implementation, after this point:
+	// 1. Verify blob digest
+	// 2. Copy from temp location to actual repository storage
+	// 3. Clean up temp file
 }
 
 func chunkCountForBuffer(b []byte) int {
@@ -217,7 +225,8 @@ func main() {
 
 	r := NewSimulatedNetworkReader(buff)
 
-	msf := NewChunkedImageCopier("ondiskblob.txt", r, chunkCountForBuffer(buff))
+	// BlobStreamer writes to a temp location (ondiskblob.txt represents temp storage)
+	blobStreamer := NewBlobStreamer("ondiskblob.txt", r, chunkCountForBuffer(buff))
 
 	// client1.txt simulates an HTTP client receiving data over the network
 	client1File, err := os.OpenFile("client1.txt", os.O_WRONLY|os.O_CREATE, 0o644)
@@ -226,15 +235,15 @@ func main() {
 		os.Exit(1)
 	}
 
-	client1 := NewInFlightImageCopier(msf, client1File)
+	client1 := NewInFlightImageCopier(blobStreamer, client1File)
 
 	// stdout is also used as a client and simulates another client interested in the same blob
-	client2 := NewInFlightImageCopier(msf, os.Stdout)
+	client2 := NewInFlightImageCopier(blobStreamer, os.Stdout)
 
 	var wg sync.WaitGroup
 
-	// Simulates the network transfer starting first
-	wg.Go(msf.Transfer)
+	// Simulates the network transfer starting first (BlobStreamer downloading from upstream)
+	wg.Go(blobStreamer.Transfer)
 
 	time.Sleep(10 * time.Millisecond)