use filer address, not just grpc address

This commit is contained in:
chrislu
2025-10-03 13:34:48 -07:00
parent 31c4803830
commit e065bd6ccf
4 changed files with 133 additions and 13 deletions

View File

@@ -75,11 +75,8 @@ func (fds *FilerDiscoveryService) discoverFilersFromMaster(masterAddr pb.ServerA
var filers []pb.ServerAddress
for _, node := range resp.ClusterNodes {
glog.Infof("FILER DISCOVERY: Found filer HTTP address %s", node.Address)
// Convert HTTP address to gRPC address using pb.ServerAddress method
httpAddr := pb.ServerAddress(node.Address)
grpcAddr := httpAddr.ToGrpcAddress()
glog.Infof("FILER DISCOVERY: Converted to gRPC address %s", grpcAddr)
filers = append(filers, pb.ServerAddress(grpcAddr))
// Return HTTP address (lock client will convert to gRPC when needed)
filers = append(filers, pb.ServerAddress(node.Address))
}
glog.Infof("FILER DISCOVERY: Returning %d filers from master %s", len(filers), masterAddr)

View File

@@ -96,9 +96,8 @@ func NewCoordinatorRegistry(gatewayAddress string, masters []pb.ServerAddress, g
conn.Close()
if err == nil && len(resp.ClusterNodes) > 0 {
// Found a filer - use its gRPC address
httpAddr := pb.ServerAddress(resp.ClusterNodes[0].Address)
seedFiler = pb.ServerAddress(httpAddr.ToGrpcAddress())
// Found a filer - use its HTTP address (WithFilerClient will convert to gRPC automatically)
seedFiler = pb.ServerAddress(resp.ClusterNodes[0].Address)
glog.V(1).Infof("🔧 Using filer %s as seed for distributed locking (discovered from master %s)", seedFiler, master)
break
}

View File

@@ -118,7 +118,8 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
// Check if this version uses flexible format (v12+)
isFlexible := IsFlexibleVersion(1, apiVersion) // API key 1 = Fetch
// Topics count
// Topics count - write the actual number of topics in the request
// Kafka protocol: we MUST return all requested topics in the response (even with empty data)
topicsCount := len(fetchRequest.Topics)
if isFlexible {
// Flexible versions use compact array format (count + 1)
@@ -130,7 +131,8 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
}
// Process each requested topic
for _, topic := range fetchRequest.Topics {
for topicIdx, topic := range fetchRequest.Topics {
glog.Infof("FETCH: Processing topic %d/%d: %s (partitions: %d)", topicIdx+1, len(fetchRequest.Topics), topic.Name, len(topic.Partitions))
topicNameBytes := []byte(topic.Name)
// Topic name length and name
@@ -390,6 +392,98 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
Debug("Fetch v%d response constructed, size: %d bytes (flexible: %v)", apiVersion, len(response), isFlexible)
glog.Infof("FETCH RESPONSE SUMMARY: correlationID=%d topics=%d totalRecordBytes=%d totalResponseBytes=%d", correlationID, topicsCount, totalAppendedRecordBytes, len(response))
// CRITICAL BYTE ANALYSIS for offset 32 error
if len(response) > 36 {
fmt.Printf("\n🔍🔍🔍 CRITICAL: Analyzing bytes around offset 32:\n")
fmt.Printf(" Offset 28-31 (should be after first topic name): %02x %02x %02x %02x\n",
response[28], response[29], response[30], response[31])
fmt.Printf(" Offset 32-35 (ERROR LOCATION): %02x %02x %02x %02x\n",
response[32], response[33], response[34], response[35])
fmt.Printf(" Offset 36-39 (should be partition ID or next field): %02x %02x %02x %02x\n",
response[36], response[37], response[38], response[39])
// Decode as int32 (what partition count/ID would be)
val32at32 := binary.BigEndian.Uint32(response[32:36])
fmt.Printf(" Value at offset 32 as int32: %d (0x%08x)\n", val32at32, val32at32)
// Decode as int16 (what a string length would be)
val16at32 := binary.BigEndian.Uint16(response[32:34])
fmt.Printf(" Value at offset 32 as int16: %d (0x%04x)\n", val16at32, val16at32)
}
// HEX DUMP for debugging decode errors: dump first 100 bytes of response to diagnose "invalid length (off=32, len=36)" error
if len(response) > 0 && len(response) <= 200 {
// Full dump for small responses (likely empty/error responses)
fmt.Printf("\n🔍 FETCH RESPONSE HEX DUMP (FULL - %d bytes):\n", len(response))
for i := 0; i < len(response); i += 16 {
end := i + 16
if end > len(response) {
end = len(response)
}
fmt.Printf(" %04d: %02x\n", i, response[i:end])
}
// Decode structure for Fetch v7 (non-flexible)
if !isFlexible && len(response) >= 40 {
fmt.Printf(" Decoded structure:\n")
fmt.Printf(" [0-3] Throttle time ms: %d\n", int32(binary.BigEndian.Uint32(response[0:4])))
fmt.Printf(" [4-7] Num topics: %d\n", int32(binary.BigEndian.Uint32(response[4:8])))
if len(response) >= 12 {
topicNameLen := int32(binary.BigEndian.Uint16(response[8:10]))
fmt.Printf(" [8-9] Topic name len: %d\n", topicNameLen)
if topicNameLen > 0 && 10+topicNameLen <= int32(len(response)) {
fmt.Printf(" [10-%d] Topic name: %s\n", 10+topicNameLen-1, string(response[10:10+topicNameLen]))
partitionsOffset := 10 + topicNameLen
if int(partitionsOffset)+4 <= len(response) {
numPartitions := int32(binary.BigEndian.Uint32(response[partitionsOffset : partitionsOffset+4]))
fmt.Printf(" [%d-%d] Num partitions: %d\n", partitionsOffset, partitionsOffset+3, numPartitions)
// First partition data starts here
partDataOffset := partitionsOffset + 4
if int(partDataOffset)+32 <= len(response) {
fmt.Printf(" [%d] *** BYTE 32 area (ERROR LOCATION) ***\n", 32)
if 32 < len(response) {
fmt.Printf(" Byte[32] = 0x%02x (%d)\n", response[32], response[32])
}
if 33 < len(response) {
fmt.Printf(" Byte[33] = 0x%02x (%d)\n", response[33], response[33])
}
if 34 < len(response) {
fmt.Printf(" Byte[34] = 0x%02x (%d)\n", response[34], response[34])
}
if 35 < len(response) {
fmt.Printf(" Byte[35] = 0x%02x (%d)\n", response[35], response[35])
}
}
}
}
}
}
} else if len(response) > 200 {
// Partial dump for large responses
fmt.Printf("\n🔍 FETCH RESPONSE HEX DUMP (FIRST 64 bytes of %d total):\n", len(response))
dumpSize := 64
if len(response) < dumpSize {
dumpSize = len(response)
}
for i := 0; i < dumpSize; i += 16 {
end := i + 16
if end > dumpSize {
end = dumpSize
}
fmt.Printf(" %04d: %02x\n", i, response[i:end])
}
// Always show byte 32 area since that's where the error occurs
if len(response) > 36 {
fmt.Printf(" *** BYTE 32 area (ERROR LOCATION) ***\n")
fmt.Printf(" 0032: %02x\n", response[32:36])
}
}
glog.Infof("✅ FETCH RESPONSE COMPLETE: correlationID=%d version=%d size=%d bytes", correlationID, apiVersion, len(response))
return response, nil
}

View File

@@ -119,6 +119,11 @@ func (h *Handler) handleFindCoordinatorV0(correlationID uint32, requestBody []by
binary.BigEndian.PutUint32(portBytes, uint32(coordinatorPort))
response = append(response, portBytes...)
// Log the complete response for debugging
Debug("FindCoordinator v0 RESPONSE BYTES (len=%d): correlationID=%d errorCode=0 nodeID=%d host=%s port=%d",
len(response), correlationID, nodeID, coordinatorHost, coordinatorPort)
Debug("FindCoordinator v0 RESPONSE HEX: %x", response)
return response, nil
}
@@ -215,6 +220,11 @@ func (h *Handler) handleFindCoordinatorV2(correlationID uint32, requestBody []by
binary.BigEndian.PutUint32(portBytes, uint32(coordinatorPort))
response = append(response, portBytes...)
// Log the complete response for debugging
Debug("FindCoordinator v2 RESPONSE BYTES (len=%d): correlationID=%d errorCode=0 nodeID=%d host=%s port=%d",
len(response), correlationID, nodeID, coordinatorHost, coordinatorPort)
Debug("FindCoordinator v2 RESPONSE HEX: %x", response)
return response, nil
}
@@ -237,24 +247,42 @@ func (h *Handler) findCoordinatorForGroup(groupID string) (host string, port int
}
// If this gateway is the leader, handle the assignment directly
if registry.IsLeader() {
isLeader := registry.IsLeader()
Debug("findCoordinatorForGroup: registry.IsLeader() = %v for group %s", isLeader, groupID)
if isLeader {
return h.handleCoordinatorAssignmentAsLeader(groupID, registry)
}
// If not the leader, contact the leader to get/assign coordinator
// But first check if we can quickly become the leader or if there's already a leader
if leader := registry.GetLeaderAddress(); leader != "" {
leader := registry.GetLeaderAddress()
Debug("GetLeaderAddress returned: '%s' (empty=%v) for group %s", leader, leader == "", groupID)
if leader != "" {
Debug("Found existing leader %s for group %s", leader, groupID)
// If the leader is this gateway, handle assignment directly
if leader == h.GetGatewayAddress() {
return h.handleCoordinatorAssignmentAsLeader(groupID, registry)
}
return h.requestCoordinatorFromLeader(groupID, registry)
}
return h.requestCoordinatorFromLeader(groupID, registry)
// No leader exists yet - use current gateway as coordinator immediately
// to avoid 10-second timeout that causes client disconnections
Debug("No leader elected yet, using current gateway as coordinator for group %s", groupID)
gatewayAddr := h.GetGatewayAddress()
var parseErr error
host, port, parseErr = h.parseGatewayAddress(gatewayAddr)
if parseErr != nil {
Debug("Failed to parse gateway address %s: %v", gatewayAddr, parseErr)
return "localhost", 9092, 1, nil
}
nodeID = 1
return host, port, nodeID, nil
}
// handleCoordinatorAssignmentAsLeader handles coordinator assignment when this gateway is the leader
func (h *Handler) handleCoordinatorAssignmentAsLeader(groupID string, registry CoordinatorRegistryInterface) (host string, port int, nodeID int32, err error) {
Debug("handleCoordinatorAssignmentAsLeader: entered for group %s", groupID)
// Check if coordinator already exists
if assignment, err := registry.GetCoordinator(groupID); err == nil && assignment != nil {
Debug("Found existing coordinator %s (node %d) for group %s", assignment.CoordinatorAddr, assignment.CoordinatorNodeID, groupID)
@@ -263,7 +291,9 @@ func (h *Handler) handleCoordinatorAssignmentAsLeader(groupID string, registry C
// No coordinator exists, assign the requesting gateway (first-come-first-serve)
currentGateway := h.GetGatewayAddress()
Debug("handleCoordinatorAssignmentAsLeader: About to call AssignCoordinator for group %s", groupID)
assignment, err := registry.AssignCoordinator(groupID, currentGateway)
Debug("handleCoordinatorAssignmentAsLeader: AssignCoordinator returned err=%v for group %s", err, groupID)
if err != nil {
Debug("Failed to assign coordinator for group %s: %v", groupID, err)
// Fallback to current gateway