Verify topics count

This commit is contained in:
chrislu
2025-10-07 00:30:36 -07:00
parent 13f5961504
commit fb003b2e5a

View File

@@ -71,7 +71,6 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
isSchemasTopic = true
}
// Long-poll if: (1) client wants to wait (maxWaitMs > 0), (2) no data available, (3) topics exist
// NOTE: We long-poll even if MinBytes=0, since the client specified a wait time
hasData := hasDataAvailable()
@@ -419,11 +418,27 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
}
// Verify topics count hasn't been corrupted
if !isFlexible && len(response) >= 14 {
actualTopicsCount := binary.BigEndian.Uint32(response[10:14])
if !isFlexible {
// Topics count position depends on API version:
// v0: byte 0 (no throttle_time_ms, no error_code, no session_id)
// v1-v6: byte 4 (after throttle_time_ms)
// v7+: byte 10 (after throttle_time_ms, error_code, session_id)
var topicsCountPos int
if apiVersion == 0 {
topicsCountPos = 0
} else if apiVersion < 7 {
topicsCountPos = 4
} else {
topicsCountPos = 10
}
if len(response) >= topicsCountPos+4 {
actualTopicsCount := binary.BigEndian.Uint32(response[topicsCountPos : topicsCountPos+4])
if actualTopicsCount != uint32(topicsCount) {
glog.Errorf("FETCH CORR=%d: Topics count CORRUPTED! Expected %d, found %d at response[10:14]=%02x %02x %02x %02x",
correlationID, topicsCount, actualTopicsCount, response[10], response[11], response[12], response[13])
glog.Errorf("FETCH CORR=%d v%d: Topics count CORRUPTED! Expected %d, found %d at response[%d:%d]=%02x %02x %02x %02x",
correlationID, apiVersion, topicsCount, actualTopicsCount, topicsCountPos, topicsCountPos+4,
response[topicsCountPos], response[topicsCountPos+1], response[topicsCountPos+2], response[topicsCountPos+3])
}
}
}