copy a range of the chunk data, fix offset and size in copied chunks

chrislu 2025-03-26 15:14:02 -07:00
parent 7435f1623a
commit 74898c9bf8


@@ -3,7 +3,6 @@ package s3api
 import (
     "context"
     "fmt"
-    "io"
     "net/http"
     "net/url"
     "strconv"
@@ -449,7 +448,7 @@ func (s3a *S3ApiServer) copySingleChunk(chunk *filer_pb.FileChunk, dstPath strin
     }
     // Download and upload the chunk
-    if err := s3a.transferChunkData(srcUrl, assignResult); err != nil {
+    if err := s3a.transferChunkData(srcUrl, assignResult, chunk.Offset, int64(chunk.Size)); err != nil {
         return nil, fmt.Errorf("transfer chunk data: %v", err)
     }
@@ -483,20 +482,20 @@ func (s3a *S3ApiServer) assignNewVolume(dstPath string) (*filer_pb.AssignVolumeR
     return assignResult, nil
 }
-// transferChunkData downloads the chunk from source and uploads it to destination
-func (s3a *S3ApiServer) transferChunkData(srcUrl string, assignResult *filer_pb.AssignVolumeResponse) error {
+// transferChunkData downloads a portion of the chunk from source and uploads it to destination
+func (s3a *S3ApiServer) transferChunkData(srcUrl string, assignResult *filer_pb.AssignVolumeResponse, offset, size int64) error {
     dstUrl := fmt.Sprintf("http://%s/%s", assignResult.Location.Url, assignResult.FileId)
-    _, _, resp, err := util_http.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false))
+    // Read the chunk data using ReadUrlAsStream
+    var chunkData []byte
+    shouldRetry, err := util_http.ReadUrlAsStream(srcUrl, nil, false, false, offset, int(size), func(data []byte) {
+        chunkData = append(chunkData, data...)
+    })
     if err != nil {
         return fmt.Errorf("download chunk: %v", err)
     }
-    // Read response body into byte slice
-    chunkData, err := io.ReadAll(resp.Body)
-    util_http.CloseResponse(resp)
-    if err != nil {
-        return fmt.Errorf("read chunk data: %v", err)
-    }
+    if shouldRetry {
+        return fmt.Errorf("download chunk: retry needed")
+    }
     // Upload chunk to new location
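
The switch from util_http.DownloadFile plus io.ReadAll to util_http.ReadUrlAsStream means only the requested byte range [offset, offset+size) of the source chunk is fetched, instead of the whole chunk. A minimal standalone sketch of the same range-limited download pattern, written against plain net/http rather than the SeaweedFS helper (readRange and its callback shape are illustrative assumptions):

package chunkcopy

import (
    "fmt"
    "io"
    "net/http"
)

// readRange streams bytes [offset, offset+size) of url through fn,
// mirroring what a ReadUrlAsStream-style helper does.
func readRange(url string, offset, size int64, fn func([]byte)) error {
    req, err := http.NewRequest(http.MethodGet, url, nil)
    if err != nil {
        return err
    }
    // Ask the server for just the slice of the chunk we need.
    req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+size-1))

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusPartialContent {
        return fmt.Errorf("range request not honored: %s", resp.Status)
    }

    // LimitReader guards against servers that send more than requested.
    r := io.LimitReader(resp.Body, size)
    buf := make([]byte, 64*1024)
    for {
        n, err := r.Read(buf)
        if n > 0 {
            fn(buf[:n])
        }
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
    }
}

Accumulating the delivered slices into one buffer, as transferChunkData does with chunkData, is then just fn := func(data []byte) { chunkData = append(chunkData, data...) }.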
@@ -545,26 +544,16 @@ func parseRangeHeader(rangeHeader string) (startOffset, endOffset int64, err err
 // copyChunksForRange copies chunks that overlap with the specified range
 func (s3a *S3ApiServer) copyChunksForRange(entry *filer_pb.Entry, startOffset, endOffset int64, dstPath string) ([]*filer_pb.FileChunk, error) {
     var relevantChunks []*filer_pb.FileChunk
-    var currentOffset int64
     // Find chunks that overlap with the range
     for _, chunk := range entry.GetChunks() {
-        chunkEnd := currentOffset + int64(chunk.Size)
-        if chunkEnd > startOffset && currentOffset <= endOffset {
-            // Calculate the portion of the chunk to copy
-            chunkStart := currentOffset
-            chunkSize := int64(chunk.Size)
-            if chunkStart < startOffset {
-                chunkStart = startOffset
-            }
-            if chunkEnd > endOffset {
-                chunkSize = endOffset - chunkStart + 1
-            }
+        maxStart := max(startOffset, chunk.Offset)
+        minStop := min(chunk.Offset+int64(chunk.Size), endOffset+1)
+        if maxStart < minStop {
             // Create a new chunk with adjusted offset and size
             newChunk := &filer_pb.FileChunk{
-                Offset:       chunkStart - startOffset, // Adjust offset relative to range start
-                Size:         uint64(chunkSize),
+                Offset:       maxStart,
+                Size:         uint64(minStop - maxStart),
                 ModifiedTsNs: time.Now().UnixNano(),
                 ETag:         chunk.ETag,
                 IsCompressed: chunk.IsCompressed,
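
The rewritten overlap test is a standard half-open interval intersection: a chunk covering [chunk.Offset, chunk.Offset+Size) overlaps the inclusive request range [startOffset, endOffset] exactly when maxStart < minStop. The old code walked a running currentOffset instead of using chunk.Offset and rebased the copied chunk relative to the range start, which is the offset/size bug the commit title refers to. A standalone check of the arithmetic with a made-up three-chunk layout (max/min are the Go 1.21 builtins):

package main

import "fmt"

func main() {
    // Three 10-byte chunks covering file offsets [0,30), and a client
    // Range request for bytes 5-24 inclusive, i.e. [5,25) half-open.
    chunkOffsets := []int64{0, 10, 20}
    chunkSize := int64(10)
    startOffset, endOffset := int64(5), int64(24)

    for _, off := range chunkOffsets {
        maxStart := max(startOffset, off)
        minStop := min(off+chunkSize, endOffset+1)
        if maxStart < minStop {
            // Offset stays in file coordinates; Size is the overlap length.
            fmt.Printf("chunk@%d -> Offset=%d Size=%d\n", off, maxStart, minStop-maxStart)
        }
    }
    // Output:
    // chunk@0 -> Offset=5 Size=5
    // chunk@10 -> Offset=10 Size=10
    // chunk@20 -> Offset=20 Size=5
}

The copied sizes sum to 20 bytes, matching endOffset-startOffset+1, which the old relative-offset arithmetic did not guarantee.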
@@ -572,12 +561,11 @@ func (s3a *S3ApiServer) copyChunksForRange(entry *filer_pb.Entry, startOffset, e
             }
             relevantChunks = append(relevantChunks, newChunk)
         }
-        currentOffset += int64(chunk.Size)
     }
     // Copy the relevant chunks
     dstChunks := make([]*filer_pb.FileChunk, len(relevantChunks))
-    executor := util.NewLimitedConcurrentExecutor(8)
+    executor := util.NewLimitedConcurrentExecutor(4)
     errChan := make(chan error, len(relevantChunks))
     for i, chunk := range relevantChunks {
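
The executor change only lowers copy parallelism from 8 to 4 concurrent chunk transfers. For readers outside the codebase, a bounded-concurrency sketch using just the standard library, assuming util.LimitedConcurrentExecutor behaves like a counting semaphore over goroutines (copyOneChunk is a hypothetical stand-in for the real per-chunk copy):

package main

import (
    "fmt"
    "sync"
)

func copyOneChunk(name string) error {
    fmt.Println("copied", name)
    return nil
}

func main() {
    chunks := []string{"c1", "c2", "c3", "c4", "c5"}
    const limit = 4

    sem := make(chan struct{}, limit) // counting semaphore
    errChan := make(chan error, len(chunks))
    var wg sync.WaitGroup

    for _, c := range chunks {
        wg.Add(1)
        sem <- struct{}{} // blocks while `limit` copies are in flight
        go func(c string) {
            defer wg.Done()
            defer func() { <-sem }()
            errChan <- copyOneChunk(c)
        }(c)
    }
    wg.Wait()
    close(errChan)

    for err := range errChan {
        if err != nil {
            fmt.Println("copy failed:", err)
        }
    }
}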