Files
seaweedfs/weed/server/master_grpc_server_volume.go

374 lines
11 KiB
Go
Raw Normal View History

package weed_server
import (
"context"
"fmt"
"math"
2024-08-29 09:52:21 -07:00
"math/rand/v2"
2021-08-12 21:40:33 -07:00
"strings"
2021-05-06 18:46:14 +08:00
"sync"
"time"
2019-12-23 12:48:20 -08:00
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/topology"
"github.com/seaweedfs/raft"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
const (
volumeGrowStepCount = 2
)
func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) {
2025-09-04 15:39:56 +03:00
if ms.option.VolumeGrowthDisabled {
Migrate from deprecated azure-storage-blob-go to modern Azure SDK (#7310) * Migrate from deprecated azure-storage-blob-go to modern Azure SDK Migrates Azure Blob Storage integration from the deprecated github.com/Azure/azure-storage-blob-go to the modern github.com/Azure/azure-sdk-for-go/sdk/storage/azblob SDK. ## Changes ### Removed Files - weed/remote_storage/azure/azure_highlevel.go - Custom upload helper no longer needed with new SDK ### Updated Files - weed/remote_storage/azure/azure_storage_client.go - Migrated from ServiceURL/ContainerURL/BlobURL to Client-based API - Updated client creation using NewClientWithSharedKeyCredential - Replaced ListBlobsFlatSegment with NewListBlobsFlatPager - Updated Download to DownloadStream with proper HTTPRange - Replaced custom uploadReaderAtToBlockBlob with UploadStream - Updated GetProperties, SetMetadata, Delete to use new client methods - Fixed metadata conversion to return map[string]*string - weed/replication/sink/azuresink/azure_sink.go - Migrated from ContainerURL to Client-based API - Updated client initialization - Replaced AppendBlobURL with AppendBlobClient - Updated error handling to use azcore.ResponseError - Added streaming.NopCloser for AppendBlock ### New Test Files - weed/remote_storage/azure/azure_storage_client_test.go - Comprehensive unit tests for all client operations - Tests for Traverse, ReadFile, WriteFile, UpdateMetadata, Delete - Tests for metadata conversion function - Benchmark tests - Integration tests (skippable without credentials) - weed/replication/sink/azuresink/azure_sink_test.go - Unit tests for Azure sink operations - Tests for CreateEntry, UpdateEntry, DeleteEntry - Tests for cleanKey function - Tests for configuration-based initialization - Integration tests (skippable without credentials) - Benchmark tests ### Dependency Updates - go.mod: Removed github.com/Azure/azure-storage-blob-go v0.15.0 - go.mod: Made github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2 direct dependency - All deprecated dependencies automatically cleaned up ## API Migration Summary Old SDK → New SDK mappings: - ServiceURL → Client (service-level operations) - ContainerURL → ContainerClient - BlobURL → BlobClient - BlockBlobURL → BlockBlobClient - AppendBlobURL → AppendBlobClient - ListBlobsFlatSegment() → NewListBlobsFlatPager() - Download() → DownloadStream() - Upload() → UploadStream() - Marker-based pagination → Pager-based pagination - azblob.ResponseError → azcore.ResponseError ## Testing All tests pass: - ✅ Unit tests for metadata conversion - ✅ Unit tests for helper functions (cleanKey) - ✅ Interface implementation tests - ✅ Build successful - ✅ No compilation errors - ✅ Integration tests available (require Azure credentials) ## Benefits - ✅ Uses actively maintained SDK - ✅ Better performance with modern API design - ✅ Improved error handling - ✅ Removes ~200 lines of custom upload code - ✅ Reduces dependency count - ✅ Better async/streaming support - ✅ Future-proof against SDK deprecation ## Backward Compatibility The changes are transparent to users: - Same configuration parameters (account name, account key) - Same functionality and behavior - No changes to SeaweedFS API or user-facing features - Existing Azure storage configurations continue to work ## Breaking Changes None - this is an internal implementation change only. * Address Gemini Code Assist review comments Fixed three issues identified by Gemini Code Assist: 1. HIGH: ReadFile now uses blob.CountToEnd when size is 0 - Old SDK: size=0 meant "read to end" - New SDK: size=0 means "read 0 bytes" - Fix: Use blob.CountToEnd (-1) to read entire blob from offset 2. MEDIUM: Use to.Ptr() instead of slice trick for DeleteSnapshots - Replaced &[]Type{value}[0] with to.Ptr(value) - Cleaner, more idiomatic Azure SDK pattern - Applied to both azure_storage_client.go and azure_sink.go 3. Added missing imports: - github.com/Azure/azure-sdk-for-go/sdk/azcore/to These changes improve code clarity and correctness while following Azure SDK best practices. * Address second round of Gemini Code Assist review comments Fixed all issues identified in the second review: 1. MEDIUM: Added constants for hardcoded values - Defined defaultBlockSize (4 MB) and defaultConcurrency (16) - Applied to WriteFile UploadStream options - Improves maintainability and readability 2. MEDIUM: Made DeleteFile idempotent - Now returns nil (no error) if blob doesn't exist - Uses bloberror.HasCode(err, bloberror.BlobNotFound) - Consistent with idempotent operation expectations 3. Fixed TestToMetadata test failures - Test was using lowercase 'x-amz-meta-' but constant is 'X-Amz-Meta-' - Updated test to use s3_constants.AmzUserMetaPrefix - All tests now pass Changes: - Added import: github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror - Added constants: defaultBlockSize, defaultConcurrency - Updated WriteFile to use constants - Updated DeleteFile to be idempotent - Fixed test to use correct S3 metadata prefix constant All tests pass. Build succeeds. Code follows Azure SDK best practices. * Address third round of Gemini Code Assist review comments Fixed all issues identified in the third review: 1. MEDIUM: Use bloberror.HasCode for ContainerAlreadyExists - Replaced fragile string check with bloberror.HasCode() - More robust and aligned with Azure SDK best practices - Applied to CreateBucket test 2. MEDIUM: Use bloberror.HasCode for BlobNotFound in test - Replaced generic error check with specific BlobNotFound check - Makes test more precise and verifies correct error returned - Applied to VerifyDeleted test 3. MEDIUM: Made DeleteEntry idempotent in azure_sink.go - Now returns nil (no error) if blob doesn't exist - Uses bloberror.HasCode(err, bloberror.BlobNotFound) - Consistent with DeleteFile implementation - Makes replication sink more robust to retries Changes: - Added import to azure_storage_client_test.go: bloberror - Added import to azure_sink.go: bloberror - Updated CreateBucket test to use bloberror.HasCode - Updated VerifyDeleted test to use bloberror.HasCode - Updated DeleteEntry to be idempotent All tests pass. Build succeeds. Code uses Azure SDK best practices. * Address fourth round of Gemini Code Assist review comments Fixed two critical issues identified in the fourth review: 1. HIGH: Handle BlobAlreadyExists in append blob creation - Problem: If append blob already exists, Create() fails causing replication failure - Fix: Added bloberror.HasCode(err, bloberror.BlobAlreadyExists) check - Behavior: Existing append blobs are now acceptable, appends can proceed - Impact: Makes replication sink more robust, prevents unnecessary failures - Location: azure_sink.go CreateEntry function 2. MEDIUM: Configure custom retry policy for download resiliency - Problem: Old SDK had MaxRetryRequests: 20, new SDK defaults to 3 retries - Fix: Configured policy.RetryOptions with MaxRetries: 10 - Settings: TryTimeout=1min, RetryDelay=2s, MaxRetryDelay=1min - Impact: Maintains similar resiliency in unreliable network conditions - Location: azure_storage_client.go client initialization Changes: - Added import: github.com/Azure/azure-sdk-for-go/sdk/azcore/policy - Updated NewClientWithSharedKeyCredential to include ClientOptions with retry policy - Updated CreateEntry error handling to allow BlobAlreadyExists Technical details: - Retry policy uses exponential backoff (default SDK behavior) - MaxRetries=10 provides good balance (was 20 in old SDK, default is 3) - TryTimeout prevents individual requests from hanging indefinitely - BlobAlreadyExists handling allows idempotent append operations All tests pass. Build succeeds. Code is more resilient and robust. * Update weed/replication/sink/azuresink/azure_sink.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Revert "Update weed/replication/sink/azuresink/azure_sink.go" This reverts commit 605e41cadf4aaa3bb7b1796f71233ff73d90ed72. * Address fifth round of Gemini Code Assist review comment Added retry policy to azure_sink.go for consistency and resiliency: 1. MEDIUM: Configure retry policy in azure_sink.go client - Problem: azure_sink.go was using default retry policy (3 retries) while azure_storage_client.go had custom policy (10 retries) - Fix: Added same retry policy configuration for consistency - Settings: MaxRetries=10, TryTimeout=1min, RetryDelay=2s, MaxRetryDelay=1min - Impact: Replication sink now has same resiliency as storage client - Rationale: Replication sink needs to be robust against transient network errors Changes: - Added import: github.com/Azure/azure-sdk-for-go/sdk/azcore/policy - Updated NewClientWithSharedKeyCredential call in initialize() function - Both azure_storage_client.go and azure_sink.go now have identical retry policies Benefits: - Consistency: Both Azure clients now use same retry configuration - Resiliency: Replication operations more robust to network issues - Best practices: Follows Azure SDK recommended patterns for production use All tests pass. Build succeeds. Code is consistent and production-ready. * fmt * Address sixth round of Gemini Code Assist review comment Fixed HIGH priority metadata key validation for Azure compliance: 1. HIGH: Handle metadata keys starting with digits - Problem: Azure Blob Storage requires metadata keys to be valid C# identifiers - Constraint: C# identifiers cannot start with a digit (0-9) - Issue: S3 metadata like 'x-amz-meta-123key' would fail with InvalidInput error - Fix: Prefix keys starting with digits with underscore '_' - Example: '123key' becomes '_123key', '456-test' becomes '_456_test' 2. Code improvement: Use strings.ReplaceAll for better readability - Changed from: strings.Replace(str, "-", "_", -1) - Changed to: strings.ReplaceAll(str, "-", "_") - Both are functionally equivalent, ReplaceAll is more readable Changes: - Updated toMetadata() function in azure_storage_client.go - Added digit prefix check: if key[0] >= '0' && key[0] <= '9' - Added comprehensive test case 'keys starting with digits' - Tests cover: '123key' -> '_123key', '456-test' -> '_456_test', '789' -> '_789' Technical details: - Azure SDK validates metadata keys as C# identifiers - C# identifier rules: must start with letter or underscore - Digits allowed in identifiers but not as first character - This prevents SetMetadata() and UploadStream() failures All tests pass including new test case. Build succeeds. Code is now fully compliant with Azure metadata requirements. * Address seventh round of Gemini Code Assist review comment Normalize metadata keys to lowercase for S3 compatibility: 1. MEDIUM: Convert metadata keys to lowercase - Rationale: S3 specification stores user-defined metadata keys in lowercase - Consistency: Azure Blob Storage metadata is case-insensitive - Best practice: Normalizing to lowercase ensures consistent behavior - Example: 'x-amz-meta-My-Key' -> 'my_key' (not 'My_Key') Changes: - Updated toMetadata() to apply strings.ToLower() to keys - Added comment explaining S3 lowercase normalization - Order of operations: strip prefix -> lowercase -> replace dashes -> check digits Test coverage: - Added new test case 'uppercase and mixed case keys' - Tests: 'My-Key' -> 'my_key', 'UPPERCASE' -> 'uppercase', 'MiXeD-CaSe' -> 'mixed_case' - All 6 test cases pass Benefits: - S3 compatibility: Matches S3 metadata key behavior - Azure consistency: Case-insensitive keys work predictably - Cross-platform: Same metadata keys work identically on both S3 and Azure - Prevents issues: No surprises from case-sensitive key handling Implementation: ```go key := strings.ReplaceAll(strings.ToLower(k[len(s3_constants.AmzUserMetaPrefix):]), "-", "_") ``` All tests pass. Build succeeds. Metadata handling is now fully S3-compatible. * Address eighth round of Gemini Code Assist review comments Use %w instead of %v for error wrapping across both files: 1. MEDIUM: Error wrapping in azure_storage_client.go - Problem: Using %v in fmt.Errorf loses error type information - Modern Go practice: Use %w to preserve error chains - Benefit: Enables errors.Is() and errors.As() for callers - Example: Can check for bloberror.BlobNotFound after wrapping 2. MEDIUM: Error wrapping in azure_sink.go - Applied same improvement for consistency - All error wrapping now preserves underlying errors - Improved debugging and error handling capabilities Changes applied to all fmt.Errorf calls: - azure_storage_client.go: 10 instances changed from %v to %w - Invalid credential error - Client creation error - Traverse errors - Download errors (2) - Upload error - Delete error - Create/Delete bucket errors (2) - azure_sink.go: 3 instances changed from %v to %w - Credential creation error - Client creation error - Delete entry error - Create append blob error Benefits: - Error inspection: Callers can use errors.Is(err, target) - Error unwrapping: Callers can use errors.As(err, &target) - Type preservation: Original error types maintained through wraps - Better debugging: Full error chain available for inspection - Modern Go: Follows Go 1.13+ error wrapping best practices Example usage after this change: ```go err := client.ReadFile(...) if errors.Is(err, bloberror.BlobNotFound) { // Can detect specific Azure errors even after wrapping } ``` All tests pass. Build succeeds. Error handling is now modern and robust. * Address ninth round of Gemini Code Assist review comment Improve metadata key sanitization with comprehensive character validation: 1. MEDIUM: Complete Azure C# identifier validation - Problem: Previous implementation only handled dashes, not all invalid chars - Issue: Keys like 'my.key', 'key+plus', 'key@symbol' would cause InvalidMetadata - Azure requirement: Metadata keys must be valid C# identifiers - Valid characters: letters (a-z, A-Z), digits (0-9), underscore (_) only 2. Implemented robust regex-based sanitization - Added package-level regex: `[^a-zA-Z0-9_]` - Matches ANY character that's not alphanumeric or underscore - Replaces all invalid characters with underscore - Compiled once at package init for performance Implementation details: - Regex declared at package level: var invalidMetadataChars = regexp.MustCompile(`[^a-zA-Z0-9_]`) - Avoids recompiling regex on every toMetadata() call - Efficient single-pass replacement of all invalid characters - Processing order: lowercase -> regex replace -> digit check Examples of character transformations: - Dots: 'my.key' -> 'my_key' - Plus: 'key+plus' -> 'key_plus' - At symbol: 'key@symbol' -> 'key_symbol' - Mixed: 'key-with.' -> 'key_with_' - Slash: 'key/slash' -> 'key_slash' - Combined: '123-key.value+test' -> '_123_key_value_test' Test coverage: - Added comprehensive test case 'keys with invalid characters' - Tests: dot, plus, at-symbol, dash+dot, slash - All 7 test cases pass (was 6, now 7) Benefits: - Complete Azure compliance: Handles ALL invalid characters - Robust: Works with any S3 metadata key format - Performant: Regex compiled once, reused efficiently - Maintainable: Single source of truth for valid characters - Prevents errors: No more InvalidMetadata errors during upload All tests pass. Build succeeds. Metadata sanitization is now bulletproof. * Address tenth round review - HIGH: Fix metadata key collision issue Prevent metadata loss by using hex encoding for invalid characters: 1. HIGH PRIORITY: Metadata key collision prevention - Critical Issue: Different S3 keys mapping to same Azure key causes data loss - Example collisions (BEFORE): * 'my-key' -> 'my_key' * 'my.key' -> 'my_key' ❌ COLLISION! Second overwrites first * 'my_key' -> 'my_key' ❌ All three map to same key! - Fixed with hex encoding (AFTER): * 'my-key' -> 'my_2d_key' (dash = 0x2d) * 'my.key' -> 'my_2e_key' (dot = 0x2e) * 'my_key' -> 'my_key' (underscore is valid) ✅ All three are now unique! 2. Implemented collision-proof hex encoding - Pattern: Invalid chars -> _XX_ where XX is hex code - Dash (0x2d): 'content-type' -> 'content_2d_type' - Dot (0x2e): 'my.key' -> 'my_2e_key' - Plus (0x2b): 'key+plus' -> 'key_2b_plus' - At (0x40): 'key@symbol' -> 'key_40_symbol' - Slash (0x2f): 'key/slash' -> 'key_2f_slash' 3. Created sanitizeMetadataKey() function - Encapsulates hex encoding logic - Uses ReplaceAllStringFunc for efficient transformation - Maintains digit prefix check for Azure C# identifier rules - Clear documentation with examples Implementation details: ```go func sanitizeMetadataKey(key string) string { // Replace each invalid character with _XX_ where XX is the hex code result := invalidMetadataChars.ReplaceAllStringFunc(key, func(s string) string { return fmt.Sprintf("_%02x_", s[0]) }) // Azure metadata keys cannot start with a digit if len(result) > 0 && result[0] >= '0' && result[0] <= '9' { result = "_" + result } return result } ``` Why hex encoding solves the collision problem: - Each invalid character gets unique hex representation - Two-digit hex ensures no confusion (always _XX_ format) - Preserves all information from original key - Reversible (though not needed for this use case) - Azure-compliant (hex codes don't introduce new invalid chars) Test coverage: - Updated all test expectations to match hex encoding - Added 'collision prevention' test case demonstrating uniqueness: * Tests my-key, my.key, my_key all produce different results * Proves metadata from different S3 keys won't collide - Total test cases: 8 (was 7, added collision prevention) Examples from tests: - 'content-type' -> 'content_2d_type' (0x2d = dash) - '456-test' -> '_456_2d_test' (digit prefix + dash) - 'My-Key' -> 'my_2d_key' (lowercase + hex encode dash) - 'key-with.' -> 'key_2d_with_2e_' (multiple chars: dash, dot, trailing dot) Benefits: - ✅ Zero collision risk: Every unique S3 key -> unique Azure key - ✅ Data integrity: No metadata loss from overwrites - ✅ Complete info preservation: Original key distinguishable - ✅ Azure compliant: Hex-encoded keys are valid C# identifiers - ✅ Maintainable: Clean function with clear purpose - ✅ Testable: Collision prevention explicitly tested All tests pass. Build succeeds. Metadata integrity is now guaranteed. --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-10-08 23:12:03 -07:00
glog.V(1).Infof("automatic volume grow disabled")
2025-09-04 15:39:56 +03:00
return
}
glog.V(1).Infoln("starting automatic volume grow")
start := time.Now()
newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count)
glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start))
if err != nil {
glog.V(1).Infof("automatic volume grow failed: %+v", err)
return
}
for _, newVidLocation := range newVidLocations {
ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation})
}
}
2021-05-06 18:46:14 +08:00
func (ms *MasterServer) ProcessGrowRequest() {
go func() {
ctx := context.Background()
firstRun := true
for {
if firstRun {
firstRun = false
} else {
time.Sleep(5*time.Minute + time.Duration(30*rand.Float32())*time.Second)
}
if !ms.Topo.IsLeader() {
continue
}
dcs := ms.Topo.ListDCAndRacks()
var err error
for _, vlc := range ms.Topo.ListVolumeLayoutCollections() {
vl := vlc.VolumeLayout
lastGrowCount := vl.GetLastGrowCount()
if vl.HasGrowRequest() {
continue
}
writable, crowded := vl.GetWritableVolumeCount()
mustGrow := int(lastGrowCount) - writable
vgr := vlc.ToVolumeGrowRequest()
stats.MasterVolumeLayoutWritable.WithLabelValues(vlc.Collection, vgr.DiskType, vgr.Replication, vgr.Ttl).Set(float64(writable))
stats.MasterVolumeLayoutCrowded.WithLabelValues(vlc.Collection, vgr.DiskType, vgr.Replication, vgr.Ttl).Set(float64(crowded))
switch {
case mustGrow > 0:
vgr.WritableVolumeCount = uint32(mustGrow)
_, err = ms.VolumeGrow(ctx, vgr)
case lastGrowCount > 0 && writable < int(lastGrowCount*2) && float64(crowded+volumeGrowStepCount) > float64(writable)*topology.VolumeGrowStrategy.Threshold:
vgr.WritableVolumeCount = volumeGrowStepCount
_, err = ms.VolumeGrow(ctx, vgr)
}
if err != nil {
glog.V(0).Infof("volume grow request failed: %+v", err)
}
writableVolumes := vl.CloneWritableVolumes()
for dcId, racks := range dcs {
for _, rackId := range racks {
if vl.ShouldGrowVolumesByDcAndRack(&writableVolumes, dcId, rackId) {
vgr.DataCenter = string(dcId)
vgr.Rack = string(rackId)
if lastGrowCount > 0 {
vgr.WritableVolumeCount = uint32(math.Ceil(float64(lastGrowCount) / float64(len(dcs)*len(racks))))
} else {
vgr.WritableVolumeCount = volumeGrowStepCount
}
if _, err = ms.VolumeGrow(ctx, vgr); err != nil {
glog.V(0).Infof("volume grow request for dc:%s rack:%s failed: %+v", dcId, rackId, err)
}
}
}
}
}
}
}()
2021-05-06 18:46:14 +08:00
go func() {
filter := sync.Map{}
for {
2024-04-18 08:47:45 -07:00
req, ok := <-ms.volumeGrowthRequestChan
2021-05-06 18:46:14 +08:00
if !ok {
break
}
option := req.Option
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
2021-05-06 18:46:14 +08:00
if !ms.Topo.IsLeader() {
//discard buffered requests
time.Sleep(time.Second * 1)
vl.DoneGrowRequest()
2021-05-06 18:46:14 +08:00
continue
}
// filter out identical requests being processed
found := false
filter.Range(func(k, v interface{}) bool {
2024-09-07 11:54:12 -07:00
existingReq := k.(*topology.VolumeGrowRequest)
if existingReq.Equals(req) {
2021-05-06 18:46:14 +08:00
found = true
}
return !found
})
// not atomic but it's okay
if found || (!req.Force && !vl.ShouldGrowVolumes()) {
2021-05-06 18:46:14 +08:00
glog.V(4).Infoln("discard volume grow request")
time.Sleep(time.Millisecond * 211)
vl.DoneGrowRequest()
continue
2021-05-06 18:46:14 +08:00
}
filter.Store(req, nil)
// we have lock called inside vg
2024-09-07 12:38:34 -07:00
glog.V(0).Infof("volume grow %+v", req)
go func(req *topology.VolumeGrowRequest, vl *topology.VolumeLayout) {
ms.DoAutomaticVolumeGrow(req)
vl.DoneGrowRequest()
filter.Delete(req)
}(req, vl)
2021-05-06 18:46:14 +08:00
}
}()
}
func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupVolumeRequest) (*master_pb.LookupVolumeResponse, error) {
resp := &master_pb.LookupVolumeResponse{}
2021-08-12 21:40:33 -07:00
volumeLocations := ms.lookupVolumeId(req.VolumeOrFileIds, req.Collection)
for _, volumeOrFileId := range req.VolumeOrFileIds {
vid := volumeOrFileId
commaSep := strings.Index(vid, ",")
if commaSep > 0 {
vid = vid[0:commaSep]
}
if result, found := volumeLocations[vid]; found {
var locations []*master_pb.Location
for _, loc := range result.Locations {
locations = append(locations, &master_pb.Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
DataCenter: loc.DataCenter,
GrpcPort: uint32(loc.GrpcPort),
})
}
var auth string
if commaSep > 0 { // this is a file id
auth = string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, result.VolumeOrFileId))
}
resp.VolumeIdLocations = append(resp.VolumeIdLocations, &master_pb.LookupVolumeResponse_VolumeIdLocation{
VolumeOrFileId: result.VolumeOrFileId,
Locations: locations,
Error: result.Error,
Auth: auth,
})
2021-08-12 21:40:33 -07:00
}
}
return resp, nil
}
func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.StatisticsRequest) (*master_pb.StatisticsResponse, error) {
if !ms.Topo.IsLeader() {
return nil, raft.NotLeaderError
}
if req.Replication == "" {
2019-06-23 03:08:27 -07:00
req.Replication = ms.option.DefaultReplicaPlacement
}
2019-12-23 12:48:20 -08:00
replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
2019-04-18 21:43:36 -07:00
ttl, err := needle.ReadTTL(req.Ttl)
if err != nil {
return nil, err
}
2021-02-16 03:03:00 -08:00
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
stats := volumeLayout.Stats()
2022-06-26 12:21:38 -07:00
totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
resp := &master_pb.StatisticsResponse{
TotalSize: uint64(totalSize),
UsedSize: stats.UsedSize,
FileCount: stats.FileCount,
}
return resp, nil
}
2019-03-17 20:27:08 -07:00
func (ms *MasterServer) VolumeList(ctx context.Context, req *master_pb.VolumeListRequest) (*master_pb.VolumeListResponse, error) {
if !ms.Topo.IsLeader() {
return nil, raft.NotLeaderError
}
resp := &master_pb.VolumeListResponse{
2019-05-05 21:17:23 -07:00
TopologyInfo: ms.Topo.ToTopologyInfo(),
2019-06-23 03:08:27 -07:00
VolumeSizeLimitMb: uint64(ms.option.VolumeSizeLimitMB),
2019-03-17 20:27:08 -07:00
}
return resp, nil
}
func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.LookupEcVolumeRequest) (*master_pb.LookupEcVolumeResponse, error) {
if !ms.Topo.IsLeader() {
return nil, raft.NotLeaderError
}
resp := &master_pb.LookupEcVolumeResponse{}
ecLocations, found := ms.Topo.LookupEcShards(needle.VolumeId(req.VolumeId))
if !found {
return resp, fmt.Errorf("ec volume %d not found", req.VolumeId)
}
resp.VolumeId = req.VolumeId
for shardId, shardLocations := range ecLocations.Locations {
var locations []*master_pb.Location
for _, dn := range shardLocations {
locations = append(locations, &master_pb.Location{
Url: string(dn.Id()),
PublicUrl: dn.PublicUrl,
DataCenter: dn.GetDataCenterId(),
})
}
resp.ShardIdLocations = append(resp.ShardIdLocations, &master_pb.LookupEcVolumeResponse_EcShardIdLocation{
ShardId: uint32(shardId),
Locations: locations,
})
}
return resp, nil
}
2020-11-28 23:18:02 -08:00
func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumVolumeRequest) (*master_pb.VacuumVolumeResponse, error) {
if !ms.Topo.IsLeader() {
return nil, raft.NotLeaderError
}
resp := &master_pb.VacuumVolumeResponse{}
ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.option.MaxParallelVacuumPerServer, req.VolumeId, req.Collection, ms.preallocateSize, false)
2020-11-28 23:18:02 -08:00
return resp, nil
}
func (ms *MasterServer) DisableVacuum(ctx context.Context, req *master_pb.DisableVacuumRequest) (*master_pb.DisableVacuumResponse, error) {
ms.Topo.DisableVacuum()
resp := &master_pb.DisableVacuumResponse{}
return resp, nil
}
func (ms *MasterServer) EnableVacuum(ctx context.Context, req *master_pb.EnableVacuumRequest) (*master_pb.EnableVacuumResponse, error) {
ms.Topo.EnableVacuum()
resp := &master_pb.EnableVacuumResponse{}
return resp, nil
}
func (ms *MasterServer) VolumeMarkReadonly(ctx context.Context, req *master_pb.VolumeMarkReadonlyRequest) (*master_pb.VolumeMarkReadonlyResponse, error) {
if !ms.Topo.IsLeader() {
return nil, raft.NotLeaderError
}
resp := &master_pb.VolumeMarkReadonlyResponse{}
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(req.ReplicaPlacement))
vl := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, needle.LoadTTLFromUint32(req.Ttl), types.ToDiskType(req.DiskType))
dataNodes := ms.Topo.Lookup(req.Collection, needle.VolumeId(req.VolumeId))
for _, dn := range dataNodes {
if dn.Ip == req.Ip && dn.Port == int(req.Port) {
if req.IsReadonly {
vl.SetVolumeReadOnly(dn, needle.VolumeId(req.VolumeId))
} else {
vl.SetVolumeWritable(dn, needle.VolumeId(req.VolumeId))
}
}
}
return resp, nil
}
func (ms *MasterServer) VolumeGrow(ctx context.Context, req *master_pb.VolumeGrowRequest) (*master_pb.VolumeGrowResponse, error) {
if !ms.Topo.IsLeader() {
return nil, raft.NotLeaderError
}
if req.Replication == "" {
req.Replication = ms.option.DefaultReplicaPlacement
}
replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
ttl, err := needle.ReadTTL(req.Ttl)
if err != nil {
return nil, err
}
if req.DataCenter != "" && !ms.Topo.DataCenterExists(req.DataCenter) {
return nil, fmt.Errorf("data center not exists")
}
2025-06-19 13:54:54 -07:00
ver := needle.GetCurrentVersion()
volumeGrowOption := topology.VolumeGrowOption{
Collection: req.Collection,
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
DiskType: types.ToDiskType(req.DiskType),
Preallocate: ms.preallocateSize,
DataCenter: req.DataCenter,
Rack: req.Rack,
DataNode: req.DataNode,
MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
2025-06-19 13:54:54 -07:00
Version: uint32(ver),
}
volumeGrowRequest := topology.VolumeGrowRequest{
Option: &volumeGrowOption,
Count: req.WritableVolumeCount,
Force: true,
Reason: "grpc volume grow",
}
replicaCount := int64(req.WritableVolumeCount * uint32(replicaPlacement.GetCopyCount()))
if ms.Topo.AvailableSpaceFor(&volumeGrowOption) < replicaCount {
return nil, fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(&volumeGrowOption), replicaCount)
}
if !ms.Topo.DataCenterExists(volumeGrowOption.DataCenter) {
err = fmt.Errorf("data center %v not found in topology", volumeGrowOption.DataCenter)
}
ms.DoAutomaticVolumeGrow(&volumeGrowRequest)
return &master_pb.VolumeGrowResponse{}, nil
}