mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-08 23:34:44 +08:00
Migrates Azure Blob Storage integration from the deprecated github.com/Azure/azure-storage-blob-go to the modern github.com/Azure/azure-sdk-for-go/sdk/storage/azblob SDK. ## Changes ### Removed Files - weed/remote_storage/azure/azure_highlevel.go - Custom upload helper no longer needed with new SDK ### Updated Files - weed/remote_storage/azure/azure_storage_client.go - Migrated from ServiceURL/ContainerURL/BlobURL to Client-based API - Updated client creation using NewClientWithSharedKeyCredential - Replaced ListBlobsFlatSegment with NewListBlobsFlatPager - Updated Download to DownloadStream with proper HTTPRange - Replaced custom uploadReaderAtToBlockBlob with UploadStream - Updated GetProperties, SetMetadata, Delete to use new client methods - Fixed metadata conversion to return map[string]*string - weed/replication/sink/azuresink/azure_sink.go - Migrated from ContainerURL to Client-based API - Updated client initialization - Replaced AppendBlobURL with AppendBlobClient - Updated error handling to use azcore.ResponseError - Added streaming.NopCloser for AppendBlock ### New Test Files - weed/remote_storage/azure/azure_storage_client_test.go - Comprehensive unit tests for all client operations - Tests for Traverse, ReadFile, WriteFile, UpdateMetadata, Delete - Tests for metadata conversion function - Benchmark tests - Integration tests (skippable without credentials) - weed/replication/sink/azuresink/azure_sink_test.go - Unit tests for Azure sink operations - Tests for CreateEntry, UpdateEntry, DeleteEntry - Tests for cleanKey function - Tests for configuration-based initialization - Integration tests (skippable without credentials) - Benchmark tests ### Dependency Updates - go.mod: Removed github.com/Azure/azure-storage-blob-go v0.15.0 - go.mod: Made github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2 direct dependency - All deprecated dependencies automatically cleaned up ## API Migration Summary Old SDK → New SDK mappings: - ServiceURL → Client (service-level operations) - ContainerURL → ContainerClient - BlobURL → BlobClient - BlockBlobURL → BlockBlobClient - AppendBlobURL → AppendBlobClient - ListBlobsFlatSegment() → NewListBlobsFlatPager() - Download() → DownloadStream() - Upload() → UploadStream() - Marker-based pagination → Pager-based pagination - azblob.ResponseError → azcore.ResponseError ## Testing All tests pass: - ✅ Unit tests for metadata conversion - ✅ Unit tests for helper functions (cleanKey) - ✅ Interface implementation tests - ✅ Build successful - ✅ No compilation errors - ✅ Integration tests available (require Azure credentials) ## Benefits - ✅ Uses actively maintained SDK - ✅ Better performance with modern API design - ✅ Improved error handling - ✅ Removes ~200 lines of custom upload code - ✅ Reduces dependency count - ✅ Better async/streaming support - ✅ Future-proof against SDK deprecation ## Backward Compatibility The changes are transparent to users: - Same configuration parameters (account name, account key) - Same functionality and behavior - No changes to SeaweedFS API or user-facing features - Existing Azure storage configurations continue to work ## Breaking Changes None - this is an internal implementation change only.
168 lines
4.8 KiB
Go
168 lines
4.8 KiB
Go
package azuresink
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
|
|
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
|
|
"github.com/seaweedfs/seaweedfs/weed/replication/source"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
)
|
|
|
|
type AzureSink struct {
|
|
client *azblob.Client
|
|
container string
|
|
dir string
|
|
filerSource *source.FilerSource
|
|
isIncremental bool
|
|
}
|
|
|
|
func init() {
|
|
sink.Sinks = append(sink.Sinks, &AzureSink{})
|
|
}
|
|
|
|
func (g *AzureSink) GetName() string {
|
|
return "azure"
|
|
}
|
|
|
|
func (g *AzureSink) GetSinkToDirectory() string {
|
|
return g.dir
|
|
}
|
|
|
|
func (g *AzureSink) IsIncremental() bool {
|
|
return g.isIncremental
|
|
}
|
|
|
|
func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error {
|
|
g.isIncremental = configuration.GetBool(prefix + "is_incremental")
|
|
return g.initialize(
|
|
configuration.GetString(prefix+"account_name"),
|
|
configuration.GetString(prefix+"account_key"),
|
|
configuration.GetString(prefix+"container"),
|
|
configuration.GetString(prefix+"directory"),
|
|
)
|
|
}
|
|
|
|
func (g *AzureSink) SetSourceFiler(s *source.FilerSource) {
|
|
g.filerSource = s
|
|
}
|
|
|
|
func (g *AzureSink) initialize(accountName, accountKey, container, dir string) error {
|
|
g.container = container
|
|
g.dir = dir
|
|
|
|
// Create credential and client
|
|
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create Azure credential with account name:%s: %v", accountName, err)
|
|
}
|
|
|
|
serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName)
|
|
client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create Azure client: %v", err)
|
|
}
|
|
|
|
g.client = client
|
|
|
|
return nil
|
|
}
|
|
|
|
func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
|
|
|
|
key = cleanKey(key)
|
|
|
|
if isDirectory {
|
|
key = key + "/"
|
|
}
|
|
|
|
blobClient := g.client.ServiceClient().NewContainerClient(g.container).NewBlobClient(key)
|
|
_, err := blobClient.Delete(context.Background(), &blob.DeleteOptions{
|
|
DeleteSnapshots: &[]blob.DeleteSnapshotsOptionType{blob.DeleteSnapshotsOptionTypeInclude}[0],
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("azure delete %s/%s: %v", g.container, key, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
|
|
|
|
key = cleanKey(key)
|
|
|
|
if entry.IsDirectory {
|
|
return nil
|
|
}
|
|
|
|
totalSize := filer.FileSize(entry)
|
|
chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
|
|
|
|
// Create append blob client
|
|
appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key)
|
|
|
|
// Create blob with access conditions
|
|
accessConditions := &blob.AccessConditions{}
|
|
if entry.Attributes != nil && entry.Attributes.Mtime > 0 {
|
|
modifiedTime := time.Unix(entry.Attributes.Mtime, 0)
|
|
accessConditions.ModifiedAccessConditions = &blob.ModifiedAccessConditions{
|
|
IfUnmodifiedSince: &modifiedTime,
|
|
}
|
|
}
|
|
|
|
_, err := appendBlobClient.Create(context.Background(), &appendblob.CreateOptions{
|
|
AccessConditions: accessConditions,
|
|
})
|
|
|
|
if err != nil {
|
|
// Check if this is a precondition failed error (HTTP 412)
|
|
var respErr *azcore.ResponseError
|
|
if ok := errors.As(err, &respErr); ok && respErr.StatusCode == http.StatusPreconditionFailed {
|
|
glog.V(0).Infof("skip overwriting %s/%s: precondition failed", g.container, key)
|
|
return nil
|
|
}
|
|
return fmt.Errorf("azure create append blob %s/%s: %v", g.container, key, err)
|
|
}
|
|
|
|
writeFunc := func(data []byte) error {
|
|
_, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{})
|
|
return writeErr
|
|
}
|
|
|
|
if len(entry.Content) > 0 {
|
|
return writeFunc(entry.Content)
|
|
}
|
|
|
|
if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
|
|
key = cleanKey(key)
|
|
return true, g.CreateEntry(key, newEntry, signatures)
|
|
}
|
|
|
|
func cleanKey(key string) string {
|
|
if strings.HasPrefix(key, "/") {
|
|
key = key[1:]
|
|
}
|
|
return key
|
|
}
|