address comments

This commit is contained in:
chrislu
2025-11-17 17:20:43 -08:00
parent 1b238f368a
commit b6cc7297af
2 changed files with 54 additions and 36 deletions

View File

@@ -64,7 +64,10 @@ func TestSSES3MultipartChunkViewDecryption(t *testing.T) {
// Verify decryption works with the chunk's IV // Verify decryption works with the chunk's IV
decryptedData := make([]byte, len(ciphertext)) decryptedData := make([]byte, len(ciphertext))
decryptBlock, _ := aes.NewCipher(key) decryptBlock, err := aes.NewCipher(key)
if err != nil {
t.Fatalf("Failed to create decrypt cipher: %v", err)
}
decryptStream := cipher.NewCTR(decryptBlock, chunkIV) decryptStream := cipher.NewCTR(decryptBlock, chunkIV)
decryptStream.XORKeyStream(decryptedData, ciphertext) decryptStream.XORKeyStream(decryptedData, ciphertext)

View File

@@ -732,24 +732,7 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
return err return err
} }
// Set standard HTTP headers from entry metadata // Get chunks and validate BEFORE setting headers
// IMPORTANT: Set ALL headers BEFORE calling WriteHeader (headers are ignored after WriteHeader)
tHeaderSet := time.Now()
s3a.setResponseHeaders(w, entry, totalSize)
// Override/add range-specific headers if this is a range request
if isRangeRequest {
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize))
w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
}
headerSetTime = time.Since(tHeaderSet)
// Now write status code (headers are all set)
if isRangeRequest {
w.WriteHeader(http.StatusPartialContent)
}
// Get chunks
chunks := entry.GetChunks() chunks := entry.GetChunks()
glog.Infof("streamFromVolumeServers: entry has %d chunks, totalSize=%d, isRange=%v, offset=%d, size=%d", glog.Infof("streamFromVolumeServers: entry has %d chunks, totalSize=%d, isRange=%v, offset=%d, size=%d",
len(chunks), totalSize, isRangeRequest, offset, size) len(chunks), totalSize, isRangeRequest, offset, size)
@@ -759,17 +742,13 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
if totalSize > 0 && len(entry.Content) == 0 { if totalSize > 0 && len(entry.Content) == 0 {
glog.Errorf("streamFromVolumeServers: Data integrity error - entry reports size %d but has no content or chunks", totalSize) glog.Errorf("streamFromVolumeServers: Data integrity error - entry reports size %d but has no content or chunks", totalSize)
// IMPORTANT: Write error status before returning, since headers haven't been written yet // IMPORTANT: Write error status before returning, since headers haven't been written yet
// Clear any headers set by setResponseHeaders to avoid misleading Content-Length
w.Header().Del("Content-Length")
w.Header().Del("Content-Range")
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Internal Server Error: data integrity issue (size %d reported but no content available)", totalSize)
return fmt.Errorf("data integrity error: size %d reported but no content available", totalSize) return fmt.Errorf("data integrity error: size %d reported but no content available", totalSize)
} }
// Empty object - need to set headers before writing status // Empty object - set headers and write status
if !isRangeRequest { s3a.setResponseHeaders(w, entry, totalSize)
// Headers were already set by setResponseHeaders above w.WriteHeader(http.StatusOK)
w.WriteHeader(http.StatusOK)
}
return nil return nil
} }
@@ -778,7 +757,8 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
glog.Infof(" GET Chunk[%d]: fid=%s, offset=%d, size=%d", i, chunk.GetFileIdString(), chunk.Offset, chunk.Size) glog.Infof(" GET Chunk[%d]: fid=%s, offset=%d, size=%d", i, chunk.GetFileIdString(), chunk.Offset, chunk.Size)
} }
// Create lookup function via filer client (reuse shared helper) // CRITICAL: Resolve chunks and prepare stream BEFORE WriteHeader
// This ensures we can write proper error responses if these operations fail
ctx := r.Context() ctx := r.Context()
lookupFileIdFn := s3a.createLookupFileIdFunction() lookupFileIdFn := s3a.createLookupFileIdFunction()
@@ -788,7 +768,8 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
chunkResolveTime = time.Since(tChunkResolve) chunkResolveTime = time.Since(tChunkResolve)
if err != nil { if err != nil {
glog.Errorf("streamFromVolumeServers: failed to resolve chunks: %v", err) glog.Errorf("streamFromVolumeServers: failed to resolve chunks: %v", err)
// Don't try to write headers if we already wrote them for range request w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Internal Server Error: failed to resolve chunks")
return fmt.Errorf("failed to resolve chunks: %v", err) return fmt.Errorf("failed to resolve chunks: %v", err)
} }
@@ -810,10 +791,29 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
streamPrepTime = time.Since(tStreamPrep) streamPrepTime = time.Since(tStreamPrep)
if err != nil { if err != nil {
glog.Errorf("streamFromVolumeServers: failed to prepare stream: %v", err) glog.Errorf("streamFromVolumeServers: failed to prepare stream: %v", err)
// Don't try to write headers if we already wrote them for range request w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Internal Server Error: failed to prepare stream")
return fmt.Errorf("failed to prepare stream: %v", err) return fmt.Errorf("failed to prepare stream: %v", err)
} }
// All validation and preparation successful - NOW set headers and write status
tHeaderSet := time.Now()
s3a.setResponseHeaders(w, entry, totalSize)
// Override/add range-specific headers if this is a range request
if isRangeRequest {
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize))
w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
}
headerSetTime = time.Since(tHeaderSet)
// Now write status code (headers are all set, stream is ready)
if isRangeRequest {
w.WriteHeader(http.StatusPartialContent)
} else {
w.WriteHeader(http.StatusOK)
}
// Stream directly to response // Stream directly to response
tStreamExec := time.Now() tStreamExec := time.Now()
glog.Infof("streamFromVolumeServers: starting streamFn, offset=%d, size=%d", offset, size) glog.Infof("streamFromVolumeServers: starting streamFn, offset=%d, size=%d", offset, size)
@@ -1458,21 +1458,28 @@ func (s3a *S3ApiServer) decryptSSES3ChunkView(ctx context.Context, fileChunk *fi
return nil, fmt.Errorf("failed to fetch full chunk: %w", err) return nil, fmt.Errorf("failed to fetch full chunk: %w", err)
} }
// Use the chunk's IV directly (already adjusted for part offset during encryption) // Use the chunk's IV directly (already adjusted for block offset during encryption)
// Note: SSE-S3 stores the offset-adjusted IV in chunk metadata, unlike SSE-C which stores base IV + PartOffset // Note: SSE-S3 encryption flow:
// 1. Upload: CreateSSES3EncryptedReaderWithBaseIV(reader, key, baseIV, partOffset)
// calls calculateIVWithOffset(baseIV, partOffset) → (blockAlignedIV, skip)
// The blockAlignedIV is stored in chunk metadata
// 2. Download: We decrypt the FULL chunk from offset 0 using that blockAlignedIV
// Then skip to chunkView.OffsetInChunk in the PLAINTEXT (not ciphertext)
// This differs from SSE-C which stores base IV + PartOffset and calculates IV during decryption
// No ivSkip needed here because we're decrypting from chunk start (offset 0)
iv := chunkSSES3Metadata.IV iv := chunkSSES3Metadata.IV
glog.V(4).Infof("Decrypting multipart SSE-S3 chunk %s with chunk-specific IV length=%d", glog.V(4).Infof("Decrypting multipart SSE-S3 chunk %s with chunk-specific IV length=%d",
chunkView.FileId, len(iv)) chunkView.FileId, len(iv))
// Decrypt the full chunk // Decrypt the full chunk starting from offset 0
decryptedReader, decryptErr := CreateSSES3DecryptedReader(fullChunkReader, chunkSSES3Metadata, iv) decryptedReader, decryptErr := CreateSSES3DecryptedReader(fullChunkReader, chunkSSES3Metadata, iv)
if decryptErr != nil { if decryptErr != nil {
fullChunkReader.Close() fullChunkReader.Close()
return nil, fmt.Errorf("failed to create SSE-S3 decrypted reader: %w", decryptErr) return nil, fmt.Errorf("failed to create SSE-S3 decrypted reader: %w", decryptErr)
} }
// Skip to position within chunk and limit to ViewSize // Skip to position within the decrypted chunk (plaintext offset, not ciphertext offset)
if chunkView.OffsetInChunk > 0 { if chunkView.OffsetInChunk > 0 {
_, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk) _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk)
if err != nil { if err != nil {
@@ -2873,8 +2880,16 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Con
glog.V(4).Infof("Decrypting SSE-C chunk %s with IV=%x, PartOffset=%d", glog.V(4).Infof("Decrypting SSE-C chunk %s with IV=%x, PartOffset=%d",
chunk.GetFileIdString(), chunkIV[:8], ssecMetadata.PartOffset) chunk.GetFileIdString(), chunkIV[:8], ssecMetadata.PartOffset)
// Note: For multipart SSE-C, each part was encrypted with offset=0 // Note: SSE-C multipart behavior (differs from SSE-KMS/SSE-S3):
// So we don't need to adjust the IV with PartOffset - just use the stored IV directly // - Upload: CreateSSECEncryptedReader generates RANDOM IV per part (no base IV + offset)
// - Metadata: PartOffset is stored but not used during encryption
// - Decryption: Use stored random IV directly (no offset adjustment needed)
//
// This differs from:
// - SSE-KMS/SSE-S3: Use base IV + calculateIVWithOffset(partOffset) during encryption
// - CopyObject: Applies calculateIVWithOffset to SSE-C (which may be incorrect)
//
// TODO: Investigate CopyObject SSE-C PartOffset handling for consistency
decryptedChunkReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV) decryptedChunkReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV)
if decErr != nil { if decErr != nil {
chunkReader.Close() chunkReader.Close()