From 96a27493558fae907eedb7fcf6aa1b4caf0f138a Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 11 Aug 2025 16:59:55 -0700 Subject: [PATCH] adding EcVacuumTaskConfig --- weed/pb/worker.proto | 35 +---- weed/pb/worker_pb/worker.pb.go | 206 +++++++++++++++++++------- weed/worker/tasks/ec_vacuum/config.go | 21 ++- 3 files changed, 173 insertions(+), 89 deletions(-) diff --git a/weed/pb/worker.proto b/weed/pb/worker.proto index b9e3d61d0..18cd78219 100644 --- a/weed/pb/worker.proto +++ b/weed/pb/worker.proto @@ -292,39 +292,18 @@ message TaskPolicy { // Typed task-specific configuration (replaces generic map) oneof task_config { - VacuumTaskConfig vacuum_config = 5; - ErasureCodingTaskConfig erasure_coding_config = 6; - BalanceTaskConfig balance_config = 7; - ReplicationTaskConfig replication_config = 8; + EcVacuumTaskConfig ec_vacuum_config = 5; } } // Task-specific configuration messages -// VacuumTaskConfig contains vacuum-specific configuration -message VacuumTaskConfig { - double garbage_threshold = 1; // Minimum garbage ratio to trigger vacuum (0.0-1.0) - int32 min_volume_age_hours = 2; // Minimum age before vacuum is considered - int32 min_interval_seconds = 3; // Minimum time between vacuum operations on the same volume -} - -// ErasureCodingTaskConfig contains EC-specific configuration -message ErasureCodingTaskConfig { - double fullness_ratio = 1; // Minimum fullness ratio to trigger EC (0.0-1.0) - int32 quiet_for_seconds = 2; // Minimum quiet time before EC - int32 min_volume_size_mb = 3; // Minimum volume size for EC - string collection_filter = 4; // Only process volumes from specific collections -} - -// BalanceTaskConfig contains balance-specific configuration -message BalanceTaskConfig { - double imbalance_threshold = 1; // Threshold for triggering rebalancing (0.0-1.0) - int32 min_server_count = 2; // Minimum number of servers required for balancing -} - -// ReplicationTaskConfig contains replication-specific configuration -message ReplicationTaskConfig { - int32 target_replica_count = 1; // Target number of replicas +// EcVacuumTaskConfig contains EC vacuum-specific configuration +message EcVacuumTaskConfig { + double deletion_threshold = 1; // Minimum deletion ratio to trigger vacuum (0.0-1.0) + int32 min_volume_age_seconds = 2; // Minimum age before considering vacuum (in seconds) + string collection_filter = 3; // Only vacuum EC volumes in this collection (empty = all) + int32 min_size_mb = 4; // Minimum original EC volume size to consider (in MB) } // ========== Task Persistence Messages ========== diff --git a/weed/pb/worker_pb/worker.pb.go b/weed/pb/worker_pb/worker.pb.go index 7ff5a8a36..02cbe3312 100644 --- a/weed/pb/worker_pb/worker.pb.go +++ b/weed/pb/worker_pb/worker.pb.go @@ -2387,6 +2387,7 @@ type TaskPolicy struct { // *TaskPolicy_ErasureCodingConfig // *TaskPolicy_BalanceConfig // *TaskPolicy_ReplicationConfig + // *TaskPolicy_EcVacuumConfig TaskConfig isTaskPolicy_TaskConfig `protobuf_oneof:"task_config"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -2493,6 +2494,15 @@ func (x *TaskPolicy) GetReplicationConfig() *ReplicationTaskConfig { return nil } +func (x *TaskPolicy) GetEcVacuumConfig() *EcVacuumTaskConfig { + if x != nil { + if x, ok := x.TaskConfig.(*TaskPolicy_EcVacuumConfig); ok { + return x.EcVacuumConfig + } + } + return nil +} + type isTaskPolicy_TaskConfig interface { isTaskPolicy_TaskConfig() } @@ -2513,6 +2523,10 @@ type TaskPolicy_ReplicationConfig struct { ReplicationConfig *ReplicationTaskConfig `protobuf:"bytes,8,opt,name=replication_config,json=replicationConfig,proto3,oneof"` } +type TaskPolicy_EcVacuumConfig struct { + EcVacuumConfig *EcVacuumTaskConfig `protobuf:"bytes,9,opt,name=ec_vacuum_config,json=ecVacuumConfig,proto3,oneof"` +} + func (*TaskPolicy_VacuumConfig) isTaskPolicy_TaskConfig() {} func (*TaskPolicy_ErasureCodingConfig) isTaskPolicy_TaskConfig() {} @@ -2521,6 +2535,8 @@ func (*TaskPolicy_BalanceConfig) isTaskPolicy_TaskConfig() {} func (*TaskPolicy_ReplicationConfig) isTaskPolicy_TaskConfig() {} +func (*TaskPolicy_EcVacuumConfig) isTaskPolicy_TaskConfig() {} + // VacuumTaskConfig contains vacuum-specific configuration type VacuumTaskConfig struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -2749,6 +2765,75 @@ func (x *ReplicationTaskConfig) GetTargetReplicaCount() int32 { return 0 } +// EcVacuumTaskConfig contains EC vacuum-specific configuration +type EcVacuumTaskConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + DeletionThreshold float64 `protobuf:"fixed64,1,opt,name=deletion_threshold,json=deletionThreshold,proto3" json:"deletion_threshold,omitempty"` // Minimum deletion ratio to trigger vacuum (0.0-1.0) + MinVolumeAgeSeconds int32 `protobuf:"varint,2,opt,name=min_volume_age_seconds,json=minVolumeAgeSeconds,proto3" json:"min_volume_age_seconds,omitempty"` // Minimum age before considering vacuum (in seconds) + CollectionFilter string `protobuf:"bytes,3,opt,name=collection_filter,json=collectionFilter,proto3" json:"collection_filter,omitempty"` // Only vacuum EC volumes in this collection (empty = all) + MinSizeMb int32 `protobuf:"varint,4,opt,name=min_size_mb,json=minSizeMb,proto3" json:"min_size_mb,omitempty"` // Minimum original EC volume size to consider (in MB) + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *EcVacuumTaskConfig) Reset() { + *x = EcVacuumTaskConfig{} + mi := &file_worker_proto_msgTypes[31] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *EcVacuumTaskConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EcVacuumTaskConfig) ProtoMessage() {} + +func (x *EcVacuumTaskConfig) ProtoReflect() protoreflect.Message { + mi := &file_worker_proto_msgTypes[31] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EcVacuumTaskConfig.ProtoReflect.Descriptor instead. +func (*EcVacuumTaskConfig) Descriptor() ([]byte, []int) { + return file_worker_proto_rawDescGZIP(), []int{31} +} + +func (x *EcVacuumTaskConfig) GetDeletionThreshold() float64 { + if x != nil { + return x.DeletionThreshold + } + return 0 +} + +func (x *EcVacuumTaskConfig) GetMinVolumeAgeSeconds() int32 { + if x != nil { + return x.MinVolumeAgeSeconds + } + return 0 +} + +func (x *EcVacuumTaskConfig) GetCollectionFilter() string { + if x != nil { + return x.CollectionFilter + } + return "" +} + +func (x *EcVacuumTaskConfig) GetMinSizeMb() int32 { + if x != nil { + return x.MinSizeMb + } + return 0 +} + // MaintenanceTaskData represents complete task state for persistence type MaintenanceTaskData struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -2783,7 +2868,7 @@ type MaintenanceTaskData struct { func (x *MaintenanceTaskData) Reset() { *x = MaintenanceTaskData{} - mi := &file_worker_proto_msgTypes[31] + mi := &file_worker_proto_msgTypes[32] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2795,7 +2880,7 @@ func (x *MaintenanceTaskData) String() string { func (*MaintenanceTaskData) ProtoMessage() {} func (x *MaintenanceTaskData) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[31] + mi := &file_worker_proto_msgTypes[32] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2808,7 +2893,7 @@ func (x *MaintenanceTaskData) ProtoReflect() protoreflect.Message { // Deprecated: Use MaintenanceTaskData.ProtoReflect.Descriptor instead. func (*MaintenanceTaskData) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{31} + return file_worker_proto_rawDescGZIP(), []int{32} } func (x *MaintenanceTaskData) GetId() string { @@ -2993,7 +3078,7 @@ type TaskAssignmentRecord struct { func (x *TaskAssignmentRecord) Reset() { *x = TaskAssignmentRecord{} - mi := &file_worker_proto_msgTypes[32] + mi := &file_worker_proto_msgTypes[33] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3005,7 +3090,7 @@ func (x *TaskAssignmentRecord) String() string { func (*TaskAssignmentRecord) ProtoMessage() {} func (x *TaskAssignmentRecord) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[32] + mi := &file_worker_proto_msgTypes[33] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3018,7 +3103,7 @@ func (x *TaskAssignmentRecord) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskAssignmentRecord.ProtoReflect.Descriptor instead. func (*TaskAssignmentRecord) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{32} + return file_worker_proto_rawDescGZIP(), []int{33} } func (x *TaskAssignmentRecord) GetWorkerId() string { @@ -3070,7 +3155,7 @@ type TaskCreationMetrics struct { func (x *TaskCreationMetrics) Reset() { *x = TaskCreationMetrics{} - mi := &file_worker_proto_msgTypes[33] + mi := &file_worker_proto_msgTypes[34] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3082,7 +3167,7 @@ func (x *TaskCreationMetrics) String() string { func (*TaskCreationMetrics) ProtoMessage() {} func (x *TaskCreationMetrics) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[33] + mi := &file_worker_proto_msgTypes[34] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3095,7 +3180,7 @@ func (x *TaskCreationMetrics) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskCreationMetrics.ProtoReflect.Descriptor instead. func (*TaskCreationMetrics) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{33} + return file_worker_proto_rawDescGZIP(), []int{34} } func (x *TaskCreationMetrics) GetTriggerMetric() string { @@ -3152,7 +3237,7 @@ type VolumeHealthMetrics struct { func (x *VolumeHealthMetrics) Reset() { *x = VolumeHealthMetrics{} - mi := &file_worker_proto_msgTypes[34] + mi := &file_worker_proto_msgTypes[35] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3164,7 +3249,7 @@ func (x *VolumeHealthMetrics) String() string { func (*VolumeHealthMetrics) ProtoMessage() {} func (x *VolumeHealthMetrics) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[34] + mi := &file_worker_proto_msgTypes[35] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3177,7 +3262,7 @@ func (x *VolumeHealthMetrics) ProtoReflect() protoreflect.Message { // Deprecated: Use VolumeHealthMetrics.ProtoReflect.Descriptor instead. func (*VolumeHealthMetrics) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{34} + return file_worker_proto_rawDescGZIP(), []int{35} } func (x *VolumeHealthMetrics) GetTotalSize() uint64 { @@ -3262,7 +3347,7 @@ type TaskStateFile struct { func (x *TaskStateFile) Reset() { *x = TaskStateFile{} - mi := &file_worker_proto_msgTypes[35] + mi := &file_worker_proto_msgTypes[36] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3274,7 +3359,7 @@ func (x *TaskStateFile) String() string { func (*TaskStateFile) ProtoMessage() {} func (x *TaskStateFile) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[35] + mi := &file_worker_proto_msgTypes[36] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3287,7 +3372,7 @@ func (x *TaskStateFile) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskStateFile.ProtoReflect.Descriptor instead. func (*TaskStateFile) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{35} + return file_worker_proto_rawDescGZIP(), []int{36} } func (x *TaskStateFile) GetTask() *MaintenanceTaskData { @@ -3544,7 +3629,7 @@ const file_worker_proto_rawDesc = "" + "\x1edefault_check_interval_seconds\x18\x04 \x01(\x05R\x1bdefaultCheckIntervalSeconds\x1aV\n" + "\x11TaskPoliciesEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12+\n" + - "\x05value\x18\x02 \x01(\v2\x15.worker_pb.TaskPolicyR\x05value:\x028\x01\"\x82\x04\n" + + "\x05value\x18\x02 \x01(\v2\x15.worker_pb.TaskPolicyR\x05value:\x028\x01\"\xcd\x04\n" + "\n" + "TaskPolicy\x12\x18\n" + "\aenabled\x18\x01 \x01(\bR\aenabled\x12%\n" + @@ -3554,7 +3639,8 @@ const file_worker_proto_rawDesc = "" + "\rvacuum_config\x18\x05 \x01(\v2\x1b.worker_pb.VacuumTaskConfigH\x00R\fvacuumConfig\x12X\n" + "\x15erasure_coding_config\x18\x06 \x01(\v2\".worker_pb.ErasureCodingTaskConfigH\x00R\x13erasureCodingConfig\x12E\n" + "\x0ebalance_config\x18\a \x01(\v2\x1c.worker_pb.BalanceTaskConfigH\x00R\rbalanceConfig\x12Q\n" + - "\x12replication_config\x18\b \x01(\v2 .worker_pb.ReplicationTaskConfigH\x00R\x11replicationConfigB\r\n" + + "\x12replication_config\x18\b \x01(\v2 .worker_pb.ReplicationTaskConfigH\x00R\x11replicationConfig\x12I\n" + + "\x10ec_vacuum_config\x18\t \x01(\v2\x1d.worker_pb.EcVacuumTaskConfigH\x00R\x0eecVacuumConfigB\r\n" + "\vtask_config\"\xa2\x01\n" + "\x10VacuumTaskConfig\x12+\n" + "\x11garbage_threshold\x18\x01 \x01(\x01R\x10garbageThreshold\x12/\n" + @@ -3569,7 +3655,12 @@ const file_worker_proto_rawDesc = "" + "\x13imbalance_threshold\x18\x01 \x01(\x01R\x12imbalanceThreshold\x12(\n" + "\x10min_server_count\x18\x02 \x01(\x05R\x0eminServerCount\"I\n" + "\x15ReplicationTaskConfig\x120\n" + - "\x14target_replica_count\x18\x01 \x01(\x05R\x12targetReplicaCount\"\xae\a\n" + + "\x14target_replica_count\x18\x01 \x01(\x05R\x12targetReplicaCount\"\xc5\x01\n" + + "\x12EcVacuumTaskConfig\x12-\n" + + "\x12deletion_threshold\x18\x01 \x01(\x01R\x11deletionThreshold\x123\n" + + "\x16min_volume_age_seconds\x18\x02 \x01(\x05R\x13minVolumeAgeSeconds\x12+\n" + + "\x11collection_filter\x18\x03 \x01(\tR\x10collectionFilter\x12\x1e\n" + + "\vmin_size_mb\x18\x04 \x01(\x05R\tminSizeMb\"\xae\a\n" + "\x13MaintenanceTaskData\x12\x0e\n" + "\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n" + "\x04type\x18\x02 \x01(\tR\x04type\x12\x1a\n" + @@ -3658,7 +3749,7 @@ func file_worker_proto_rawDescGZIP() []byte { return file_worker_proto_rawDescData } -var file_worker_proto_msgTypes = make([]protoimpl.MessageInfo, 45) +var file_worker_proto_msgTypes = make([]protoimpl.MessageInfo, 46) var file_worker_proto_goTypes = []any{ (*WorkerMessage)(nil), // 0: worker_pb.WorkerMessage (*AdminMessage)(nil), // 1: worker_pb.AdminMessage @@ -3691,20 +3782,21 @@ var file_worker_proto_goTypes = []any{ (*ErasureCodingTaskConfig)(nil), // 28: worker_pb.ErasureCodingTaskConfig (*BalanceTaskConfig)(nil), // 29: worker_pb.BalanceTaskConfig (*ReplicationTaskConfig)(nil), // 30: worker_pb.ReplicationTaskConfig - (*MaintenanceTaskData)(nil), // 31: worker_pb.MaintenanceTaskData - (*TaskAssignmentRecord)(nil), // 32: worker_pb.TaskAssignmentRecord - (*TaskCreationMetrics)(nil), // 33: worker_pb.TaskCreationMetrics - (*VolumeHealthMetrics)(nil), // 34: worker_pb.VolumeHealthMetrics - (*TaskStateFile)(nil), // 35: worker_pb.TaskStateFile - nil, // 36: worker_pb.WorkerRegistration.MetadataEntry - nil, // 37: worker_pb.TaskAssignment.MetadataEntry - nil, // 38: worker_pb.TaskUpdate.MetadataEntry - nil, // 39: worker_pb.TaskComplete.ResultMetadataEntry - nil, // 40: worker_pb.TaskLogMetadata.CustomDataEntry - nil, // 41: worker_pb.TaskLogEntry.FieldsEntry - nil, // 42: worker_pb.MaintenancePolicy.TaskPoliciesEntry - nil, // 43: worker_pb.MaintenanceTaskData.TagsEntry - nil, // 44: worker_pb.TaskCreationMetrics.AdditionalDataEntry + (*EcVacuumTaskConfig)(nil), // 31: worker_pb.EcVacuumTaskConfig + (*MaintenanceTaskData)(nil), // 32: worker_pb.MaintenanceTaskData + (*TaskAssignmentRecord)(nil), // 33: worker_pb.TaskAssignmentRecord + (*TaskCreationMetrics)(nil), // 34: worker_pb.TaskCreationMetrics + (*VolumeHealthMetrics)(nil), // 35: worker_pb.VolumeHealthMetrics + (*TaskStateFile)(nil), // 36: worker_pb.TaskStateFile + nil, // 37: worker_pb.WorkerRegistration.MetadataEntry + nil, // 38: worker_pb.TaskAssignment.MetadataEntry + nil, // 39: worker_pb.TaskUpdate.MetadataEntry + nil, // 40: worker_pb.TaskComplete.ResultMetadataEntry + nil, // 41: worker_pb.TaskLogMetadata.CustomDataEntry + nil, // 42: worker_pb.TaskLogEntry.FieldsEntry + nil, // 43: worker_pb.MaintenancePolicy.TaskPoliciesEntry + nil, // 44: worker_pb.MaintenanceTaskData.TagsEntry + nil, // 45: worker_pb.TaskCreationMetrics.AdditionalDataEntry } var file_worker_proto_depIdxs = []int32{ 2, // 0: worker_pb.WorkerMessage.registration:type_name -> worker_pb.WorkerRegistration @@ -3720,42 +3812,43 @@ var file_worker_proto_depIdxs = []int32{ 17, // 10: worker_pb.AdminMessage.task_cancellation:type_name -> worker_pb.TaskCancellation 19, // 11: worker_pb.AdminMessage.admin_shutdown:type_name -> worker_pb.AdminShutdown 20, // 12: worker_pb.AdminMessage.task_log_request:type_name -> worker_pb.TaskLogRequest - 36, // 13: worker_pb.WorkerRegistration.metadata:type_name -> worker_pb.WorkerRegistration.MetadataEntry + 37, // 13: worker_pb.WorkerRegistration.metadata:type_name -> worker_pb.WorkerRegistration.MetadataEntry 8, // 14: worker_pb.TaskAssignment.params:type_name -> worker_pb.TaskParams - 37, // 15: worker_pb.TaskAssignment.metadata:type_name -> worker_pb.TaskAssignment.MetadataEntry + 38, // 15: worker_pb.TaskAssignment.metadata:type_name -> worker_pb.TaskAssignment.MetadataEntry 11, // 16: worker_pb.TaskParams.sources:type_name -> worker_pb.TaskSource 12, // 17: worker_pb.TaskParams.targets:type_name -> worker_pb.TaskTarget 9, // 18: worker_pb.TaskParams.vacuum_params:type_name -> worker_pb.VacuumTaskParams 10, // 19: worker_pb.TaskParams.erasure_coding_params:type_name -> worker_pb.ErasureCodingTaskParams 13, // 20: worker_pb.TaskParams.balance_params:type_name -> worker_pb.BalanceTaskParams 14, // 21: worker_pb.TaskParams.replication_params:type_name -> worker_pb.ReplicationTaskParams - 38, // 22: worker_pb.TaskUpdate.metadata:type_name -> worker_pb.TaskUpdate.MetadataEntry - 39, // 23: worker_pb.TaskComplete.result_metadata:type_name -> worker_pb.TaskComplete.ResultMetadataEntry + 39, // 22: worker_pb.TaskUpdate.metadata:type_name -> worker_pb.TaskUpdate.MetadataEntry + 40, // 23: worker_pb.TaskComplete.result_metadata:type_name -> worker_pb.TaskComplete.ResultMetadataEntry 22, // 24: worker_pb.TaskLogResponse.metadata:type_name -> worker_pb.TaskLogMetadata 23, // 25: worker_pb.TaskLogResponse.log_entries:type_name -> worker_pb.TaskLogEntry - 40, // 26: worker_pb.TaskLogMetadata.custom_data:type_name -> worker_pb.TaskLogMetadata.CustomDataEntry - 41, // 27: worker_pb.TaskLogEntry.fields:type_name -> worker_pb.TaskLogEntry.FieldsEntry + 41, // 26: worker_pb.TaskLogMetadata.custom_data:type_name -> worker_pb.TaskLogMetadata.CustomDataEntry + 42, // 27: worker_pb.TaskLogEntry.fields:type_name -> worker_pb.TaskLogEntry.FieldsEntry 25, // 28: worker_pb.MaintenanceConfig.policy:type_name -> worker_pb.MaintenancePolicy - 42, // 29: worker_pb.MaintenancePolicy.task_policies:type_name -> worker_pb.MaintenancePolicy.TaskPoliciesEntry + 43, // 29: worker_pb.MaintenancePolicy.task_policies:type_name -> worker_pb.MaintenancePolicy.TaskPoliciesEntry 27, // 30: worker_pb.TaskPolicy.vacuum_config:type_name -> worker_pb.VacuumTaskConfig 28, // 31: worker_pb.TaskPolicy.erasure_coding_config:type_name -> worker_pb.ErasureCodingTaskConfig 29, // 32: worker_pb.TaskPolicy.balance_config:type_name -> worker_pb.BalanceTaskConfig 30, // 33: worker_pb.TaskPolicy.replication_config:type_name -> worker_pb.ReplicationTaskConfig - 8, // 34: worker_pb.MaintenanceTaskData.typed_params:type_name -> worker_pb.TaskParams - 32, // 35: worker_pb.MaintenanceTaskData.assignment_history:type_name -> worker_pb.TaskAssignmentRecord - 43, // 36: worker_pb.MaintenanceTaskData.tags:type_name -> worker_pb.MaintenanceTaskData.TagsEntry - 33, // 37: worker_pb.MaintenanceTaskData.creation_metrics:type_name -> worker_pb.TaskCreationMetrics - 34, // 38: worker_pb.TaskCreationMetrics.volume_metrics:type_name -> worker_pb.VolumeHealthMetrics - 44, // 39: worker_pb.TaskCreationMetrics.additional_data:type_name -> worker_pb.TaskCreationMetrics.AdditionalDataEntry - 31, // 40: worker_pb.TaskStateFile.task:type_name -> worker_pb.MaintenanceTaskData - 26, // 41: worker_pb.MaintenancePolicy.TaskPoliciesEntry.value:type_name -> worker_pb.TaskPolicy - 0, // 42: worker_pb.WorkerService.WorkerStream:input_type -> worker_pb.WorkerMessage - 1, // 43: worker_pb.WorkerService.WorkerStream:output_type -> worker_pb.AdminMessage - 43, // [43:44] is the sub-list for method output_type - 42, // [42:43] is the sub-list for method input_type - 42, // [42:42] is the sub-list for extension type_name - 42, // [42:42] is the sub-list for extension extendee - 0, // [0:42] is the sub-list for field type_name + 31, // 34: worker_pb.TaskPolicy.ec_vacuum_config:type_name -> worker_pb.EcVacuumTaskConfig + 8, // 35: worker_pb.MaintenanceTaskData.typed_params:type_name -> worker_pb.TaskParams + 33, // 36: worker_pb.MaintenanceTaskData.assignment_history:type_name -> worker_pb.TaskAssignmentRecord + 44, // 37: worker_pb.MaintenanceTaskData.tags:type_name -> worker_pb.MaintenanceTaskData.TagsEntry + 34, // 38: worker_pb.MaintenanceTaskData.creation_metrics:type_name -> worker_pb.TaskCreationMetrics + 35, // 39: worker_pb.TaskCreationMetrics.volume_metrics:type_name -> worker_pb.VolumeHealthMetrics + 45, // 40: worker_pb.TaskCreationMetrics.additional_data:type_name -> worker_pb.TaskCreationMetrics.AdditionalDataEntry + 32, // 41: worker_pb.TaskStateFile.task:type_name -> worker_pb.MaintenanceTaskData + 26, // 42: worker_pb.MaintenancePolicy.TaskPoliciesEntry.value:type_name -> worker_pb.TaskPolicy + 0, // 43: worker_pb.WorkerService.WorkerStream:input_type -> worker_pb.WorkerMessage + 1, // 44: worker_pb.WorkerService.WorkerStream:output_type -> worker_pb.AdminMessage + 44, // [44:45] is the sub-list for method output_type + 43, // [43:44] is the sub-list for method input_type + 43, // [43:43] is the sub-list for extension type_name + 43, // [43:43] is the sub-list for extension extendee + 0, // [0:43] is the sub-list for field type_name } func init() { file_worker_proto_init() } @@ -3791,6 +3884,7 @@ func file_worker_proto_init() { (*TaskPolicy_ErasureCodingConfig)(nil), (*TaskPolicy_BalanceConfig)(nil), (*TaskPolicy_ReplicationConfig)(nil), + (*TaskPolicy_EcVacuumConfig)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -3798,7 +3892,7 @@ func file_worker_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_worker_proto_rawDesc), len(file_worker_proto_rawDesc)), NumEnums: 0, - NumMessages: 45, + NumMessages: 46, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/worker/tasks/ec_vacuum/config.go b/weed/worker/tasks/ec_vacuum/config.go index 60533be41..5fdaff444 100644 --- a/weed/worker/tasks/ec_vacuum/config.go +++ b/weed/worker/tasks/ec_vacuum/config.go @@ -154,8 +154,14 @@ func (c *Config) ToTaskPolicy() *worker_pb.TaskPolicy { MaxConcurrent: int32(c.MaxConcurrent), RepeatIntervalSeconds: int32(c.ScanIntervalSeconds), CheckIntervalSeconds: int32(c.ScanIntervalSeconds), - // Note: EC vacuum specific config would go in TaskConfig field - // For now using basic policy until protobuf definitions are added + TaskConfig: &worker_pb.TaskPolicy_EcVacuumConfig{ + EcVacuumConfig: &worker_pb.EcVacuumTaskConfig{ + DeletionThreshold: c.DeletionThreshold, + MinVolumeAgeSeconds: int32(c.MinVolumeAgeSeconds), + CollectionFilter: c.CollectionFilter, + MinSizeMb: int32(c.MinSizeMB), + }, + }, } } @@ -170,9 +176,14 @@ func (c *Config) FromTaskPolicy(policy *worker_pb.TaskPolicy) error { c.MaxConcurrent = int(policy.MaxConcurrent) c.ScanIntervalSeconds = int(policy.RepeatIntervalSeconds) - // Note: EC vacuum-specific fields would be loaded from TaskConfig field - // For now using defaults until protobuf definitions are added - // Keep existing values if not specified in policy + // Load EC vacuum-specific fields from TaskConfig field + if ecVacuumConfig := policy.GetEcVacuumConfig(); ecVacuumConfig != nil { + c.DeletionThreshold = ecVacuumConfig.DeletionThreshold + c.MinVolumeAgeSeconds = int(ecVacuumConfig.MinVolumeAgeSeconds) + c.CollectionFilter = ecVacuumConfig.CollectionFilter + c.MinSizeMB = int(ecVacuumConfig.MinSizeMb) + } + // If no EcVacuumConfig found, keep existing values (defaults) return nil }