From 193d6ed64eee595fd678c37b0cf17d6b825dbe65 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 6 Oct 2025 16:40:16 -0700 Subject: [PATCH] api key --- weed/mq/kafka/protocol/flexible_versions.go | 28 +-- weed/mq/kafka/protocol/handler.go | 244 +++++++++++--------- weed/mq/kafka/protocol/metrics.go | 50 ++-- 3 files changed, 175 insertions(+), 147 deletions(-) diff --git a/weed/mq/kafka/protocol/flexible_versions.go b/weed/mq/kafka/protocol/flexible_versions.go index f58d66088..5984f18ed 100644 --- a/weed/mq/kafka/protocol/flexible_versions.go +++ b/weed/mq/kafka/protocol/flexible_versions.go @@ -196,32 +196,32 @@ func DecodeTaggedFields(data []byte) (*TaggedFields, int, error) { // IsFlexibleVersion determines if an API version uses flexible versions // This is API-specific and based on when each API adopted flexible versions func IsFlexibleVersion(apiKey, apiVersion uint16) bool { - switch apiKey { - case 18: // ApiVersions + switch APIKey(apiKey) { + case APIKeyApiVersions: return apiVersion >= 3 - case 3: // Metadata + case APIKeyMetadata: return apiVersion >= 9 - case 1: // Fetch + case APIKeyFetch: return apiVersion >= 12 - case 0: // Produce + case APIKeyProduce: return apiVersion >= 9 - case 11: // JoinGroup + case APIKeyJoinGroup: return apiVersion >= 6 - case 14: // SyncGroup + case APIKeySyncGroup: return apiVersion >= 4 - case 8: // OffsetCommit + case APIKeyOffsetCommit: return apiVersion >= 8 - case 9: // OffsetFetch + case APIKeyOffsetFetch: return apiVersion >= 6 - case 10: // FindCoordinator + case APIKeyFindCoordinator: return apiVersion >= 3 - case 12: // Heartbeat + case APIKeyHeartbeat: return apiVersion >= 4 - case 13: // LeaveGroup + case APIKeyLeaveGroup: return apiVersion >= 4 - case 19: // CreateTopics + case APIKeyCreateTopics: return apiVersion >= 2 - case 20: // DeleteTopics + case APIKeyDeleteTopics: return apiVersion >= 4 default: return false diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index f8d3d2d48..a69e5fd1f 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -90,6 +90,32 @@ const ( DefaultKafkaNamespace = "kafka" ) +// APIKey represents a Kafka API key type for better type safety +type APIKey uint16 + +// Kafka API Keys +const ( + APIKeyProduce APIKey = 0 + APIKeyFetch APIKey = 1 + APIKeyListOffsets APIKey = 2 + APIKeyMetadata APIKey = 3 + APIKeyOffsetCommit APIKey = 8 + APIKeyOffsetFetch APIKey = 9 + APIKeyFindCoordinator APIKey = 10 + APIKeyJoinGroup APIKey = 11 + APIKeyHeartbeat APIKey = 12 + APIKeyLeaveGroup APIKey = 13 + APIKeySyncGroup APIKey = 14 + APIKeyDescribeGroups APIKey = 15 + APIKeyListGroups APIKey = 16 + APIKeyApiVersions APIKey = 18 + APIKeyCreateTopics APIKey = 19 + APIKeyDeleteTopics APIKey = 20 + APIKeyInitProducerId APIKey = 22 + APIKeyDescribeConfigs APIKey = 32 + APIKeyDescribeCluster APIKey = 60 +) + // SeaweedMQHandlerInterface defines the interface for SeaweedMQ integration type SeaweedMQHandlerInterface interface { TopicExists(topic string) bool @@ -395,10 +421,10 @@ func (h *Handler) GetCoordinatorRegistry() CoordinatorRegistryInterface { // isDataPlaneAPI returns true if the API key is a data plane operation (Fetch, Produce) // Data plane operations can be slow and may block on I/O func isDataPlaneAPI(apiKey uint16) bool { - switch apiKey { - case 0: // Produce + switch APIKey(apiKey) { + case APIKeyProduce: return true - case 1: // Fetch + case APIKeyFetch: return true default: return false @@ -700,12 +726,12 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) - // Debug("Parsed header - API Key: %d (%s), Version: %d, Correlation: %d", apiKey, getAPIName(apiKey), apiVersion, correlationID) + // Debug("Parsed header - API Key: %d (%s), Version: %d, Correlation: %d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID) // Validate API version against what we support Debug("VALIDATING API VERSION: Key=%d, Version=%d", apiKey, apiVersion) if err := h.validateAPIVersion(apiKey, apiVersion); err != nil { - glog.Errorf("API VERSION VALIDATION FAILED: Key=%d (%s), Version=%d, error=%v", apiKey, getAPIName(apiKey), apiVersion, err) + glog.Errorf("API VERSION VALIDATION FAILED: Key=%d (%s), Version=%d, error=%v", apiKey, getAPIName(APIKey(apiKey)), apiVersion, err) // Return proper Kafka error response for unsupported version response, writeErr := h.buildUnsupportedVersionResponse(correlationID, apiKey, apiVersion) if writeErr != nil { @@ -730,11 +756,11 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // CRITICAL DEBUG: Log that validation passed glog.V(4).Infof("API VERSION VALIDATION PASSED: Key=%d (%s), Version=%d, Correlation=%d - proceeding to header parsing", - apiKey, getAPIName(apiKey), apiVersion, correlationID) + apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID) // Extract request body - special handling for ApiVersions requests var requestBody []byte - if apiKey == 18 && apiVersion >= 3 { + if apiKey == uint16(APIKeyApiVersions) && apiVersion >= 3 { // ApiVersions v3+ uses client_software_name + client_software_version, not client_id bodyOffset := 8 // Skip api_key(2) + api_version(2) + correlation_id(4) @@ -772,7 +798,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { if parseErr != nil { // CRITICAL: Log the parsing error for debugging glog.Errorf("REQUEST HEADER PARSING FAILED: API=%d (%s) v%d, correlation=%d, error=%v, msgLen=%d", - apiKey, getAPIName(apiKey), apiVersion, correlationID, parseErr, len(messageBuf)) + apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID, parseErr, len(messageBuf)) // Fall back to basic header parsing if flexible version parsing fails Debug("Flexible header parsing failed, using basic parsing: %v", parseErr) @@ -793,7 +819,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { bodyOffset += int(clientIDLen) } requestBody = messageBuf[bodyOffset:] - glog.Infof("FALLBACK PARSING SUCCESS: API=%d (%s) v%d, bodyLen=%d", apiKey, getAPIName(apiKey), apiVersion, len(requestBody)) + glog.Infof("FALLBACK PARSING SUCCESS: API=%d (%s) v%d, bodyLen=%d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, len(requestBody)) } else { // Use the successfully parsed request body requestBody = parsedRequestBody @@ -828,7 +854,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // CRITICAL: Route request to appropriate processor // Control plane: Fast, never blocks (Metadata, Heartbeat, etc.) // Data plane: Can be slow (Fetch, Produce) - Debug("API REQUEST - Key: %d (%s), Version: %d, Correlation: %d", apiKey, getAPIName(apiKey), apiVersion, correlationID) + Debug("API REQUEST - Key: %d (%s), Version: %d, Correlation: %d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID) req := &kafkaRequest{ correlationID: correlationID, @@ -866,7 +892,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) { // Record request start time for latency tracking requestStart := time.Now() - apiName := getAPIName(req.apiKey) + apiName := getAPIName(APIKey(req.apiKey)) Debug("PROCESSING API REQUEST: Key=%d (%s), Version=%d, Correlation=%d", req.apiKey, apiName, req.apiVersion, req.correlationID) @@ -874,76 +900,76 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) { var response []byte var err error - switch req.apiKey { - case 18: // ApiVersions + switch APIKey(req.apiKey) { + case APIKeyApiVersions: Debug("-> ApiVersions v%d", req.apiVersion) response, err = h.handleApiVersions(req.correlationID, req.apiVersion) - case 3: // Metadata + case APIKeyMetadata: Debug("-> Metadata v%d", req.apiVersion) response, err = h.handleMetadata(req.correlationID, req.apiVersion, req.requestBody) - case 2: // ListOffsets + case APIKeyListOffsets: response, err = h.handleListOffsets(req.correlationID, req.apiVersion, req.requestBody) - case 19: // CreateTopics + case APIKeyCreateTopics: response, err = h.handleCreateTopics(req.correlationID, req.apiVersion, req.requestBody) - case 20: // DeleteTopics + case APIKeyDeleteTopics: response, err = h.handleDeleteTopics(req.correlationID, req.requestBody) - case 0: // Produce + case APIKeyProduce: Debug("-> Produce v%d", req.apiVersion) response, err = h.handleProduce(req.correlationID, req.apiVersion, req.requestBody) - case 1: // Fetch + case APIKeyFetch: Debug("-> Fetch v%d", req.apiVersion) response, err = h.handleFetch(req.ctx, req.correlationID, req.apiVersion, req.requestBody) - case 11: // JoinGroup + case APIKeyJoinGroup: Debug("-> JoinGroup v%d", req.apiVersion) response, err = h.handleJoinGroup(req.correlationID, req.apiVersion, req.requestBody) - case 14: // SyncGroup + case APIKeySyncGroup: Debug("-> SyncGroup v%d", req.apiVersion) response, err = h.handleSyncGroup(req.correlationID, req.apiVersion, req.requestBody) - case 8: // OffsetCommit + case APIKeyOffsetCommit: Debug("-> OffsetCommit") response, err = h.handleOffsetCommit(req.correlationID, req.apiVersion, req.requestBody) - case 9: // OffsetFetch + case APIKeyOffsetFetch: Debug("-> OffsetFetch v%d", req.apiVersion) response, err = h.handleOffsetFetch(req.correlationID, req.apiVersion, req.requestBody) - case 10: // FindCoordinator + case APIKeyFindCoordinator: Debug("-> FindCoordinator v%d", req.apiVersion) response, err = h.handleFindCoordinator(req.correlationID, req.apiVersion, req.requestBody) - case 12: // Heartbeat + case APIKeyHeartbeat: Debug("-> Heartbeat v%d", req.apiVersion) response, err = h.handleHeartbeat(req.correlationID, req.apiVersion, req.requestBody) - case 13: // LeaveGroup + case APIKeyLeaveGroup: response, err = h.handleLeaveGroup(req.correlationID, req.apiVersion, req.requestBody) - case 15: // DescribeGroups + case APIKeyDescribeGroups: Debug("DescribeGroups request received, correlation: %d, version: %d", req.correlationID, req.apiVersion) response, err = h.handleDescribeGroups(req.correlationID, req.apiVersion, req.requestBody) - case 16: // ListGroups + case APIKeyListGroups: Debug("ListGroups request received, correlation: %d, version: %d", req.correlationID, req.apiVersion) response, err = h.handleListGroups(req.correlationID, req.apiVersion, req.requestBody) - case 32: // DescribeConfigs + case APIKeyDescribeConfigs: Debug("DescribeConfigs request received, correlation: %d, version: %d", req.correlationID, req.apiVersion) response, err = h.handleDescribeConfigs(req.correlationID, req.apiVersion, req.requestBody) - case 60: // DescribeCluster + case APIKeyDescribeCluster: Debug("-> DescribeCluster v%d", req.apiVersion) response, err = h.handleDescribeCluster(req.correlationID, req.apiVersion, req.requestBody) - case 22: // InitProducerId + case APIKeyInitProducerId: Debug("-> InitProducerId v%d", req.apiVersion) response, err = h.handleInitProducerId(req.correlationID, req.apiVersion, req.requestBody) @@ -965,32 +991,32 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) { // ApiKeyInfo represents supported API key information type ApiKeyInfo struct { - ApiKey uint16 + ApiKey APIKey MinVersion uint16 MaxVersion uint16 } // SupportedApiKeys defines all supported API keys and their version ranges var SupportedApiKeys = []ApiKeyInfo{ - {18, 0, 4}, // ApiVersions - support up to v4 for Kafka 8.0.0 compatibility - {3, 0, 7}, // Metadata - support up to v7 - {0, 0, 7}, // Produce - {1, 0, 7}, // Fetch - {2, 0, 2}, // ListOffsets - {19, 0, 5}, // CreateTopics - {20, 0, 4}, // DeleteTopics - {10, 0, 3}, // FindCoordinator - v3+ supports flexible responses - {11, 0, 6}, // JoinGroup - {14, 0, 5}, // SyncGroup - {8, 0, 2}, // OffsetCommit - {9, 0, 5}, // OffsetFetch - {12, 0, 4}, // Heartbeat - {13, 0, 4}, // LeaveGroup - {15, 0, 5}, // DescribeGroups - {16, 0, 4}, // ListGroups - {32, 0, 4}, // DescribeConfigs - {22, 0, 4}, // InitProducerId - support up to v4 for transactional producers - {60, 0, 1}, // DescribeCluster - for AdminClient compatibility (KIP-919) + {APIKeyApiVersions, 0, 4}, // ApiVersions - support up to v4 for Kafka 8.0.0 compatibility + {APIKeyMetadata, 0, 7}, // Metadata - support up to v7 + {APIKeyProduce, 0, 7}, // Produce + {APIKeyFetch, 0, 7}, // Fetch + {APIKeyListOffsets, 0, 2}, // ListOffsets + {APIKeyCreateTopics, 0, 5}, // CreateTopics + {APIKeyDeleteTopics, 0, 4}, // DeleteTopics + {APIKeyFindCoordinator, 0, 3}, // FindCoordinator - v3+ supports flexible responses + {APIKeyJoinGroup, 0, 6}, // JoinGroup + {APIKeySyncGroup, 0, 5}, // SyncGroup + {APIKeyOffsetCommit, 0, 2}, // OffsetCommit + {APIKeyOffsetFetch, 0, 5}, // OffsetFetch + {APIKeyHeartbeat, 0, 4}, // Heartbeat + {APIKeyLeaveGroup, 0, 4}, // LeaveGroup + {APIKeyDescribeGroups, 0, 5}, // DescribeGroups + {APIKeyListGroups, 0, 4}, // ListGroups + {APIKeyDescribeConfigs, 0, 4}, // DescribeConfigs + {APIKeyInitProducerId, 0, 4}, // InitProducerId - support up to v4 for transactional producers + {APIKeyDescribeCluster, 0, 1}, // DescribeCluster - for AdminClient compatibility (KIP-919) } func (h *Handler) handleApiVersions(correlationID uint32, apiVersion uint16) ([]byte, error) { @@ -2657,29 +2683,29 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ( // validateAPIVersion checks if we support the requested API version func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error { - supportedVersions := map[uint16][2]uint16{ - 18: {0, 4}, // ApiVersions: v0-v4 (Kafka 8.0.0 compatibility) - 3: {0, 7}, // Metadata: v0-v7 - 0: {0, 7}, // Produce: v0-v7 - 1: {0, 7}, // Fetch: v0-v7 - 2: {0, 2}, // ListOffsets: v0-v2 - 19: {0, 5}, // CreateTopics: v0-v5 (updated to match implementation) - 20: {0, 4}, // DeleteTopics: v0-v4 - 10: {0, 3}, // FindCoordinator: v0-v3 (v3+ uses flexible format) - 11: {0, 6}, // JoinGroup: cap to v6 (first flexible version) - 14: {0, 5}, // SyncGroup: v0-v5 - 8: {0, 2}, // OffsetCommit: v0-v2 - 9: {0, 5}, // OffsetFetch: v0-v5 (updated to match implementation) - 12: {0, 4}, // Heartbeat: v0-v4 - 13: {0, 4}, // LeaveGroup: v0-v4 - 15: {0, 5}, // DescribeGroups: v0-v5 - 16: {0, 4}, // ListGroups: v0-v4 - 32: {0, 4}, // DescribeConfigs: v0-v4 - 22: {0, 4}, // InitProducerId: v0-v4 - 60: {0, 1}, // DescribeCluster: v0-v1 (KIP-919, AdminClient compatibility) + supportedVersions := map[APIKey][2]uint16{ + APIKeyApiVersions: {0, 4}, // ApiVersions: v0-v4 (Kafka 8.0.0 compatibility) + APIKeyMetadata: {0, 7}, // Metadata: v0-v7 + APIKeyProduce: {0, 7}, // Produce: v0-v7 + APIKeyFetch: {0, 7}, // Fetch: v0-v7 + APIKeyListOffsets: {0, 2}, // ListOffsets: v0-v2 + APIKeyCreateTopics: {0, 5}, // CreateTopics: v0-v5 (updated to match implementation) + APIKeyDeleteTopics: {0, 4}, // DeleteTopics: v0-v4 + APIKeyFindCoordinator: {0, 3}, // FindCoordinator: v0-v3 (v3+ uses flexible format) + APIKeyJoinGroup: {0, 6}, // JoinGroup: cap to v6 (first flexible version) + APIKeySyncGroup: {0, 5}, // SyncGroup: v0-v5 + APIKeyOffsetCommit: {0, 2}, // OffsetCommit: v0-v2 + APIKeyOffsetFetch: {0, 5}, // OffsetFetch: v0-v5 (updated to match implementation) + APIKeyHeartbeat: {0, 4}, // Heartbeat: v0-v4 + APIKeyLeaveGroup: {0, 4}, // LeaveGroup: v0-v4 + APIKeyDescribeGroups: {0, 5}, // DescribeGroups: v0-v5 + APIKeyListGroups: {0, 4}, // ListGroups: v0-v4 + APIKeyDescribeConfigs: {0, 4}, // DescribeConfigs: v0-v4 + APIKeyInitProducerId: {0, 4}, // InitProducerId: v0-v4 + APIKeyDescribeCluster: {0, 1}, // DescribeCluster: v0-v1 (KIP-919, AdminClient compatibility) } - if versionRange, exists := supportedVersions[apiKey]; exists { + if versionRange, exists := supportedVersions[APIKey(apiKey)]; exists { minVer, maxVer := versionRange[0], versionRange[1] if apiVersion < minVer || apiVersion > maxVer { return fmt.Errorf("unsupported API version %d for API key %d (supported: %d-%d)", @@ -2723,44 +2749,46 @@ func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, reques } // getAPIName returns a human-readable name for Kafka API keys (for debugging) -func getAPIName(apiKey uint16) string { +func getAPIName(apiKey APIKey) string { switch apiKey { - case 0: + case APIKeyProduce: return "Produce" - case 1: + case APIKeyFetch: return "Fetch" - case 2: + case APIKeyListOffsets: return "ListOffsets" - case 3: + case APIKeyMetadata: return "Metadata" - case 8: + case APIKeyOffsetCommit: return "OffsetCommit" - case 9: + case APIKeyOffsetFetch: return "OffsetFetch" - case 10: + case APIKeyFindCoordinator: return "FindCoordinator" - case 11: + case APIKeyJoinGroup: return "JoinGroup" - case 12: + case APIKeyHeartbeat: return "Heartbeat" - case 13: + case APIKeyLeaveGroup: return "LeaveGroup" - case 14: + case APIKeySyncGroup: return "SyncGroup" - case 15: + case APIKeyDescribeGroups: return "DescribeGroups" - case 16: + case APIKeyListGroups: return "ListGroups" - case 18: + case APIKeyApiVersions: return "ApiVersions" - case 19: + case APIKeyCreateTopics: return "CreateTopics" - case 20: + case APIKeyDeleteTopics: return "DeleteTopics" - case 32: + case APIKeyDescribeConfigs: return "DescribeConfigs" - case 22: + case APIKeyInitProducerId: return "InitProducerId" + case APIKeyDescribeCluster: + return "DescribeCluster" default: return "Unknown" } @@ -2920,42 +2948,42 @@ func isFlexibleResponse(apiKey uint16, apiVersion uint16) bool { // Reference: kafka-go/protocol/response.go:119 and sarama/response_header.go:21 // Flexible responses have headerVersion >= 1, which adds tagged fields after correlation ID - switch apiKey { - case 0: // Produce + switch APIKey(apiKey) { + case APIKeyProduce: return apiVersion >= 9 - case 1: // Fetch + case APIKeyFetch: return apiVersion >= 12 - case 3: // Metadata + case APIKeyMetadata: // Metadata v9+ uses flexible responses (v7-8 use compact arrays/strings but NOT flexible headers) return apiVersion >= 9 - case 8: // OffsetCommit + case APIKeyOffsetCommit: return apiVersion >= 8 - case 9: // OffsetFetch + case APIKeyOffsetFetch: return apiVersion >= 6 - case 10: // FindCoordinator + case APIKeyFindCoordinator: return apiVersion >= 3 - case 11: // JoinGroup + case APIKeyJoinGroup: return apiVersion >= 6 - case 12: // Heartbeat + case APIKeyHeartbeat: return apiVersion >= 4 - case 13: // LeaveGroup + case APIKeyLeaveGroup: return apiVersion >= 4 - case 14: // SyncGroup + case APIKeySyncGroup: return apiVersion >= 4 - case 18: // ApiVersions + case APIKeyApiVersions: // CRITICAL: AdminClient compatibility requires header version 0 (no tagged fields) // Even though ApiVersions v3+ technically supports flexible responses, AdminClient // expects the header to NOT include tagged fields. This is a known quirk. return false // Always use non-flexible header for ApiVersions - case 19: // CreateTopics + case APIKeyCreateTopics: return apiVersion >= 5 - case 20: // DeleteTopics + case APIKeyDeleteTopics: return apiVersion >= 4 - case 22: // InitProducerId + case APIKeyInitProducerId: return apiVersion >= 2 // Flexible from v2+ (KIP-360) - case 32: // DescribeConfigs + case APIKeyDescribeConfigs: return apiVersion >= 4 - case 60: // DescribeCluster + case APIKeyDescribeCluster: return true // All versions (0+) are flexible default: // For unknown APIs, assume non-flexible (safer default) diff --git a/weed/mq/kafka/protocol/metrics.go b/weed/mq/kafka/protocol/metrics.go index 511354678..b4bcd98dd 100644 --- a/weed/mq/kafka/protocol/metrics.go +++ b/weed/mq/kafka/protocol/metrics.go @@ -11,18 +11,18 @@ type Metrics struct { // Request counters by API key requestCounts map[uint16]*int64 errorCounts map[uint16]*int64 - + // Latency tracking latencySum map[uint16]*int64 // Total latency in microseconds latencyCount map[uint16]*int64 // Number of requests for average calculation - + // Connection metrics activeConnections int64 totalConnections int64 - + // Mutex for map operations mu sync.RWMutex - + // Start time for uptime calculation startTime time.Time } @@ -38,10 +38,10 @@ type APIMetrics struct { // ConnectionMetrics represents connection-related metrics type ConnectionMetrics struct { - ActiveConnections int64 `json:"active_connections"` - TotalConnections int64 `json:"total_connections"` - UptimeSeconds int64 `json:"uptime_seconds"` - StartTime time.Time `json:"start_time"` + ActiveConnections int64 `json:"active_connections"` + TotalConnections int64 `json:"total_connections"` + UptimeSeconds int64 `json:"uptime_seconds"` + StartTime time.Time `json:"start_time"` } // MetricsSnapshot represents a complete metrics snapshot @@ -65,7 +65,7 @@ func NewMetrics() *Metrics { // RecordRequest records a successful request with latency func (m *Metrics) RecordRequest(apiKey uint16, latency time.Duration) { m.ensureCounters(apiKey) - + atomic.AddInt64(m.requestCounts[apiKey], 1) atomic.AddInt64(m.latencySum[apiKey], latency.Microseconds()) atomic.AddInt64(m.latencyCount[apiKey], 1) @@ -74,7 +74,7 @@ func (m *Metrics) RecordRequest(apiKey uint16, latency time.Duration) { // RecordError records an error for a specific API func (m *Metrics) RecordError(apiKey uint16, latency time.Duration) { m.ensureCounters(apiKey) - + atomic.AddInt64(m.requestCounts[apiKey], 1) atomic.AddInt64(m.errorCounts[apiKey], 1) atomic.AddInt64(m.latencySum[apiKey], latency.Microseconds()) @@ -96,29 +96,29 @@ func (m *Metrics) RecordDisconnection() { func (m *Metrics) GetSnapshot() MetricsSnapshot { m.mu.RLock() defer m.mu.RUnlock() - + apis := make([]APIMetrics, 0, len(m.requestCounts)) - + for apiKey, requestCount := range m.requestCounts { requests := atomic.LoadInt64(requestCount) errors := atomic.LoadInt64(m.errorCounts[apiKey]) latencySum := atomic.LoadInt64(m.latencySum[apiKey]) latencyCount := atomic.LoadInt64(m.latencyCount[apiKey]) - + var avgLatencyMs float64 if latencyCount > 0 { avgLatencyMs = float64(latencySum) / float64(latencyCount) / 1000.0 // Convert to milliseconds } - + apis = append(apis, APIMetrics{ APIKey: apiKey, - APIName: getAPIName(apiKey), + APIName: getAPIName(APIKey(apiKey)), RequestCount: requests, ErrorCount: errors, AvgLatencyMs: avgLatencyMs, }) } - + return MetricsSnapshot{ APIs: apis, Connections: ConnectionMetrics{ @@ -134,20 +134,20 @@ func (m *Metrics) GetSnapshot() MetricsSnapshot { // GetAPIMetrics returns metrics for a specific API func (m *Metrics) GetAPIMetrics(apiKey uint16) APIMetrics { m.ensureCounters(apiKey) - + requests := atomic.LoadInt64(m.requestCounts[apiKey]) errors := atomic.LoadInt64(m.errorCounts[apiKey]) latencySum := atomic.LoadInt64(m.latencySum[apiKey]) latencyCount := atomic.LoadInt64(m.latencyCount[apiKey]) - + var avgLatencyMs float64 if latencyCount > 0 { avgLatencyMs = float64(latencySum) / float64(latencyCount) / 1000.0 } - + return APIMetrics{ APIKey: apiKey, - APIName: getAPIName(apiKey), + APIName: getAPIName(APIKey(apiKey)), RequestCount: requests, ErrorCount: errors, AvgLatencyMs: avgLatencyMs, @@ -168,14 +168,14 @@ func (m *Metrics) GetConnectionMetrics() ConnectionMetrics { func (m *Metrics) Reset() { m.mu.Lock() defer m.mu.Unlock() - + for apiKey := range m.requestCounts { atomic.StoreInt64(m.requestCounts[apiKey], 0) atomic.StoreInt64(m.errorCounts[apiKey], 0) atomic.StoreInt64(m.latencySum[apiKey], 0) atomic.StoreInt64(m.latencyCount[apiKey], 0) } - + atomic.StoreInt64(&m.activeConnections, 0) atomic.StoreInt64(&m.totalConnections, 0) m.startTime = time.Now() @@ -189,15 +189,15 @@ func (m *Metrics) ensureCounters(apiKey uint16) { return } m.mu.RUnlock() - + m.mu.Lock() defer m.mu.Unlock() - + // Double-check after acquiring write lock if _, exists := m.requestCounts[apiKey]; exists { return } - + m.requestCounts[apiKey] = new(int64) m.errorCounts[apiKey] = new(int64) m.latencySum[apiKey] = new(int64)