package weed_server

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	"golang.org/x/sync/errgroup"
	"google.golang.org/protobuf/proto"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
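
// copy handles an HTTP copy request: the source path comes from the "cp.from"
// query parameter and the destination from the request path. On success it
// responds with 204 No Content. A hypothetical invocation (assuming the
// handler is wired to POST, as mv.from is) might look like:
//
//	curl -X POST "http://localhost:8888/backup/a.txt?cp.from=/dir/a.txt"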
func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
	src := r.URL.Query().Get("cp.from")
	dst := r.URL.Path

	glog.V(2).InfofCtx(ctx, "FilerServer.copy %v to %v", src, dst)

	var err error
	if src, err = clearName(src); err != nil {
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}
	if dst, err = clearName(dst); err != nil {
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}
	src = strings.TrimRight(src, "/")
	if src == "" {
		err = fmt.Errorf("invalid source '/'")
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}

	srcPath := util.FullPath(src)
	dstPath := util.FullPath(dst)
	if dstPath.IsLongerFileName(so.MaxFileNameLength) {
		err = fmt.Errorf("dst name too long")
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}

	srcEntry, err := fs.filer.FindEntry(ctx, srcPath)
	if err != nil {
		err = fmt.Errorf("failed to get src entry '%s': %w", src, err)
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}

	glog.V(1).InfofCtx(ctx, "FilerServer.copy source entry: content_len=%d, chunks_len=%d", len(srcEntry.Content), len(srcEntry.GetChunks()))

	// Copying a directory recursively is not yet supported
	if srcEntry.IsDirectory() {
		err = fmt.Errorf("copy: directory copying not yet supported for '%s'", src)
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}

	_, oldName := srcPath.DirAndName()
	finalDstPath := dstPath

	// Check if the destination is an existing directory
	dstPathEntry, findErr := fs.filer.FindEntry(ctx, dstPath)
	if findErr != nil && findErr != filer_pb.ErrNotFound {
		err = fmt.Errorf("failed to check destination path %s: %w", dstPath, findErr)
		writeJsonError(w, r, http.StatusInternalServerError, err)
		return
	}
	if findErr == nil && dstPathEntry.IsDirectory() {
		finalDstPath = dstPath.Child(oldName)
	} else {
		newDir, newName := dstPath.DirAndName()
		newName = util.Nvl(newName, oldName)
		finalDstPath = util.FullPath(newDir).Child(newName)
	}
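
	// Illustrative examples of the resolution above (paths are hypothetical):
	//   cp.from=/dir/a.txt, dst=/backup       (an existing directory) -> /backup/a.txt
	//   cp.from=/dir/a.txt, dst=/backup/b.txt (no entry at that path) -> /backup/b.txt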
	// Check if the destination file already exists
	// TODO: add an overwrite parameter to allow overwriting
	if dstEntry, err := fs.filer.FindEntry(ctx, finalDstPath); err != nil && err != filer_pb.ErrNotFound {
		err = fmt.Errorf("failed to check destination entry %s: %w", finalDstPath, err)
		writeJsonError(w, r, http.StatusInternalServerError, err)
		return
	} else if dstEntry != nil {
		err = fmt.Errorf("destination file %s already exists", finalDstPath)
		writeJsonError(w, r, http.StatusConflict, err)
		return
	}

	// Copy the file content and chunks
	newEntry, err := fs.copyEntry(ctx, srcEntry, finalDstPath, so)
	if err != nil {
		err = fmt.Errorf("failed to copy entry from '%s' to '%s': %w", src, dst, err)
		writeJsonError(w, r, http.StatusInternalServerError, err)
		return
	}

	if createErr := fs.filer.CreateEntry(ctx, newEntry, true, false, nil, false, fs.filer.MaxFilenameLength); createErr != nil {
		err = fmt.Errorf("failed to create copied entry from '%s' to '%s': %w", src, dst, createErr)
		writeJsonError(w, r, http.StatusInternalServerError, err)
		return
	}

	glog.V(1).InfofCtx(ctx, "FilerServer.copy completed successfully: src='%s' -> dst='%s' (final_path='%s')", src, dst, finalDstPath)
	w.WriteHeader(http.StatusNoContent)
}

// copyEntry creates a new entry with copied content and chunks.
func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dstPath util.FullPath, so *operation.StorageOption) (*filer.Entry, error) {
	// Create the base entry structure.
	// Note: for hard links, we copy the actual content but NOT the HardLinkId/HardLinkCounter.
	// This creates an independent copy rather than another hard link to the same content.
	newEntry := &filer.Entry{
		FullPath: dstPath,
		// Deep copy the Attr field to ensure slice independence (GroupNames, Md5)
		Attr: func(a filer.Attr) filer.Attr {
			a.GroupNames = append([]string(nil), a.GroupNames...)
			a.Md5 = append([]byte(nil), a.Md5...)
			return a
		}(srcEntry.Attr),
		Quota: srcEntry.Quota,
		// Intentionally NOT copying HardLinkId and HardLinkCounter to create an independent copy
	}

	// Deep copy Extended fields to ensure independence
	if srcEntry.Extended != nil {
		newEntry.Extended = make(map[string][]byte, len(srcEntry.Extended))
		for k, v := range srcEntry.Extended {
			newEntry.Extended[k] = append([]byte(nil), v...)
		}
	}

	// Deep copy the Remote field to ensure independence
	if srcEntry.Remote != nil {
		newEntry.Remote = &filer_pb.RemoteEntry{
			StorageName:       srcEntry.Remote.StorageName,
			LastLocalSyncTsNs: srcEntry.Remote.LastLocalSyncTsNs,
			RemoteETag:        srcEntry.Remote.RemoteETag,
			RemoteMtime:       srcEntry.Remote.RemoteMtime,
			RemoteSize:        srcEntry.Remote.RemoteSize,
		}
	}

	// Log when copying a hard link so this behavior can be tracked
	if len(srcEntry.HardLinkId) > 0 {
		glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copying hard link %s (nlink=%d) as independent file", srcEntry.FullPath, srcEntry.HardLinkCounter)
	}

	// Handle small files stored inline in the Content field
	if len(srcEntry.Content) > 0 {
		// For small files, just copy the content directly
		newEntry.Content = make([]byte, len(srcEntry.Content))
		copy(newEntry.Content, srcEntry.Content)
		glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied content directly, size=%d", len(newEntry.Content))
		return newEntry, nil
	}

	// Handle files stored as chunks (including resolved hard link content)
	if len(srcEntry.GetChunks()) > 0 {
		srcChunks := srcEntry.GetChunks()
		// Create the HTTP client once for reuse across all chunk operations
		client := &http.Client{Timeout: 60 * time.Second}
		// Check if any chunks are manifest chunks - these require special handling
		if filer.HasChunkManifest(srcChunks) {
			glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: handling manifest chunks")
			newChunks, err := fs.copyChunksWithManifest(ctx, srcChunks, so, client)
			if err != nil {
				return nil, fmt.Errorf("failed to copy chunks with manifest: %w", err)
			}
			newEntry.Chunks = newChunks
			glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied manifest chunks, count=%d", len(newChunks))
		} else {
			// Regular chunks without a manifest - copy directly
			newChunks, err := fs.copyChunks(ctx, srcChunks, so, client)
			if err != nil {
				return nil, fmt.Errorf("failed to copy chunks: %w", err)
			}
			newEntry.Chunks = newChunks
			glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied regular chunks, count=%d", len(newChunks))
		}
		return newEntry, nil
	}

	// Empty file case (or a hard link with no content - should not happen if the hard link was properly resolved)
	if len(srcEntry.HardLinkId) > 0 {
		glog.WarningfCtx(ctx, "FilerServer.copyEntry: hard link %s appears to have no content - this may indicate an issue with hard link resolution", srcEntry.FullPath)
	}
	glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: empty file, no content or chunks to copy")
	return newEntry, nil
}

// copyChunks creates new chunks by copying data from the source chunks using a parallel streaming approach.
func (fs *FilerServer) copyChunks(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) ([]*filer_pb.FileChunk, error) {
	if len(srcChunks) == 0 {
		return nil, nil
	}

	// Optimize: batch the volume lookup for all chunks to reduce RPC calls
	volumeLocationsMap, err := fs.batchLookupVolumeLocations(ctx, srcChunks)
	if err != nil {
		return nil, fmt.Errorf("failed to lookup volume locations: %w", err)
	}

	// Parallel chunk copying with concurrency control using errgroup
	const maxConcurrentChunks = 8 // match the SeaweedFS standard for parallel operations

	// Pre-allocate the result slice to maintain order
	newChunks := make([]*filer_pb.FileChunk, len(srcChunks))

	// Use errgroup for cleaner concurrency management
	g, gCtx := errgroup.WithContext(ctx)
	g.SetLimit(maxConcurrentChunks) // limit concurrent goroutines
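	// With a limit set, g.Go blocks starting new goroutines until a slot frees
	// up, and gCtx is canceled as soon as any goroutine returns an error.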
	// Validate that all chunk locations are available before starting any concurrent work
	for _, chunk := range srcChunks {
		volumeId := chunk.Fid.VolumeId
		locations, ok := volumeLocationsMap[volumeId]
		if !ok || len(locations) == 0 {
			return nil, fmt.Errorf("no locations found for volume %d", volumeId)
		}
	}
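	// Failing fast here avoids assigning destination file IDs for a copy that
	// could never read all of its source chunks anyway.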

	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunks: starting parallel copy of %d chunks with max concurrency %d", len(srcChunks), maxConcurrentChunks)

	// Launch a goroutine for each chunk
	for i, srcChunk := range srcChunks {
		// Capture loop variables for the goroutine closure
		chunkIndex := i
		chunk := srcChunk
		chunkLocations := volumeLocationsMap[srcChunk.Fid.VolumeId]

		g.Go(func() error {
			glog.V(3).InfofCtx(gCtx, "FilerServer.copyChunks: copying chunk %d/%d, size=%d", chunkIndex+1, len(srcChunks), chunk.Size)

			// Use a streaming copy to avoid loading the entire chunk into memory
			newChunk, err := fs.streamCopyChunk(gCtx, chunk, so, client, chunkLocations)
			if err != nil {
				return fmt.Errorf("failed to copy chunk %d (%s): %w", chunkIndex+1, chunk.GetFileIdString(), err)
			}

			// Store the result at the correct index to maintain order
			newChunks[chunkIndex] = newChunk
			glog.V(4).InfofCtx(gCtx, "FilerServer.copyChunks: successfully copied chunk %d/%d", chunkIndex+1, len(srcChunks))
			return nil
		})
	}

	// Wait for all chunks to complete and return the first error, if any
	if err := g.Wait(); err != nil {
		return nil, err
	}

	// Verify all chunks were copied (shouldn't happen without errors, but a safety check)
	for i, chunk := range newChunks {
		if chunk == nil {
			return nil, fmt.Errorf("chunk %d was not copied (internal error)", i)
		}
	}

	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunks: successfully completed parallel copy of %d chunks", len(srcChunks))
	return newChunks, nil
}

// copyChunksWithManifest handles copying chunk lists that include manifest chunks.
func (fs *FilerServer) copyChunksWithManifest(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) ([]*filer_pb.FileChunk, error) {
	if len(srcChunks) == 0 {
		return nil, nil
	}

	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: processing %d chunks (some are manifests)", len(srcChunks))

	// Separate manifest chunks from regular data chunks
	manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(srcChunks)

	var newChunks []*filer_pb.FileChunk

	// First, copy all non-manifest chunks directly
	if len(nonManifestChunks) > 0 {
		glog.V(3).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: copying %d non-manifest chunks", len(nonManifestChunks))
		newNonManifestChunks, err := fs.copyChunks(ctx, nonManifestChunks, so, client)
		if err != nil {
			return nil, fmt.Errorf("failed to copy non-manifest chunks: %w", err)
		}
		newChunks = append(newChunks, newNonManifestChunks...)
	}

	// Process each manifest chunk separately
	for i, manifestChunk := range manifestChunks {
		glog.V(3).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: processing manifest chunk %d/%d", i+1, len(manifestChunks))

		// Resolve the manifest chunk into the actual data chunks it references
		lookupFileIdFn := func(ctx context.Context, fileId string) (urls []string, err error) {
			return fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId)
		}
		resolvedChunks, err := filer.ResolveOneChunkManifest(ctx, lookupFileIdFn, manifestChunk)
		if err != nil {
			return nil, fmt.Errorf("failed to resolve manifest chunk %s: %w", manifestChunk.GetFileIdString(), err)
		}
		glog.V(4).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: resolved manifest chunk %s to %d data chunks",
			manifestChunk.GetFileIdString(), len(resolvedChunks))

		// Copy all the resolved data chunks (recurse into copyChunksWithManifest to handle nested manifests)
		newResolvedChunks, err := fs.copyChunksWithManifest(ctx, resolvedChunks, so, client)
		if err != nil {
			return nil, fmt.Errorf("failed to copy resolved chunks from manifest %s: %w", manifestChunk.GetFileIdString(), err)
		}

		// Create a new manifest chunk that references the copied data chunks
		newManifestChunk, err := fs.createManifestChunk(ctx, newResolvedChunks, manifestChunk, so, client)
		if err != nil {
			return nil, fmt.Errorf("failed to create new manifest chunk: %w", err)
		}
		newChunks = append(newChunks, newManifestChunk)
		glog.V(4).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: created new manifest chunk %s for %d resolved chunks",
			newManifestChunk.GetFileIdString(), len(newResolvedChunks))
	}

	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: completed copying %d total chunks (%d manifest, %d regular)",
		len(newChunks), len(manifestChunks), len(nonManifestChunks))
	return newChunks, nil
}

// createManifestChunk creates a new manifest chunk that references the provided data chunks.
func (fs *FilerServer) createManifestChunk(ctx context.Context, dataChunks []*filer_pb.FileChunk, originalManifest *filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) (*filer_pb.FileChunk, error) {
	// Create the manifest data structure
	filer_pb.BeforeEntrySerialization(dataChunks)
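	// (BeforeEntrySerialization converts each chunk's string file ID into its
	// structured Fid form, matching how the filer persists chunk lists.)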
	manifestData := &filer_pb.FileChunkManifest{
		Chunks: dataChunks,
	}

	// Serialize the manifest
	data, err := proto.Marshal(manifestData)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal manifest: %w", err)
	}

	// Save the manifest data as a new chunk
	saveFunc := func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
		// Assign a new file ID
		fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
		if assignErr != nil {
			return nil, fmt.Errorf("failed to assign file ID for manifest: %w", assignErr)
		}

		// Upload the manifest data
		err = fs.uploadData(ctx, reader, urlLocation, string(auth), client)
		if err != nil {
			return nil, fmt.Errorf("failed to upload manifest data: %w", err)
		}

		// Create the chunk metadata
		chunk = &filer_pb.FileChunk{
			FileId: fileId,
			Offset: offset,
			Size:   uint64(len(data)),
		}
		return chunk, nil
	}

	manifestChunk, err := saveFunc(bytes.NewReader(data), "", originalManifest.Offset, 0)
	if err != nil {
		return nil, fmt.Errorf("failed to save manifest chunk: %w", err)
	}

	// Set manifest-specific properties
	manifestChunk.IsChunkManifest = true
	manifestChunk.Size = originalManifest.Size
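	// Note: Size is the logical byte range the manifest covers (copied from the
	// original manifest chunk), not the size of the serialized manifest data.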
	return manifestChunk, nil
}

// uploadData uploads data to a volume server.
func (fs *FilerServer) uploadData(ctx context.Context, reader io.Reader, urlLocation, auth string, client *http.Client) error {
	req, err := http.NewRequestWithContext(ctx, "PUT", urlLocation, reader)
	if err != nil {
		return fmt.Errorf("failed to create upload request: %w", err)
	}
	if auth != "" {
		req.Header.Set("Authorization", "Bearer "+auth)
	}

	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to upload data: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
		body, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			return fmt.Errorf("upload failed with status %d, and failed to read response: %w", resp.StatusCode, readErr)
		}
		return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
	}
	return nil
}

// batchLookupVolumeLocations performs a single batched lookup for all unique volume IDs in the chunks.
func (fs *FilerServer) batchLookupVolumeLocations(ctx context.Context, chunks []*filer_pb.FileChunk) (map[uint32][]operation.Location, error) {
	// Collect unique volume IDs and their string representations to avoid repeated conversions
	volumeIdMap := make(map[uint32]string)
	for _, chunk := range chunks {
		vid := chunk.Fid.VolumeId
		if _, found := volumeIdMap[vid]; !found {
			volumeIdMap[vid] = fmt.Sprintf("%d", vid)
		}
	}
	if len(volumeIdMap) == 0 {
		return make(map[uint32][]operation.Location), nil
	}

	// Convert to a slice of strings for the lookup call
	volumeIdStrs := make([]string, 0, len(volumeIdMap))
	for _, vidStr := range volumeIdMap {
		volumeIdStrs = append(volumeIdStrs, vidStr)
	}

	// Perform a single batched lookup
	lookupResult, err := operation.LookupVolumeIds(fs.filer.GetMaster, fs.grpcDialOption, volumeIdStrs)
	if err != nil {
		return nil, fmt.Errorf("failed to lookup volumes: %w", err)
	}

	// Convert the result to a map of volumeId -> locations
	volumeLocationsMap := make(map[uint32][]operation.Location)
	for volumeId, volumeIdStr := range volumeIdMap {
		if volumeLocations, ok := lookupResult[volumeIdStr]; ok && len(volumeLocations.Locations) > 0 {
			volumeLocationsMap[volumeId] = volumeLocations.Locations
		}
	}
	return volumeLocationsMap, nil
}

// streamCopyChunk copies a chunk using streaming to minimize memory usage.
func (fs *FilerServer) streamCopyChunk(ctx context.Context, srcChunk *filer_pb.FileChunk, so *operation.StorageOption, client *http.Client, locations []operation.Location) (*filer_pb.FileChunk, error) {
	// Assign a new file ID for the destination
	fileId, urlLocation, auth, err := fs.assignNewFileInfo(ctx, so)
	if err != nil {
		return nil, fmt.Errorf("failed to assign new file ID: %w", err)
	}

	// Try all available locations for the source chunk until one succeeds
	fileIdString := srcChunk.GetFileIdString()
	var lastErr error
	for i, location := range locations {
		srcUrl := fmt.Sprintf("http://%s/%s", location.Url, fileIdString)
		glog.V(4).InfofCtx(ctx, "FilerServer.streamCopyChunk: attempting streaming copy from %s to %s (attempt %d/%d)", srcUrl, urlLocation, i+1, len(locations))

		// Perform the streaming copy using the shared HTTP client
		err := fs.performStreamCopy(ctx, srcUrl, urlLocation, string(auth), srcChunk.Size, client)
		if err != nil {
			lastErr = err
			glog.V(2).InfofCtx(ctx, "FilerServer.streamCopyChunk: failed streaming copy from %s: %v", srcUrl, err)
			continue
		}

		// Success - create the chunk metadata
		newChunk := &filer_pb.FileChunk{
			FileId: fileId,
			Offset: srcChunk.Offset,
			Size:   srcChunk.Size,
			ETag:   srcChunk.ETag,
		}
		glog.V(4).InfofCtx(ctx, "FilerServer.streamCopyChunk: successfully streamed %d bytes", srcChunk.Size)
		return newChunk, nil
	}

	// All locations failed
	return nil, fmt.Errorf("failed to stream copy chunk from any location: %w", lastErr)
}

// performStreamCopy performs the actual streaming copy from the source URL to the destination URL.
func (fs *FilerServer) performStreamCopy(ctx context.Context, srcUrl, dstUrl, auth string, expectedSize uint64, client *http.Client) error {
	// Create the HTTP request to read from the source
	req, err := http.NewRequestWithContext(ctx, "GET", srcUrl, nil)
	if err != nil {
		return fmt.Errorf("failed to create source request: %w", err)
	}

	// Perform the source request
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to read from source: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("source returned status %d", resp.StatusCode)
	}
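
	// The GET response body is streamed directly as the PUT request body, so
	// the chunk is never fully buffered in the filer's memory.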
	// Create the HTTP request to write to the destination
	dstReq, err := http.NewRequestWithContext(ctx, "PUT", dstUrl, resp.Body)
	if err != nil {
		return fmt.Errorf("failed to create destination request: %w", err)
	}
	dstReq.ContentLength = int64(expectedSize)
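	// Setting ContentLength explicitly makes the client send a Content-Length
	// header; otherwise a body of unknown size would use chunked encoding.
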
	// Set the authorization header if provided
	if auth != "" {
		dstReq.Header.Set("Authorization", "Bearer "+auth)
	}
	dstReq.Header.Set("Content-Type", "application/octet-stream")

	// Perform the destination request
	dstResp, err := client.Do(dstReq)
	if err != nil {
		return fmt.Errorf("failed to write to destination: %w", err)
	}
	defer dstResp.Body.Close()

	if dstResp.StatusCode != http.StatusCreated && dstResp.StatusCode != http.StatusOK {
		// Read the error response body for more details
		body, readErr := io.ReadAll(dstResp.Body)
		if readErr != nil {
			return fmt.Errorf("destination returned status %d, and failed to read body: %w", dstResp.StatusCode, readErr)
		}
		return fmt.Errorf("destination returned status %d: %s", dstResp.StatusCode, string(body))
	}

	glog.V(4).InfofCtx(ctx, "FilerServer.performStreamCopy: successfully streamed data from %s to %s", srcUrl, dstUrl)
	return nil
}