task ask admin for master address

This commit is contained in:
chrislu
2025-08-12 23:47:12 -07:00
parent c0e6d00bd3
commit 1f8ba5c958
9 changed files with 347 additions and 65 deletions

View File

@@ -631,3 +631,36 @@ func findClientAddress(ctx context.Context) string {
}
return pr.Addr.String()
}
// GetMasterAddresses returns master server addresses to worker
func (s *WorkerGrpcServer) GetMasterAddresses(ctx context.Context, req *worker_pb.GetMasterAddressesRequest) (*worker_pb.GetMasterAddressesResponse, error) {
glog.V(1).Infof("Worker %s requesting master addresses", req.WorkerId)
// Get master addresses from admin server
if s.adminServer.masterClient == nil {
return nil, fmt.Errorf("admin server has no master client configured")
}
// Get current master leader and all master addresses
masterAddresses := s.adminServer.masterClient.GetMasters(ctx)
if len(masterAddresses) == 0 {
return nil, fmt.Errorf("no master addresses available")
}
// Try to get the current leader
leader := s.adminServer.masterClient.GetMaster(ctx)
// Convert pb.ServerAddress slice to string slice
masterAddressStrings := make([]string, len(masterAddresses))
for i, addr := range masterAddresses {
masterAddressStrings[i] = string(addr)
}
response := &worker_pb.GetMasterAddressesResponse{
MasterAddresses: masterAddressStrings,
PrimaryMaster: string(leader),
}
glog.V(1).Infof("Returning %d master addresses to worker %s, leader: %s", len(masterAddresses), req.WorkerId, leader)
return response, nil
}

View File

@@ -94,7 +94,7 @@ func GrpcDial(ctx context.Context, address string, waitForReady bool, opts ...gr
options = append(options, opt)
}
}
return grpc.DialContext(ctx, address, options...)
return grpc.NewClient(address, options...)
}
func getOrCreateConnection(address string, waitForReady bool, opts ...grpc.DialOption) (*versionedGrpcClient, error) {

View File

@@ -8,6 +8,9 @@ option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb";
service WorkerService {
// WorkerStream maintains a bidirectional stream for worker communication
rpc WorkerStream(stream WorkerMessage) returns (stream AdminMessage);
// GetMasterAddresses returns master server addresses for worker tasks
rpc GetMasterAddresses(GetMasterAddressesRequest) returns (GetMasterAddressesResponse);
}
// WorkerMessage represents messages from worker to admin
@@ -385,4 +388,15 @@ message TaskStateFile {
MaintenanceTaskData task = 1;
int64 last_updated = 2;
string admin_version = 3;
}
// GetMasterAddressesRequest sent by worker to get master server addresses
message GetMasterAddressesRequest {
string worker_id = 1; // Worker identification
}
// GetMasterAddressesResponse returns master addresses to worker
message GetMasterAddressesResponse {
repeated string master_addresses = 1; // List of available master addresses
string primary_master = 2; // Primary master address (if applicable)
}

View File

@@ -3197,6 +3197,104 @@ func (x *TaskStateFile) GetAdminVersion() string {
return ""
}
// GetMasterAddressesRequest sent by worker to get master server addresses
type GetMasterAddressesRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
WorkerId string `protobuf:"bytes,1,opt,name=worker_id,json=workerId,proto3" json:"worker_id,omitempty"` // Worker identification
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *GetMasterAddressesRequest) Reset() {
*x = GetMasterAddressesRequest{}
mi := &file_worker_proto_msgTypes[34]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *GetMasterAddressesRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetMasterAddressesRequest) ProtoMessage() {}
func (x *GetMasterAddressesRequest) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[34]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetMasterAddressesRequest.ProtoReflect.Descriptor instead.
func (*GetMasterAddressesRequest) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{34}
}
func (x *GetMasterAddressesRequest) GetWorkerId() string {
if x != nil {
return x.WorkerId
}
return ""
}
// GetMasterAddressesResponse returns master addresses to worker
type GetMasterAddressesResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
MasterAddresses []string `protobuf:"bytes,1,rep,name=master_addresses,json=masterAddresses,proto3" json:"master_addresses,omitempty"` // List of available master addresses
PrimaryMaster string `protobuf:"bytes,2,opt,name=primary_master,json=primaryMaster,proto3" json:"primary_master,omitempty"` // Primary master address (if applicable)
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *GetMasterAddressesResponse) Reset() {
*x = GetMasterAddressesResponse{}
mi := &file_worker_proto_msgTypes[35]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *GetMasterAddressesResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetMasterAddressesResponse) ProtoMessage() {}
func (x *GetMasterAddressesResponse) ProtoReflect() protoreflect.Message {
mi := &file_worker_proto_msgTypes[35]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetMasterAddressesResponse.ProtoReflect.Descriptor instead.
func (*GetMasterAddressesResponse) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{35}
}
func (x *GetMasterAddressesResponse) GetMasterAddresses() []string {
if x != nil {
return x.MasterAddresses
}
return nil
}
func (x *GetMasterAddressesResponse) GetPrimaryMaster() string {
if x != nil {
return x.PrimaryMaster
}
return ""
}
var File_worker_proto protoreflect.FileDescriptor
const file_worker_proto_rawDesc = "" +
@@ -3525,9 +3623,15 @@ const file_worker_proto_rawDesc = "" +
"\rTaskStateFile\x122\n" +
"\x04task\x18\x01 \x01(\v2\x1e.worker_pb.MaintenanceTaskDataR\x04task\x12!\n" +
"\flast_updated\x18\x02 \x01(\x03R\vlastUpdated\x12#\n" +
"\radmin_version\x18\x03 \x01(\tR\fadminVersion2V\n" +
"\radmin_version\x18\x03 \x01(\tR\fadminVersion\"8\n" +
"\x19GetMasterAddressesRequest\x12\x1b\n" +
"\tworker_id\x18\x01 \x01(\tR\bworkerId\"n\n" +
"\x1aGetMasterAddressesResponse\x12)\n" +
"\x10master_addresses\x18\x01 \x03(\tR\x0fmasterAddresses\x12%\n" +
"\x0eprimary_master\x18\x02 \x01(\tR\rprimaryMaster2\xb9\x01\n" +
"\rWorkerService\x12E\n" +
"\fWorkerStream\x12\x18.worker_pb.WorkerMessage\x1a\x17.worker_pb.AdminMessage(\x010\x01B2Z0github.com/seaweedfs/seaweedfs/weed/pb/worker_pbb\x06proto3"
"\fWorkerStream\x12\x18.worker_pb.WorkerMessage\x1a\x17.worker_pb.AdminMessage(\x010\x01\x12a\n" +
"\x12GetMasterAddresses\x12$.worker_pb.GetMasterAddressesRequest\x1a%.worker_pb.GetMasterAddressesResponseB2Z0github.com/seaweedfs/seaweedfs/weed/pb/worker_pbb\x06proto3"
var (
file_worker_proto_rawDescOnce sync.Once
@@ -3541,51 +3645,53 @@ func file_worker_proto_rawDescGZIP() []byte {
return file_worker_proto_rawDescData
}
var file_worker_proto_msgTypes = make([]protoimpl.MessageInfo, 43)
var file_worker_proto_msgTypes = make([]protoimpl.MessageInfo, 45)
var file_worker_proto_goTypes = []any{
(*WorkerMessage)(nil), // 0: worker_pb.WorkerMessage
(*AdminMessage)(nil), // 1: worker_pb.AdminMessage
(*WorkerRegistration)(nil), // 2: worker_pb.WorkerRegistration
(*RegistrationResponse)(nil), // 3: worker_pb.RegistrationResponse
(*WorkerHeartbeat)(nil), // 4: worker_pb.WorkerHeartbeat
(*HeartbeatResponse)(nil), // 5: worker_pb.HeartbeatResponse
(*TaskRequest)(nil), // 6: worker_pb.TaskRequest
(*TaskAssignment)(nil), // 7: worker_pb.TaskAssignment
(*TaskParams)(nil), // 8: worker_pb.TaskParams
(*VacuumTaskParams)(nil), // 9: worker_pb.VacuumTaskParams
(*ErasureCodingTaskParams)(nil), // 10: worker_pb.ErasureCodingTaskParams
(*TaskSource)(nil), // 11: worker_pb.TaskSource
(*TaskTarget)(nil), // 12: worker_pb.TaskTarget
(*BalanceTaskParams)(nil), // 13: worker_pb.BalanceTaskParams
(*ReplicationTaskParams)(nil), // 14: worker_pb.ReplicationTaskParams
(*TaskUpdate)(nil), // 15: worker_pb.TaskUpdate
(*TaskComplete)(nil), // 16: worker_pb.TaskComplete
(*TaskCancellation)(nil), // 17: worker_pb.TaskCancellation
(*WorkerShutdown)(nil), // 18: worker_pb.WorkerShutdown
(*AdminShutdown)(nil), // 19: worker_pb.AdminShutdown
(*TaskLogRequest)(nil), // 20: worker_pb.TaskLogRequest
(*TaskLogResponse)(nil), // 21: worker_pb.TaskLogResponse
(*TaskLogMetadata)(nil), // 22: worker_pb.TaskLogMetadata
(*TaskLogEntry)(nil), // 23: worker_pb.TaskLogEntry
(*MaintenanceConfig)(nil), // 24: worker_pb.MaintenanceConfig
(*MaintenancePolicy)(nil), // 25: worker_pb.MaintenancePolicy
(*TaskPolicy)(nil), // 26: worker_pb.TaskPolicy
(*ErasureCodingTaskConfig)(nil), // 27: worker_pb.ErasureCodingTaskConfig
(*EcVacuumTaskConfig)(nil), // 28: worker_pb.EcVacuumTaskConfig
(*MaintenanceTaskData)(nil), // 29: worker_pb.MaintenanceTaskData
(*TaskAssignmentRecord)(nil), // 30: worker_pb.TaskAssignmentRecord
(*TaskCreationMetrics)(nil), // 31: worker_pb.TaskCreationMetrics
(*VolumeHealthMetrics)(nil), // 32: worker_pb.VolumeHealthMetrics
(*TaskStateFile)(nil), // 33: worker_pb.TaskStateFile
nil, // 34: worker_pb.WorkerRegistration.MetadataEntry
nil, // 35: worker_pb.TaskAssignment.MetadataEntry
nil, // 36: worker_pb.TaskUpdate.MetadataEntry
nil, // 37: worker_pb.TaskComplete.ResultMetadataEntry
nil, // 38: worker_pb.TaskLogMetadata.CustomDataEntry
nil, // 39: worker_pb.TaskLogEntry.FieldsEntry
nil, // 40: worker_pb.MaintenancePolicy.TaskPoliciesEntry
nil, // 41: worker_pb.MaintenanceTaskData.TagsEntry
nil, // 42: worker_pb.TaskCreationMetrics.AdditionalDataEntry
(*WorkerMessage)(nil), // 0: worker_pb.WorkerMessage
(*AdminMessage)(nil), // 1: worker_pb.AdminMessage
(*WorkerRegistration)(nil), // 2: worker_pb.WorkerRegistration
(*RegistrationResponse)(nil), // 3: worker_pb.RegistrationResponse
(*WorkerHeartbeat)(nil), // 4: worker_pb.WorkerHeartbeat
(*HeartbeatResponse)(nil), // 5: worker_pb.HeartbeatResponse
(*TaskRequest)(nil), // 6: worker_pb.TaskRequest
(*TaskAssignment)(nil), // 7: worker_pb.TaskAssignment
(*TaskParams)(nil), // 8: worker_pb.TaskParams
(*VacuumTaskParams)(nil), // 9: worker_pb.VacuumTaskParams
(*ErasureCodingTaskParams)(nil), // 10: worker_pb.ErasureCodingTaskParams
(*TaskSource)(nil), // 11: worker_pb.TaskSource
(*TaskTarget)(nil), // 12: worker_pb.TaskTarget
(*BalanceTaskParams)(nil), // 13: worker_pb.BalanceTaskParams
(*ReplicationTaskParams)(nil), // 14: worker_pb.ReplicationTaskParams
(*TaskUpdate)(nil), // 15: worker_pb.TaskUpdate
(*TaskComplete)(nil), // 16: worker_pb.TaskComplete
(*TaskCancellation)(nil), // 17: worker_pb.TaskCancellation
(*WorkerShutdown)(nil), // 18: worker_pb.WorkerShutdown
(*AdminShutdown)(nil), // 19: worker_pb.AdminShutdown
(*TaskLogRequest)(nil), // 20: worker_pb.TaskLogRequest
(*TaskLogResponse)(nil), // 21: worker_pb.TaskLogResponse
(*TaskLogMetadata)(nil), // 22: worker_pb.TaskLogMetadata
(*TaskLogEntry)(nil), // 23: worker_pb.TaskLogEntry
(*MaintenanceConfig)(nil), // 24: worker_pb.MaintenanceConfig
(*MaintenancePolicy)(nil), // 25: worker_pb.MaintenancePolicy
(*TaskPolicy)(nil), // 26: worker_pb.TaskPolicy
(*ErasureCodingTaskConfig)(nil), // 27: worker_pb.ErasureCodingTaskConfig
(*EcVacuumTaskConfig)(nil), // 28: worker_pb.EcVacuumTaskConfig
(*MaintenanceTaskData)(nil), // 29: worker_pb.MaintenanceTaskData
(*TaskAssignmentRecord)(nil), // 30: worker_pb.TaskAssignmentRecord
(*TaskCreationMetrics)(nil), // 31: worker_pb.TaskCreationMetrics
(*VolumeHealthMetrics)(nil), // 32: worker_pb.VolumeHealthMetrics
(*TaskStateFile)(nil), // 33: worker_pb.TaskStateFile
(*GetMasterAddressesRequest)(nil), // 34: worker_pb.GetMasterAddressesRequest
(*GetMasterAddressesResponse)(nil), // 35: worker_pb.GetMasterAddressesResponse
nil, // 36: worker_pb.WorkerRegistration.MetadataEntry
nil, // 37: worker_pb.TaskAssignment.MetadataEntry
nil, // 38: worker_pb.TaskUpdate.MetadataEntry
nil, // 39: worker_pb.TaskComplete.ResultMetadataEntry
nil, // 40: worker_pb.TaskLogMetadata.CustomDataEntry
nil, // 41: worker_pb.TaskLogEntry.FieldsEntry
nil, // 42: worker_pb.MaintenancePolicy.TaskPoliciesEntry
nil, // 43: worker_pb.MaintenanceTaskData.TagsEntry
nil, // 44: worker_pb.TaskCreationMetrics.AdditionalDataEntry
}
var file_worker_proto_depIdxs = []int32{
2, // 0: worker_pb.WorkerMessage.registration:type_name -> worker_pb.WorkerRegistration
@@ -3601,37 +3707,39 @@ var file_worker_proto_depIdxs = []int32{
17, // 10: worker_pb.AdminMessage.task_cancellation:type_name -> worker_pb.TaskCancellation
19, // 11: worker_pb.AdminMessage.admin_shutdown:type_name -> worker_pb.AdminShutdown
20, // 12: worker_pb.AdminMessage.task_log_request:type_name -> worker_pb.TaskLogRequest
34, // 13: worker_pb.WorkerRegistration.metadata:type_name -> worker_pb.WorkerRegistration.MetadataEntry
36, // 13: worker_pb.WorkerRegistration.metadata:type_name -> worker_pb.WorkerRegistration.MetadataEntry
8, // 14: worker_pb.TaskAssignment.params:type_name -> worker_pb.TaskParams
35, // 15: worker_pb.TaskAssignment.metadata:type_name -> worker_pb.TaskAssignment.MetadataEntry
37, // 15: worker_pb.TaskAssignment.metadata:type_name -> worker_pb.TaskAssignment.MetadataEntry
11, // 16: worker_pb.TaskParams.sources:type_name -> worker_pb.TaskSource
12, // 17: worker_pb.TaskParams.targets:type_name -> worker_pb.TaskTarget
9, // 18: worker_pb.TaskParams.vacuum_params:type_name -> worker_pb.VacuumTaskParams
10, // 19: worker_pb.TaskParams.erasure_coding_params:type_name -> worker_pb.ErasureCodingTaskParams
13, // 20: worker_pb.TaskParams.balance_params:type_name -> worker_pb.BalanceTaskParams
14, // 21: worker_pb.TaskParams.replication_params:type_name -> worker_pb.ReplicationTaskParams
36, // 22: worker_pb.TaskUpdate.metadata:type_name -> worker_pb.TaskUpdate.MetadataEntry
37, // 23: worker_pb.TaskComplete.result_metadata:type_name -> worker_pb.TaskComplete.ResultMetadataEntry
38, // 22: worker_pb.TaskUpdate.metadata:type_name -> worker_pb.TaskUpdate.MetadataEntry
39, // 23: worker_pb.TaskComplete.result_metadata:type_name -> worker_pb.TaskComplete.ResultMetadataEntry
22, // 24: worker_pb.TaskLogResponse.metadata:type_name -> worker_pb.TaskLogMetadata
23, // 25: worker_pb.TaskLogResponse.log_entries:type_name -> worker_pb.TaskLogEntry
38, // 26: worker_pb.TaskLogMetadata.custom_data:type_name -> worker_pb.TaskLogMetadata.CustomDataEntry
39, // 27: worker_pb.TaskLogEntry.fields:type_name -> worker_pb.TaskLogEntry.FieldsEntry
40, // 26: worker_pb.TaskLogMetadata.custom_data:type_name -> worker_pb.TaskLogMetadata.CustomDataEntry
41, // 27: worker_pb.TaskLogEntry.fields:type_name -> worker_pb.TaskLogEntry.FieldsEntry
25, // 28: worker_pb.MaintenanceConfig.policy:type_name -> worker_pb.MaintenancePolicy
40, // 29: worker_pb.MaintenancePolicy.task_policies:type_name -> worker_pb.MaintenancePolicy.TaskPoliciesEntry
42, // 29: worker_pb.MaintenancePolicy.task_policies:type_name -> worker_pb.MaintenancePolicy.TaskPoliciesEntry
27, // 30: worker_pb.TaskPolicy.erasure_coding_config:type_name -> worker_pb.ErasureCodingTaskConfig
28, // 31: worker_pb.TaskPolicy.ec_vacuum_config:type_name -> worker_pb.EcVacuumTaskConfig
8, // 32: worker_pb.MaintenanceTaskData.typed_params:type_name -> worker_pb.TaskParams
30, // 33: worker_pb.MaintenanceTaskData.assignment_history:type_name -> worker_pb.TaskAssignmentRecord
41, // 34: worker_pb.MaintenanceTaskData.tags:type_name -> worker_pb.MaintenanceTaskData.TagsEntry
43, // 34: worker_pb.MaintenanceTaskData.tags:type_name -> worker_pb.MaintenanceTaskData.TagsEntry
31, // 35: worker_pb.MaintenanceTaskData.creation_metrics:type_name -> worker_pb.TaskCreationMetrics
32, // 36: worker_pb.TaskCreationMetrics.volume_metrics:type_name -> worker_pb.VolumeHealthMetrics
42, // 37: worker_pb.TaskCreationMetrics.additional_data:type_name -> worker_pb.TaskCreationMetrics.AdditionalDataEntry
44, // 37: worker_pb.TaskCreationMetrics.additional_data:type_name -> worker_pb.TaskCreationMetrics.AdditionalDataEntry
29, // 38: worker_pb.TaskStateFile.task:type_name -> worker_pb.MaintenanceTaskData
26, // 39: worker_pb.MaintenancePolicy.TaskPoliciesEntry.value:type_name -> worker_pb.TaskPolicy
0, // 40: worker_pb.WorkerService.WorkerStream:input_type -> worker_pb.WorkerMessage
1, // 41: worker_pb.WorkerService.WorkerStream:output_type -> worker_pb.AdminMessage
41, // [41:42] is the sub-list for method output_type
40, // [40:41] is the sub-list for method input_type
34, // 41: worker_pb.WorkerService.GetMasterAddresses:input_type -> worker_pb.GetMasterAddressesRequest
1, // 42: worker_pb.WorkerService.WorkerStream:output_type -> worker_pb.AdminMessage
35, // 43: worker_pb.WorkerService.GetMasterAddresses:output_type -> worker_pb.GetMasterAddressesResponse
42, // [42:44] is the sub-list for method output_type
40, // [40:42] is the sub-list for method input_type
40, // [40:40] is the sub-list for extension type_name
40, // [40:40] is the sub-list for extension extendee
0, // [0:40] is the sub-list for field type_name
@@ -3675,7 +3783,7 @@ func file_worker_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_worker_proto_rawDesc), len(file_worker_proto_rawDesc)),
NumEnums: 0,
NumMessages: 43,
NumMessages: 45,
NumExtensions: 0,
NumServices: 1,
},

View File

@@ -19,7 +19,8 @@ import (
const _ = grpc.SupportPackageIsVersion9
const (
WorkerService_WorkerStream_FullMethodName = "/worker_pb.WorkerService/WorkerStream"
WorkerService_WorkerStream_FullMethodName = "/worker_pb.WorkerService/WorkerStream"
WorkerService_GetMasterAddresses_FullMethodName = "/worker_pb.WorkerService/GetMasterAddresses"
)
// WorkerServiceClient is the client API for WorkerService service.
@@ -30,6 +31,8 @@ const (
type WorkerServiceClient interface {
// WorkerStream maintains a bidirectional stream for worker communication
WorkerStream(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[WorkerMessage, AdminMessage], error)
// GetMasterAddresses returns master server addresses for worker tasks
GetMasterAddresses(ctx context.Context, in *GetMasterAddressesRequest, opts ...grpc.CallOption) (*GetMasterAddressesResponse, error)
}
type workerServiceClient struct {
@@ -53,6 +56,16 @@ func (c *workerServiceClient) WorkerStream(ctx context.Context, opts ...grpc.Cal
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type WorkerService_WorkerStreamClient = grpc.BidiStreamingClient[WorkerMessage, AdminMessage]
func (c *workerServiceClient) GetMasterAddresses(ctx context.Context, in *GetMasterAddressesRequest, opts ...grpc.CallOption) (*GetMasterAddressesResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(GetMasterAddressesResponse)
err := c.cc.Invoke(ctx, WorkerService_GetMasterAddresses_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// WorkerServiceServer is the server API for WorkerService service.
// All implementations must embed UnimplementedWorkerServiceServer
// for forward compatibility.
@@ -61,6 +74,8 @@ type WorkerService_WorkerStreamClient = grpc.BidiStreamingClient[WorkerMessage,
type WorkerServiceServer interface {
// WorkerStream maintains a bidirectional stream for worker communication
WorkerStream(grpc.BidiStreamingServer[WorkerMessage, AdminMessage]) error
// GetMasterAddresses returns master server addresses for worker tasks
GetMasterAddresses(context.Context, *GetMasterAddressesRequest) (*GetMasterAddressesResponse, error)
mustEmbedUnimplementedWorkerServiceServer()
}
@@ -74,6 +89,9 @@ type UnimplementedWorkerServiceServer struct{}
func (UnimplementedWorkerServiceServer) WorkerStream(grpc.BidiStreamingServer[WorkerMessage, AdminMessage]) error {
return status.Errorf(codes.Unimplemented, "method WorkerStream not implemented")
}
func (UnimplementedWorkerServiceServer) GetMasterAddresses(context.Context, *GetMasterAddressesRequest) (*GetMasterAddressesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetMasterAddresses not implemented")
}
func (UnimplementedWorkerServiceServer) mustEmbedUnimplementedWorkerServiceServer() {}
func (UnimplementedWorkerServiceServer) testEmbeddedByValue() {}
@@ -102,13 +120,36 @@ func _WorkerService_WorkerStream_Handler(srv interface{}, stream grpc.ServerStre
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type WorkerService_WorkerStreamServer = grpc.BidiStreamingServer[WorkerMessage, AdminMessage]
func _WorkerService_GetMasterAddresses_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetMasterAddressesRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(WorkerServiceServer).GetMasterAddresses(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: WorkerService_GetMasterAddresses_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WorkerServiceServer).GetMasterAddresses(ctx, req.(*GetMasterAddressesRequest))
}
return interceptor(ctx, in, info, handler)
}
// WorkerService_ServiceDesc is the grpc.ServiceDesc for WorkerService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var WorkerService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "worker_pb.WorkerService",
HandlerType: (*WorkerServiceServer)(nil),
Methods: []grpc.MethodDesc{},
Methods: []grpc.MethodDesc{
{
MethodName: "GetMasterAddresses",
Handler: _WorkerService_GetMasterAddresses_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "WorkerStream",

View File

@@ -25,6 +25,12 @@ import (
"google.golang.org/grpc"
)
// Compile-time interface compliance checks
var (
_ types.TaskWithGrpcDial = (*EcVacuumTask)(nil)
_ types.TaskWithAdminAddress = (*EcVacuumTask)(nil)
)
// EcVacuumTask represents an EC vacuum task that collects, decodes, and re-encodes EC volumes
type EcVacuumTask struct {
*base.BaseTask
@@ -33,6 +39,7 @@ type EcVacuumTask struct {
sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits
tempDir string
grpcDialOption grpc.DialOption
adminAddress string // admin server address for API calls
masterAddress pb.ServerAddress // master server address for activation RPC
cleanupGracePeriod time.Duration // grace period before cleaning up old generation (1 minute default)
topologyTaskID string // links to ActiveTopology task for capacity tracking
@@ -1172,6 +1179,11 @@ func (t *EcVacuumTask) SetGrpcDialOption(option grpc.DialOption) {
t.grpcDialOption = option
}
// SetAdminAddress sets the admin server address for API calls
func (t *EcVacuumTask) SetAdminAddress(address string) {
t.adminAddress = address
}
// SetMasterAddress sets the master server address for generation activation
func (t *EcVacuumTask) SetMasterAddress(address pb.ServerAddress) {
t.masterAddress = address
@@ -1181,3 +1193,56 @@ func (t *EcVacuumTask) SetMasterAddress(address pb.ServerAddress) {
func (t *EcVacuumTask) SetCleanupGracePeriod(period time.Duration) {
t.cleanupGracePeriod = period
}
// fetchMasterAddressFromAdmin gets master addresses from admin server
func (t *EcVacuumTask) fetchMasterAddressFromAdmin() error {
// Use admin address provided by worker
if t.adminAddress == "" {
return fmt.Errorf("admin server address not provided by worker - cannot fetch master addresses")
}
// Convert admin HTTP address to gRPC address (HTTP port + 10000)
grpcAddress := pb.ServerToGrpcAddress(t.adminAddress)
t.LogInfo("Fetching master address from admin server", map[string]interface{}{
"admin_address": grpcAddress,
})
// Create gRPC connection to admin server
conn, err := grpc.NewClient(grpcAddress, t.grpcDialOption)
if err != nil {
return fmt.Errorf("failed to connect to admin server at %s: %w", grpcAddress, err)
}
defer conn.Close()
// Create worker service client
client := worker_pb.NewWorkerServiceClient(conn)
// Call GetMasterAddresses API
resp, err := client.GetMasterAddresses(context.Background(), &worker_pb.GetMasterAddressesRequest{
WorkerId: t.ID(), // Use task ID as worker ID for logging
})
if err != nil {
return fmt.Errorf("failed to get master addresses from admin: %w", err)
}
if len(resp.MasterAddresses) == 0 {
return fmt.Errorf("no master addresses returned from admin server")
}
// Use primary master if available, otherwise first address
masterAddress := resp.PrimaryMaster
if masterAddress == "" && len(resp.MasterAddresses) > 0 {
masterAddress = resp.MasterAddresses[0]
}
t.masterAddress = pb.ServerAddress(masterAddress)
t.LogInfo("Successfully obtained master address from admin server", map[string]interface{}{
"master_address": masterAddress,
"available_masters": resp.MasterAddresses,
"primary_master": resp.PrimaryMaster,
})
return nil
}

View File

@@ -11,8 +11,11 @@ import (
// performSafetyChecks performs comprehensive safety verification before cleanup
func (t *EcVacuumTask) performSafetyChecks() error {
// Get master address from admin server if not already set
if t.masterAddress == "" {
return fmt.Errorf("CRITICAL: cannot perform safety checks - master address not set")
if err := t.fetchMasterAddressFromAdmin(); err != nil {
return fmt.Errorf("CRITICAL: cannot perform safety checks - failed to get master address: %w", err)
}
}
// Safety Check 1: Verify master connectivity and volume existence

View File

@@ -9,6 +9,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"google.golang.org/grpc"
)
// Task defines the core task interface that all tasks must implement
@@ -37,6 +38,18 @@ type TaskWithLogging interface {
Logger
}
// TaskWithGrpcDial defines tasks that need gRPC dial options
type TaskWithGrpcDial interface {
Task
SetGrpcDialOption(option grpc.DialOption)
}
// TaskWithAdminAddress defines tasks that need admin server address
type TaskWithAdminAddress interface {
Task
SetAdminAddress(address string)
}
// Logger defines standard logging interface
type Logger interface {
Info(msg string, args ...interface{})

View File

@@ -14,7 +14,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/grpc"
// Import task packages to trigger their auto-registration
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/ec_vacuum"
@@ -448,11 +447,17 @@ func (w *Worker) executeTask(task *types.TaskInput) {
}
// Pass worker's gRPC dial option to task if it supports it
if grpcTask, ok := taskInstance.(interface{ SetGrpcDialOption(grpc.DialOption) }); ok {
if grpcTask, ok := taskInstance.(types.TaskWithGrpcDial); ok {
grpcTask.SetGrpcDialOption(w.config.GrpcDialOption)
glog.V(2).Infof("Set gRPC dial option for task %s", task.ID)
}
// Pass worker's admin server address to task if it supports it
if adminTask, ok := taskInstance.(types.TaskWithAdminAddress); ok {
adminTask.SetAdminAddress(w.config.AdminServer)
glog.V(2).Infof("Set admin server address for task %s", task.ID)
}
// Task execution uses the new unified Task interface
glog.V(2).Infof("Executing task %s in working directory: %s", task.ID, taskWorkingDir)