mirror of
https://github.com/project-zot/zot.git
synced 2026-06-17 21:17:58 +08:00
feat(sync): address review comments
Signed-off-by: Vishwas Rajashekar <dev@vrajashkr.com>
This commit is contained in:
@@ -1176,6 +1176,12 @@ func (rh *RouteHandler) getBlobInfoFromStreamCache(digest string, response http.
|
||||
// when streaming is enabled, the blob might exist in the stream cache
|
||||
blobSize, blobMediaType, err := streamMgr.CachedBlobInfo(digest)
|
||||
if err != nil {
|
||||
if errors.Is(err, zerr.ErrBlobNotFound) {
|
||||
rh.c.Log.Debug().Str("digest", digest).Msg("blob not found in stream cache")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
rh.c.Log.Error().Err(err).Str("digest", digest).Msg("failed to check stream cache for blob existence")
|
||||
|
||||
return err
|
||||
|
||||
@@ -27,6 +27,7 @@ func TestEnableSyncExtension_StreamManager(t *testing.T) {
|
||||
metaDB := mocks.MetaDBMock{}
|
||||
storeController := storage.StoreController{}
|
||||
metrics := monitoring.NewMetricsServer(false, logger)
|
||||
t.Cleanup(metrics.Stop)
|
||||
sch := scheduler.NewScheduler(cfg, metrics, logger)
|
||||
|
||||
Convey("stream manager is nil when Stream is not set on any registry", func() {
|
||||
|
||||
@@ -25,11 +25,11 @@ type ChunkedBlobReader struct {
|
||||
onDiskPath string
|
||||
onDiskFile *os.File
|
||||
|
||||
inFlightReader *blob.BReader
|
||||
clientMu sync.RWMutex
|
||||
clientCond *sync.Cond
|
||||
clients map[int]chan int64
|
||||
numClientsTotal int
|
||||
inFlightReader *blob.BReader
|
||||
clientMu sync.RWMutex
|
||||
clientCond *sync.Cond
|
||||
clients map[int]chan int64
|
||||
nextClientId int
|
||||
|
||||
logger log.Logger
|
||||
}
|
||||
@@ -108,7 +108,7 @@ func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) {
|
||||
cbr.Unsubscribe(clientId)
|
||||
}
|
||||
|
||||
return -1, err
|
||||
return n, err
|
||||
}
|
||||
// partial read at end of stream; normalise to EOF for callers
|
||||
err = io.EOF
|
||||
@@ -119,7 +119,7 @@ func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) {
|
||||
cbr.logger.Error().Err(werr).Msg("failed to write blob data to disk")
|
||||
cbr.bytesMu.Unlock()
|
||||
|
||||
return -1, werr
|
||||
return n, werr
|
||||
}
|
||||
|
||||
cbr.numBytesReadToDisk += int64(n)
|
||||
@@ -177,9 +177,9 @@ func (cbr *ChunkedBlobReader) Subscribe() (chan int64, int) {
|
||||
|
||||
channel := make(chan int64, 1)
|
||||
|
||||
cbr.clients[cbr.numClientsTotal] = channel
|
||||
chanId := cbr.numClientsTotal
|
||||
cbr.numClientsTotal++
|
||||
cbr.clients[cbr.nextClientId] = channel
|
||||
chanId := cbr.nextClientId
|
||||
cbr.nextClientId++
|
||||
|
||||
cbr.bytesMu.RLock()
|
||||
defer cbr.bytesMu.RUnlock()
|
||||
@@ -204,8 +204,6 @@ func (cbr *ChunkedBlobReader) Unsubscribe(clientId int) {
|
||||
channel, ok := cbr.clients[clientId]
|
||||
if ok {
|
||||
close(channel)
|
||||
|
||||
cbr.numClientsTotal--
|
||||
delete(cbr.clients, clientId)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -224,9 +224,8 @@ func TestRead(t *testing.T) {
|
||||
errCBR.InitReader(errReader, errReader.GetDescriptor())
|
||||
|
||||
buf := make([]byte, 50)
|
||||
n, readErr := errCBR.Read(buf)
|
||||
_, readErr := errCBR.Read(buf)
|
||||
So(readErr, ShouldNotBeNil)
|
||||
So(n, ShouldEqual, -1)
|
||||
|
||||
// Channel should have been closed.
|
||||
_, open := <-bytesUpdateChan
|
||||
|
||||
@@ -7,22 +7,15 @@ load helpers_wait
|
||||
load ../port_helper
|
||||
|
||||
function verify_prerequisites() {
|
||||
if [ ! $(command -v curl) ]; then
|
||||
echo "you need to install curl as a prerequisite to running the tests" >&3
|
||||
return 1
|
||||
fi
|
||||
local ok=0
|
||||
for cmd in curl jq skopeo; do
|
||||
if ! command -v "${cmd}" &>/dev/null; then
|
||||
echo "you need to install ${cmd} as a prerequisite to running the tests" >&3
|
||||
ok=1
|
||||
fi
|
||||
done
|
||||
|
||||
if [ ! $(command -v jq) ]; then
|
||||
echo "you need to install jq as a prerequisite to running the tests" >&3
|
||||
return 1
|
||||
fi
|
||||
|
||||
if [ ! $(command -v skopeo) ]; then
|
||||
echo "you need to install skopeo as a prerequisite to running the tests" >&3
|
||||
return 1
|
||||
fi
|
||||
|
||||
return 0
|
||||
return "${ok}"
|
||||
}
|
||||
|
||||
# delete_repo_from_zot <port> <repo> <tag> <root>
|
||||
@@ -45,7 +38,7 @@ function delete_repo_from_zot() {
|
||||
curl -s -X DELETE "http://127.0.0.1:${port}/v2/${repo}/manifests/${digest}" >/dev/null
|
||||
|
||||
# delete blobs from disk
|
||||
rm -r "${root}/${repo}/blobs"
|
||||
rm -rf "${root}/${repo}/blobs"
|
||||
}
|
||||
|
||||
function setup_file() {
|
||||
@@ -125,7 +118,7 @@ EOF
|
||||
local upstream_bin="${BATS_FILE_TMPDIR}/${zot_bin_name}"
|
||||
if [ ! -f "${upstream_bin}" ]; then
|
||||
if ! curl -f -L -o "${upstream_bin}" \
|
||||
"https://github.com/project-zot/zot/releases/latest/download/${zot_bin_name}"; then
|
||||
"https://github.com/project-zot/zot/releases/download/v2.1.17/${zot_bin_name}"; then
|
||||
echo "ERROR: failed to download upstream zot release binary" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
Reference in New Issue
Block a user