This commit is contained in:
chrislu
2025-10-06 16:40:16 -07:00
parent 9556889879
commit 193d6ed64e
3 changed files with 175 additions and 147 deletions

View File

@@ -196,32 +196,32 @@ func DecodeTaggedFields(data []byte) (*TaggedFields, int, error) {
// IsFlexibleVersion determines if an API version uses flexible versions
// This is API-specific and based on when each API adopted flexible versions
func IsFlexibleVersion(apiKey, apiVersion uint16) bool {
switch apiKey {
case 18: // ApiVersions
switch APIKey(apiKey) {
case APIKeyApiVersions:
return apiVersion >= 3
case 3: // Metadata
case APIKeyMetadata:
return apiVersion >= 9
case 1: // Fetch
case APIKeyFetch:
return apiVersion >= 12
case 0: // Produce
case APIKeyProduce:
return apiVersion >= 9
case 11: // JoinGroup
case APIKeyJoinGroup:
return apiVersion >= 6
case 14: // SyncGroup
case APIKeySyncGroup:
return apiVersion >= 4
case 8: // OffsetCommit
case APIKeyOffsetCommit:
return apiVersion >= 8
case 9: // OffsetFetch
case APIKeyOffsetFetch:
return apiVersion >= 6
case 10: // FindCoordinator
case APIKeyFindCoordinator:
return apiVersion >= 3
case 12: // Heartbeat
case APIKeyHeartbeat:
return apiVersion >= 4
case 13: // LeaveGroup
case APIKeyLeaveGroup:
return apiVersion >= 4
case 19: // CreateTopics
case APIKeyCreateTopics:
return apiVersion >= 2
case 20: // DeleteTopics
case APIKeyDeleteTopics:
return apiVersion >= 4
default:
return false

View File

@@ -90,6 +90,32 @@ const (
DefaultKafkaNamespace = "kafka"
)
// APIKey represents a Kafka API key type for better type safety
type APIKey uint16
// Kafka API Keys
const (
APIKeyProduce APIKey = 0
APIKeyFetch APIKey = 1
APIKeyListOffsets APIKey = 2
APIKeyMetadata APIKey = 3
APIKeyOffsetCommit APIKey = 8
APIKeyOffsetFetch APIKey = 9
APIKeyFindCoordinator APIKey = 10
APIKeyJoinGroup APIKey = 11
APIKeyHeartbeat APIKey = 12
APIKeyLeaveGroup APIKey = 13
APIKeySyncGroup APIKey = 14
APIKeyDescribeGroups APIKey = 15
APIKeyListGroups APIKey = 16
APIKeyApiVersions APIKey = 18
APIKeyCreateTopics APIKey = 19
APIKeyDeleteTopics APIKey = 20
APIKeyInitProducerId APIKey = 22
APIKeyDescribeConfigs APIKey = 32
APIKeyDescribeCluster APIKey = 60
)
// SeaweedMQHandlerInterface defines the interface for SeaweedMQ integration
type SeaweedMQHandlerInterface interface {
TopicExists(topic string) bool
@@ -395,10 +421,10 @@ func (h *Handler) GetCoordinatorRegistry() CoordinatorRegistryInterface {
// isDataPlaneAPI returns true if the API key is a data plane operation (Fetch, Produce)
// Data plane operations can be slow and may block on I/O
func isDataPlaneAPI(apiKey uint16) bool {
switch apiKey {
case 0: // Produce
switch APIKey(apiKey) {
case APIKeyProduce:
return true
case 1: // Fetch
case APIKeyFetch:
return true
default:
return false
@@ -700,12 +726,12 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
// Debug("Parsed header - API Key: %d (%s), Version: %d, Correlation: %d", apiKey, getAPIName(apiKey), apiVersion, correlationID)
// Debug("Parsed header - API Key: %d (%s), Version: %d, Correlation: %d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID)
// Validate API version against what we support
Debug("VALIDATING API VERSION: Key=%d, Version=%d", apiKey, apiVersion)
if err := h.validateAPIVersion(apiKey, apiVersion); err != nil {
glog.Errorf("API VERSION VALIDATION FAILED: Key=%d (%s), Version=%d, error=%v", apiKey, getAPIName(apiKey), apiVersion, err)
glog.Errorf("API VERSION VALIDATION FAILED: Key=%d (%s), Version=%d, error=%v", apiKey, getAPIName(APIKey(apiKey)), apiVersion, err)
// Return proper Kafka error response for unsupported version
response, writeErr := h.buildUnsupportedVersionResponse(correlationID, apiKey, apiVersion)
if writeErr != nil {
@@ -730,11 +756,11 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// CRITICAL DEBUG: Log that validation passed
glog.V(4).Infof("API VERSION VALIDATION PASSED: Key=%d (%s), Version=%d, Correlation=%d - proceeding to header parsing",
apiKey, getAPIName(apiKey), apiVersion, correlationID)
apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID)
// Extract request body - special handling for ApiVersions requests
var requestBody []byte
if apiKey == 18 && apiVersion >= 3 {
if apiKey == uint16(APIKeyApiVersions) && apiVersion >= 3 {
// ApiVersions v3+ uses client_software_name + client_software_version, not client_id
bodyOffset := 8 // Skip api_key(2) + api_version(2) + correlation_id(4)
@@ -772,7 +798,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
if parseErr != nil {
// CRITICAL: Log the parsing error for debugging
glog.Errorf("REQUEST HEADER PARSING FAILED: API=%d (%s) v%d, correlation=%d, error=%v, msgLen=%d",
apiKey, getAPIName(apiKey), apiVersion, correlationID, parseErr, len(messageBuf))
apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID, parseErr, len(messageBuf))
// Fall back to basic header parsing if flexible version parsing fails
Debug("Flexible header parsing failed, using basic parsing: %v", parseErr)
@@ -793,7 +819,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
bodyOffset += int(clientIDLen)
}
requestBody = messageBuf[bodyOffset:]
glog.Infof("FALLBACK PARSING SUCCESS: API=%d (%s) v%d, bodyLen=%d", apiKey, getAPIName(apiKey), apiVersion, len(requestBody))
glog.Infof("FALLBACK PARSING SUCCESS: API=%d (%s) v%d, bodyLen=%d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, len(requestBody))
} else {
// Use the successfully parsed request body
requestBody = parsedRequestBody
@@ -828,7 +854,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// CRITICAL: Route request to appropriate processor
// Control plane: Fast, never blocks (Metadata, Heartbeat, etc.)
// Data plane: Can be slow (Fetch, Produce)
Debug("API REQUEST - Key: %d (%s), Version: %d, Correlation: %d", apiKey, getAPIName(apiKey), apiVersion, correlationID)
Debug("API REQUEST - Key: %d (%s), Version: %d, Correlation: %d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID)
req := &kafkaRequest{
correlationID: correlationID,
@@ -866,7 +892,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
// Record request start time for latency tracking
requestStart := time.Now()
apiName := getAPIName(req.apiKey)
apiName := getAPIName(APIKey(req.apiKey))
Debug("PROCESSING API REQUEST: Key=%d (%s), Version=%d, Correlation=%d",
req.apiKey, apiName, req.apiVersion, req.correlationID)
@@ -874,76 +900,76 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
var response []byte
var err error
switch req.apiKey {
case 18: // ApiVersions
switch APIKey(req.apiKey) {
case APIKeyApiVersions:
Debug("-> ApiVersions v%d", req.apiVersion)
response, err = h.handleApiVersions(req.correlationID, req.apiVersion)
case 3: // Metadata
case APIKeyMetadata:
Debug("-> Metadata v%d", req.apiVersion)
response, err = h.handleMetadata(req.correlationID, req.apiVersion, req.requestBody)
case 2: // ListOffsets
case APIKeyListOffsets:
response, err = h.handleListOffsets(req.correlationID, req.apiVersion, req.requestBody)
case 19: // CreateTopics
case APIKeyCreateTopics:
response, err = h.handleCreateTopics(req.correlationID, req.apiVersion, req.requestBody)
case 20: // DeleteTopics
case APIKeyDeleteTopics:
response, err = h.handleDeleteTopics(req.correlationID, req.requestBody)
case 0: // Produce
case APIKeyProduce:
Debug("-> Produce v%d", req.apiVersion)
response, err = h.handleProduce(req.correlationID, req.apiVersion, req.requestBody)
case 1: // Fetch
case APIKeyFetch:
Debug("-> Fetch v%d", req.apiVersion)
response, err = h.handleFetch(req.ctx, req.correlationID, req.apiVersion, req.requestBody)
case 11: // JoinGroup
case APIKeyJoinGroup:
Debug("-> JoinGroup v%d", req.apiVersion)
response, err = h.handleJoinGroup(req.correlationID, req.apiVersion, req.requestBody)
case 14: // SyncGroup
case APIKeySyncGroup:
Debug("-> SyncGroup v%d", req.apiVersion)
response, err = h.handleSyncGroup(req.correlationID, req.apiVersion, req.requestBody)
case 8: // OffsetCommit
case APIKeyOffsetCommit:
Debug("-> OffsetCommit")
response, err = h.handleOffsetCommit(req.correlationID, req.apiVersion, req.requestBody)
case 9: // OffsetFetch
case APIKeyOffsetFetch:
Debug("-> OffsetFetch v%d", req.apiVersion)
response, err = h.handleOffsetFetch(req.correlationID, req.apiVersion, req.requestBody)
case 10: // FindCoordinator
case APIKeyFindCoordinator:
Debug("-> FindCoordinator v%d", req.apiVersion)
response, err = h.handleFindCoordinator(req.correlationID, req.apiVersion, req.requestBody)
case 12: // Heartbeat
case APIKeyHeartbeat:
Debug("-> Heartbeat v%d", req.apiVersion)
response, err = h.handleHeartbeat(req.correlationID, req.apiVersion, req.requestBody)
case 13: // LeaveGroup
case APIKeyLeaveGroup:
response, err = h.handleLeaveGroup(req.correlationID, req.apiVersion, req.requestBody)
case 15: // DescribeGroups
case APIKeyDescribeGroups:
Debug("DescribeGroups request received, correlation: %d, version: %d", req.correlationID, req.apiVersion)
response, err = h.handleDescribeGroups(req.correlationID, req.apiVersion, req.requestBody)
case 16: // ListGroups
case APIKeyListGroups:
Debug("ListGroups request received, correlation: %d, version: %d", req.correlationID, req.apiVersion)
response, err = h.handleListGroups(req.correlationID, req.apiVersion, req.requestBody)
case 32: // DescribeConfigs
case APIKeyDescribeConfigs:
Debug("DescribeConfigs request received, correlation: %d, version: %d", req.correlationID, req.apiVersion)
response, err = h.handleDescribeConfigs(req.correlationID, req.apiVersion, req.requestBody)
case 60: // DescribeCluster
case APIKeyDescribeCluster:
Debug("-> DescribeCluster v%d", req.apiVersion)
response, err = h.handleDescribeCluster(req.correlationID, req.apiVersion, req.requestBody)
case 22: // InitProducerId
case APIKeyInitProducerId:
Debug("-> InitProducerId v%d", req.apiVersion)
response, err = h.handleInitProducerId(req.correlationID, req.apiVersion, req.requestBody)
@@ -965,32 +991,32 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
// ApiKeyInfo represents supported API key information
type ApiKeyInfo struct {
ApiKey uint16
ApiKey APIKey
MinVersion uint16
MaxVersion uint16
}
// SupportedApiKeys defines all supported API keys and their version ranges
var SupportedApiKeys = []ApiKeyInfo{
{18, 0, 4}, // ApiVersions - support up to v4 for Kafka 8.0.0 compatibility
{3, 0, 7}, // Metadata - support up to v7
{0, 0, 7}, // Produce
{1, 0, 7}, // Fetch
{2, 0, 2}, // ListOffsets
{19, 0, 5}, // CreateTopics
{20, 0, 4}, // DeleteTopics
{10, 0, 3}, // FindCoordinator - v3+ supports flexible responses
{11, 0, 6}, // JoinGroup
{14, 0, 5}, // SyncGroup
{8, 0, 2}, // OffsetCommit
{9, 0, 5}, // OffsetFetch
{12, 0, 4}, // Heartbeat
{13, 0, 4}, // LeaveGroup
{15, 0, 5}, // DescribeGroups
{16, 0, 4}, // ListGroups
{32, 0, 4}, // DescribeConfigs
{22, 0, 4}, // InitProducerId - support up to v4 for transactional producers
{60, 0, 1}, // DescribeCluster - for AdminClient compatibility (KIP-919)
{APIKeyApiVersions, 0, 4}, // ApiVersions - support up to v4 for Kafka 8.0.0 compatibility
{APIKeyMetadata, 0, 7}, // Metadata - support up to v7
{APIKeyProduce, 0, 7}, // Produce
{APIKeyFetch, 0, 7}, // Fetch
{APIKeyListOffsets, 0, 2}, // ListOffsets
{APIKeyCreateTopics, 0, 5}, // CreateTopics
{APIKeyDeleteTopics, 0, 4}, // DeleteTopics
{APIKeyFindCoordinator, 0, 3}, // FindCoordinator - v3+ supports flexible responses
{APIKeyJoinGroup, 0, 6}, // JoinGroup
{APIKeySyncGroup, 0, 5}, // SyncGroup
{APIKeyOffsetCommit, 0, 2}, // OffsetCommit
{APIKeyOffsetFetch, 0, 5}, // OffsetFetch
{APIKeyHeartbeat, 0, 4}, // Heartbeat
{APIKeyLeaveGroup, 0, 4}, // LeaveGroup
{APIKeyDescribeGroups, 0, 5}, // DescribeGroups
{APIKeyListGroups, 0, 4}, // ListGroups
{APIKeyDescribeConfigs, 0, 4}, // DescribeConfigs
{APIKeyInitProducerId, 0, 4}, // InitProducerId - support up to v4 for transactional producers
{APIKeyDescribeCluster, 0, 1}, // DescribeCluster - for AdminClient compatibility (KIP-919)
}
func (h *Handler) handleApiVersions(correlationID uint32, apiVersion uint16) ([]byte, error) {
@@ -2657,29 +2683,29 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) (
// validateAPIVersion checks if we support the requested API version
func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error {
supportedVersions := map[uint16][2]uint16{
18: {0, 4}, // ApiVersions: v0-v4 (Kafka 8.0.0 compatibility)
3: {0, 7}, // Metadata: v0-v7
0: {0, 7}, // Produce: v0-v7
1: {0, 7}, // Fetch: v0-v7
2: {0, 2}, // ListOffsets: v0-v2
19: {0, 5}, // CreateTopics: v0-v5 (updated to match implementation)
20: {0, 4}, // DeleteTopics: v0-v4
10: {0, 3}, // FindCoordinator: v0-v3 (v3+ uses flexible format)
11: {0, 6}, // JoinGroup: cap to v6 (first flexible version)
14: {0, 5}, // SyncGroup: v0-v5
8: {0, 2}, // OffsetCommit: v0-v2
9: {0, 5}, // OffsetFetch: v0-v5 (updated to match implementation)
12: {0, 4}, // Heartbeat: v0-v4
13: {0, 4}, // LeaveGroup: v0-v4
15: {0, 5}, // DescribeGroups: v0-v5
16: {0, 4}, // ListGroups: v0-v4
32: {0, 4}, // DescribeConfigs: v0-v4
22: {0, 4}, // InitProducerId: v0-v4
60: {0, 1}, // DescribeCluster: v0-v1 (KIP-919, AdminClient compatibility)
supportedVersions := map[APIKey][2]uint16{
APIKeyApiVersions: {0, 4}, // ApiVersions: v0-v4 (Kafka 8.0.0 compatibility)
APIKeyMetadata: {0, 7}, // Metadata: v0-v7
APIKeyProduce: {0, 7}, // Produce: v0-v7
APIKeyFetch: {0, 7}, // Fetch: v0-v7
APIKeyListOffsets: {0, 2}, // ListOffsets: v0-v2
APIKeyCreateTopics: {0, 5}, // CreateTopics: v0-v5 (updated to match implementation)
APIKeyDeleteTopics: {0, 4}, // DeleteTopics: v0-v4
APIKeyFindCoordinator: {0, 3}, // FindCoordinator: v0-v3 (v3+ uses flexible format)
APIKeyJoinGroup: {0, 6}, // JoinGroup: cap to v6 (first flexible version)
APIKeySyncGroup: {0, 5}, // SyncGroup: v0-v5
APIKeyOffsetCommit: {0, 2}, // OffsetCommit: v0-v2
APIKeyOffsetFetch: {0, 5}, // OffsetFetch: v0-v5 (updated to match implementation)
APIKeyHeartbeat: {0, 4}, // Heartbeat: v0-v4
APIKeyLeaveGroup: {0, 4}, // LeaveGroup: v0-v4
APIKeyDescribeGroups: {0, 5}, // DescribeGroups: v0-v5
APIKeyListGroups: {0, 4}, // ListGroups: v0-v4
APIKeyDescribeConfigs: {0, 4}, // DescribeConfigs: v0-v4
APIKeyInitProducerId: {0, 4}, // InitProducerId: v0-v4
APIKeyDescribeCluster: {0, 1}, // DescribeCluster: v0-v1 (KIP-919, AdminClient compatibility)
}
if versionRange, exists := supportedVersions[apiKey]; exists {
if versionRange, exists := supportedVersions[APIKey(apiKey)]; exists {
minVer, maxVer := versionRange[0], versionRange[1]
if apiVersion < minVer || apiVersion > maxVer {
return fmt.Errorf("unsupported API version %d for API key %d (supported: %d-%d)",
@@ -2723,44 +2749,46 @@ func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, reques
}
// getAPIName returns a human-readable name for Kafka API keys (for debugging)
func getAPIName(apiKey uint16) string {
func getAPIName(apiKey APIKey) string {
switch apiKey {
case 0:
case APIKeyProduce:
return "Produce"
case 1:
case APIKeyFetch:
return "Fetch"
case 2:
case APIKeyListOffsets:
return "ListOffsets"
case 3:
case APIKeyMetadata:
return "Metadata"
case 8:
case APIKeyOffsetCommit:
return "OffsetCommit"
case 9:
case APIKeyOffsetFetch:
return "OffsetFetch"
case 10:
case APIKeyFindCoordinator:
return "FindCoordinator"
case 11:
case APIKeyJoinGroup:
return "JoinGroup"
case 12:
case APIKeyHeartbeat:
return "Heartbeat"
case 13:
case APIKeyLeaveGroup:
return "LeaveGroup"
case 14:
case APIKeySyncGroup:
return "SyncGroup"
case 15:
case APIKeyDescribeGroups:
return "DescribeGroups"
case 16:
case APIKeyListGroups:
return "ListGroups"
case 18:
case APIKeyApiVersions:
return "ApiVersions"
case 19:
case APIKeyCreateTopics:
return "CreateTopics"
case 20:
case APIKeyDeleteTopics:
return "DeleteTopics"
case 32:
case APIKeyDescribeConfigs:
return "DescribeConfigs"
case 22:
case APIKeyInitProducerId:
return "InitProducerId"
case APIKeyDescribeCluster:
return "DescribeCluster"
default:
return "Unknown"
}
@@ -2920,42 +2948,42 @@ func isFlexibleResponse(apiKey uint16, apiVersion uint16) bool {
// Reference: kafka-go/protocol/response.go:119 and sarama/response_header.go:21
// Flexible responses have headerVersion >= 1, which adds tagged fields after correlation ID
switch apiKey {
case 0: // Produce
switch APIKey(apiKey) {
case APIKeyProduce:
return apiVersion >= 9
case 1: // Fetch
case APIKeyFetch:
return apiVersion >= 12
case 3: // Metadata
case APIKeyMetadata:
// Metadata v9+ uses flexible responses (v7-8 use compact arrays/strings but NOT flexible headers)
return apiVersion >= 9
case 8: // OffsetCommit
case APIKeyOffsetCommit:
return apiVersion >= 8
case 9: // OffsetFetch
case APIKeyOffsetFetch:
return apiVersion >= 6
case 10: // FindCoordinator
case APIKeyFindCoordinator:
return apiVersion >= 3
case 11: // JoinGroup
case APIKeyJoinGroup:
return apiVersion >= 6
case 12: // Heartbeat
case APIKeyHeartbeat:
return apiVersion >= 4
case 13: // LeaveGroup
case APIKeyLeaveGroup:
return apiVersion >= 4
case 14: // SyncGroup
case APIKeySyncGroup:
return apiVersion >= 4
case 18: // ApiVersions
case APIKeyApiVersions:
// CRITICAL: AdminClient compatibility requires header version 0 (no tagged fields)
// Even though ApiVersions v3+ technically supports flexible responses, AdminClient
// expects the header to NOT include tagged fields. This is a known quirk.
return false // Always use non-flexible header for ApiVersions
case 19: // CreateTopics
case APIKeyCreateTopics:
return apiVersion >= 5
case 20: // DeleteTopics
case APIKeyDeleteTopics:
return apiVersion >= 4
case 22: // InitProducerId
case APIKeyInitProducerId:
return apiVersion >= 2 // Flexible from v2+ (KIP-360)
case 32: // DescribeConfigs
case APIKeyDescribeConfigs:
return apiVersion >= 4
case 60: // DescribeCluster
case APIKeyDescribeCluster:
return true // All versions (0+) are flexible
default:
// For unknown APIs, assume non-flexible (safer default)

View File

@@ -11,18 +11,18 @@ type Metrics struct {
// Request counters by API key
requestCounts map[uint16]*int64
errorCounts map[uint16]*int64
// Latency tracking
latencySum map[uint16]*int64 // Total latency in microseconds
latencyCount map[uint16]*int64 // Number of requests for average calculation
// Connection metrics
activeConnections int64
totalConnections int64
// Mutex for map operations
mu sync.RWMutex
// Start time for uptime calculation
startTime time.Time
}
@@ -38,10 +38,10 @@ type APIMetrics struct {
// ConnectionMetrics represents connection-related metrics
type ConnectionMetrics struct {
ActiveConnections int64 `json:"active_connections"`
TotalConnections int64 `json:"total_connections"`
UptimeSeconds int64 `json:"uptime_seconds"`
StartTime time.Time `json:"start_time"`
ActiveConnections int64 `json:"active_connections"`
TotalConnections int64 `json:"total_connections"`
UptimeSeconds int64 `json:"uptime_seconds"`
StartTime time.Time `json:"start_time"`
}
// MetricsSnapshot represents a complete metrics snapshot
@@ -65,7 +65,7 @@ func NewMetrics() *Metrics {
// RecordRequest records a successful request with latency
func (m *Metrics) RecordRequest(apiKey uint16, latency time.Duration) {
m.ensureCounters(apiKey)
atomic.AddInt64(m.requestCounts[apiKey], 1)
atomic.AddInt64(m.latencySum[apiKey], latency.Microseconds())
atomic.AddInt64(m.latencyCount[apiKey], 1)
@@ -74,7 +74,7 @@ func (m *Metrics) RecordRequest(apiKey uint16, latency time.Duration) {
// RecordError records an error for a specific API
func (m *Metrics) RecordError(apiKey uint16, latency time.Duration) {
m.ensureCounters(apiKey)
atomic.AddInt64(m.requestCounts[apiKey], 1)
atomic.AddInt64(m.errorCounts[apiKey], 1)
atomic.AddInt64(m.latencySum[apiKey], latency.Microseconds())
@@ -96,29 +96,29 @@ func (m *Metrics) RecordDisconnection() {
func (m *Metrics) GetSnapshot() MetricsSnapshot {
m.mu.RLock()
defer m.mu.RUnlock()
apis := make([]APIMetrics, 0, len(m.requestCounts))
for apiKey, requestCount := range m.requestCounts {
requests := atomic.LoadInt64(requestCount)
errors := atomic.LoadInt64(m.errorCounts[apiKey])
latencySum := atomic.LoadInt64(m.latencySum[apiKey])
latencyCount := atomic.LoadInt64(m.latencyCount[apiKey])
var avgLatencyMs float64
if latencyCount > 0 {
avgLatencyMs = float64(latencySum) / float64(latencyCount) / 1000.0 // Convert to milliseconds
}
apis = append(apis, APIMetrics{
APIKey: apiKey,
APIName: getAPIName(apiKey),
APIName: getAPIName(APIKey(apiKey)),
RequestCount: requests,
ErrorCount: errors,
AvgLatencyMs: avgLatencyMs,
})
}
return MetricsSnapshot{
APIs: apis,
Connections: ConnectionMetrics{
@@ -134,20 +134,20 @@ func (m *Metrics) GetSnapshot() MetricsSnapshot {
// GetAPIMetrics returns metrics for a specific API
func (m *Metrics) GetAPIMetrics(apiKey uint16) APIMetrics {
m.ensureCounters(apiKey)
requests := atomic.LoadInt64(m.requestCounts[apiKey])
errors := atomic.LoadInt64(m.errorCounts[apiKey])
latencySum := atomic.LoadInt64(m.latencySum[apiKey])
latencyCount := atomic.LoadInt64(m.latencyCount[apiKey])
var avgLatencyMs float64
if latencyCount > 0 {
avgLatencyMs = float64(latencySum) / float64(latencyCount) / 1000.0
}
return APIMetrics{
APIKey: apiKey,
APIName: getAPIName(apiKey),
APIName: getAPIName(APIKey(apiKey)),
RequestCount: requests,
ErrorCount: errors,
AvgLatencyMs: avgLatencyMs,
@@ -168,14 +168,14 @@ func (m *Metrics) GetConnectionMetrics() ConnectionMetrics {
func (m *Metrics) Reset() {
m.mu.Lock()
defer m.mu.Unlock()
for apiKey := range m.requestCounts {
atomic.StoreInt64(m.requestCounts[apiKey], 0)
atomic.StoreInt64(m.errorCounts[apiKey], 0)
atomic.StoreInt64(m.latencySum[apiKey], 0)
atomic.StoreInt64(m.latencyCount[apiKey], 0)
}
atomic.StoreInt64(&m.activeConnections, 0)
atomic.StoreInt64(&m.totalConnections, 0)
m.startTime = time.Now()
@@ -189,15 +189,15 @@ func (m *Metrics) ensureCounters(apiKey uint16) {
return
}
m.mu.RUnlock()
m.mu.Lock()
defer m.mu.Unlock()
// Double-check after acquiring write lock
if _, exists := m.requestCounts[apiKey]; exists {
return
}
m.requestCounts[apiKey] = new(int64)
m.errorCounts[apiKey] = new(int64)
m.latencySum[apiKey] = new(int64)