From a710e4d2cbd1b997937a5b18b4f3f178394bd16c Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 4 Oct 2025 12:40:39 -0700 Subject: [PATCH] reduce logs, reduce CPU usage --- weed/cluster/lock_client.go | 44 +++-- weed/command/mq_kafka_gateway.go | 13 ++ weed/mq/broker/broker_grpc_sub.go | 6 +- .../mq/broker/broker_topic_conf_read_write.go | 22 +-- weed/mq/kafka/gateway/server.go | 2 - .../mq/kafka/integration/seaweedmq_handler.go | 127 ++++++++++----- weed/mq/kafka/protocol/fetch.go | 150 +++++------------- weed/mq/kafka/protocol/handler.go | 38 ++--- weed/mq/topic/local_topic.go | 9 +- weed/server/filer_grpc_server_dlm.go | 14 +- 10 files changed, 216 insertions(+), 209 deletions(-) diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index ae47d01f9..d6ed9f5ee 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -3,13 +3,14 @@ package cluster import ( "context" "fmt" + "time" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" - "time" ) type LockClient struct { @@ -109,7 +110,7 @@ func (lock *LiveLock) retryUntilLocked(lockDuration time.Duration) { } func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error { - glog.V(0).Infof("šŸ”„ LOCK: AttemptToLock key=%s owner=%s", lock.key, lock.self) + glog.V(4).Infof("LOCK: AttemptToLock key=%s owner=%s", lock.key, lock.self) errorMessage, err := lock.doLock(lockDuration) if err != nil { glog.V(0).Infof("āŒ LOCK: doLock failed for key=%s: %v", lock.key, err) @@ -121,8 +122,11 @@ func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error { time.Sleep(time.Second) return fmt.Errorf("%v", errorMessage) } + if !lock.isLocked { + // Only log when transitioning from unlocked to locked + glog.V(0).Infof("āœ… LOCK: Successfully acquired key=%s owner=%s", lock.key, lock.self) + } lock.isLocked = true - glog.V(0).Infof("āœ… LOCK: Successfully locked key=%s owner=%s", lock.key, lock.self) return nil } @@ -151,14 +155,18 @@ func (lock *LiveLock) Stop() error { default: close(lock.cancelCh) } - + // Also release the lock if held return lock.StopShortLivedLock() } func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) { - glog.V(0).Infof("šŸ”„ LOCK: doLock calling DistributedLock - key=%s filer=%s owner=%s renewToken=%s", - lock.key, lock.hostFiler, lock.self, lock.renewToken) + glog.V(4).Infof("LOCK: doLock calling DistributedLock - key=%s filer=%s owner=%s", + lock.key, lock.hostFiler, lock.self) + + previousHostFiler := lock.hostFiler + previousOwner := lock.owner + err = pb.WithFilerClient(false, 0, lock.hostFiler, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{ Name: lock.key, @@ -167,28 +175,34 @@ func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, e IsMoved: false, Owner: lock.self, }) - glog.V(0).Infof("šŸ” LOCK: DistributedLock response - key=%s err=%v resp=%+v", lock.key, err, resp) + glog.V(4).Infof("LOCK: DistributedLock response - key=%s err=%v", lock.key, err) if err == nil && resp != nil { lock.renewToken = resp.RenewToken - glog.V(0).Infof("āœ… LOCK: Got renewToken for key=%s: %s", lock.key, lock.renewToken) + glog.V(4).Infof("LOCK: Got renewToken for key=%s", lock.key) } else { //this can be retried.
Need to remember the last valid renewToken lock.renewToken = "" - glog.V(0).Infof("āš ļø LOCK: Cleared renewToken for key=%s", lock.key) + glog.V(0).Infof("āš ļø LOCK: Cleared renewToken for key=%s (err=%v)", lock.key, err) } if resp != nil { errorMessage = resp.Error - if resp.LockHostMovedTo != "" { - glog.V(0).Infof("šŸ”„ LOCK: Lock moved to %s for key=%s", resp.LockHostMovedTo, lock.key) + if resp.LockHostMovedTo != "" && resp.LockHostMovedTo != string(previousHostFiler) { + // Only log if the host actually changed + glog.V(0).Infof("šŸ”„ LOCK: Host changed from %s to %s for key=%s", previousHostFiler, resp.LockHostMovedTo, lock.key) lock.hostFiler = pb.ServerAddress(resp.LockHostMovedTo) lock.lc.seedFiler = lock.hostFiler + } else if resp.LockHostMovedTo != "" { + lock.hostFiler = pb.ServerAddress(resp.LockHostMovedTo) } - if resp.LockOwner != "" { + if resp.LockOwner != "" && resp.LockOwner != previousOwner { + // Only log if the owner actually changed + glog.V(0).Infof("šŸ‘¤ LOCK: Owner changed from %s to %s for key=%s", previousOwner, resp.LockOwner, lock.key) lock.owner = resp.LockOwner - glog.V(0).Infof("šŸ‘¤ LOCK: Lock owner is %s for key=%s", lock.owner, lock.key) - } else { + } else if resp.LockOwner != "" { + lock.owner = resp.LockOwner + } else if previousOwner != "" { + glog.V(0).Infof("āš ļø LOCK: Owner cleared for key=%s", lock.key) lock.owner = "" - glog.V(0).Infof("āš ļø LOCK: No lock owner for key=%s", lock.key) } } return err diff --git a/weed/command/mq_kafka_gateway.go b/weed/command/mq_kafka_gateway.go index a1ce4f4bc..b76ae77a4 100644 --- a/weed/command/mq_kafka_gateway.go +++ b/weed/command/mq_kafka_gateway.go @@ -2,6 +2,8 @@ package command import ( "fmt" + "net/http" + _ "net/http/pprof" "os" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -105,6 +107,17 @@ func runMqKafkaGateway(cmd *Command, args []string) bool { } glog.V(0).Infof("Using SeaweedMQ brokers from masters: %s", *mqKafkaGatewayOptions.master) + // Start HTTP profiling server + pprofPort := *mqKafkaGatewayOptions.port + 1000 // e.g., 10093 for profiling if gateway is on 9093 + go func() { + pprofAddr := fmt.Sprintf(":%d", pprofPort) + glog.V(0).Infof("Kafka Gateway pprof server listening on %s", pprofAddr) + glog.V(0).Infof("Access profiling at: http://localhost:%d/debug/pprof/", pprofPort) + if err := http.ListenAndServe(pprofAddr, nil); err != nil { + glog.Errorf("pprof server error: %v", err) + } + }() + if err := srv.Start(); err != nil { glog.Fatalf("mq kafka gateway start: %v", err) return false diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 6df451a80..369573379 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -38,13 +38,13 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) - glog.V(0).Infof("šŸ” DEBUG: Calling GetOrGenerateLocalPartition for %s %s", t, partition) + glog.V(4).Infof("Calling GetOrGenerateLocalPartition for %s %s", t, partition) localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition) if getOrGenErr != nil { - glog.V(0).Infof("šŸ” DEBUG: GetOrGenerateLocalPartition failed: %v", getOrGenErr) + glog.V(4).Infof("GetOrGenerateLocalPartition failed: %v", getOrGenErr) return getOrGenErr } - glog.V(0).Infof("šŸ” DEBUG: GetOrGenerateLocalPartition succeeded, localTopicPartition=%v", localTopicPartition != nil) + 
glog.V(4).Infof("GetOrGenerateLocalPartition succeeded, localTopicPartition=%v", localTopicPartition != nil) if localTopicPartition == nil { return fmt.Errorf("failed to get or generate local partition for topic %v partition %v", t, partition) } diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index 9f1e75d1d..ec44c73d9 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -46,18 +46,18 @@ func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition t func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) { self := b.option.BrokerAddress() - glog.V(0).Infof("šŸ” DEBUG: genLocalPartitionFromFiler for %s %s, self=%s", t, partition, self) - glog.V(0).Infof("šŸ” DEBUG: conf.BrokerPartitionAssignments: %v", conf.BrokerPartitionAssignments) + glog.V(4).Infof("genLocalPartitionFromFiler for %s %s, self=%s", t, partition, self) + glog.V(4).Infof("conf.BrokerPartitionAssignments: %v", conf.BrokerPartitionAssignments) for _, assignment := range conf.BrokerPartitionAssignments { assignmentPartition := topic.FromPbPartition(assignment.Partition) - glog.V(0).Infof("šŸ” DEBUG: checking assignment: LeaderBroker=%s, Partition=%s", assignment.LeaderBroker, assignmentPartition) - glog.V(0).Infof("šŸ” DEBUG: comparing self=%s with LeaderBroker=%s: %v", self, assignment.LeaderBroker, assignment.LeaderBroker == string(self)) - glog.V(0).Infof("šŸ” DEBUG: comparing partition=%s with assignmentPartition=%s: %v", partition.String(), assignmentPartition.String(), partition.Equals(assignmentPartition)) - glog.V(0).Infof("šŸ” DEBUG: logical comparison (RangeStart, RangeStop only): %v", partition.LogicalEquals(assignmentPartition)) - glog.V(0).Infof("šŸ” DEBUG: partition details: RangeStart=%d, RangeStop=%d, RingSize=%d, UnixTimeNs=%d", partition.RangeStart, partition.RangeStop, partition.RingSize, partition.UnixTimeNs) - glog.V(0).Infof("šŸ” DEBUG: assignmentPartition details: RangeStart=%d, RangeStop=%d, RingSize=%d, UnixTimeNs=%d", assignmentPartition.RangeStart, assignmentPartition.RangeStop, assignmentPartition.RingSize, assignmentPartition.UnixTimeNs) + glog.V(4).Infof("checking assignment: LeaderBroker=%s, Partition=%s", assignment.LeaderBroker, assignmentPartition) + glog.V(4).Infof("comparing self=%s with LeaderBroker=%s: %v", self, assignment.LeaderBroker, assignment.LeaderBroker == string(self)) + glog.V(4).Infof("comparing partition=%s with assignmentPartition=%s: %v", partition.String(), assignmentPartition.String(), partition.Equals(assignmentPartition)) + glog.V(4).Infof("logical comparison (RangeStart, RangeStop only): %v", partition.LogicalEquals(assignmentPartition)) + glog.V(4).Infof("partition details: RangeStart=%d, RangeStop=%d, RingSize=%d, UnixTimeNs=%d", partition.RangeStart, partition.RangeStop, partition.RingSize, partition.UnixTimeNs) + glog.V(4).Infof("assignmentPartition details: RangeStart=%d, RangeStop=%d, RingSize=%d, UnixTimeNs=%d", assignmentPartition.RangeStart, assignmentPartition.RangeStop, assignmentPartition.RingSize, assignmentPartition.UnixTimeNs) if assignment.LeaderBroker == string(self) && partition.LogicalEquals(assignmentPartition) { - glog.V(0).Infof("šŸ” DEBUG: Creating local partition for %s %s", t, partition) + glog.V(4).Infof("Creating local partition for %s %s", t, 
partition) localPartition = topic.NewLocalPartition(partition, b.option.LogFlushInterval, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition)) // Initialize offset from existing data to ensure continuity on restart @@ -65,13 +65,13 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition b.localTopicManager.AddLocalPartition(t, localPartition) isGenerated = true - glog.V(0).Infof("šŸ” DEBUG: Successfully added local partition %s %s to localTopicManager", t, partition) + glog.V(4).Infof("Successfully added local partition %s %s to localTopicManager", t, partition) break } } if !isGenerated { - glog.V(0).Infof("šŸ” DEBUG: No matching assignment found for %s %s", t, partition) + glog.V(4).Infof("No matching assignment found for %s %s", t, partition) } return localPartition, isGenerated, nil diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go index 5f99d68f3..c986330f5 100644 --- a/weed/mq/kafka/gateway/server.go +++ b/weed/mq/kafka/gateway/server.go @@ -177,11 +177,9 @@ func (s *Server) Start() error { s.wg.Add(1) go func(c net.Conn) { defer s.wg.Done() - fmt.Printf("šŸ”„ GATEWAY DEBUG: About to call handler.HandleConn for %s\n", c.RemoteAddr()) if err := s.handler.HandleConn(s.ctx, c); err != nil { glog.V(1).Infof("handle conn %v: %v", c.RemoteAddr(), err) } - fmt.Printf("šŸ”„ GATEWAY DEBUG: handler.HandleConn completed for %s\n", c.RemoteAddr()) }(conn) } }() diff --git a/weed/mq/kafka/integration/seaweedmq_handler.go b/weed/mq/kafka/integration/seaweedmq_handler.go index 636641b00..c39ba3ccb 100644 --- a/weed/mq/kafka/integration/seaweedmq_handler.go +++ b/weed/mq/kafka/integration/seaweedmq_handler.go @@ -41,6 +41,12 @@ type hwmCacheEntry struct { expiresAt time.Time } +// topicExistsCacheEntry represents a cached topic existence check +type topicExistsCacheEntry struct { + exists bool + expiresAt time.Time +} + // SeaweedMQHandler integrates Kafka protocol handlers with real SeaweedMQ storage type SeaweedMQHandler struct { // Shared filer client accessor for all components @@ -67,6 +73,11 @@ type SeaweedMQHandler struct { hwmCache map[string]*hwmCacheEntry // key: "topic:partition" hwmCacheMu sync.RWMutex hwmCacheTTL time.Duration + + // Topic existence cache to reduce broker queries + topicExistsCache map[string]*topicExistsCacheEntry // key: "topic" + topicExistsCacheMu sync.RWMutex + topicExistsCacheTTL time.Duration } // ConnectionContext holds connection-specific information for requests @@ -256,23 +267,23 @@ type PartitionRangeInfo struct { // GetEarliestOffset returns the earliest available offset for a topic partition // ALWAYS queries SMQ broker directly - no ledger involved func (h *SeaweedMQHandler) GetEarliestOffset(topic string, partition int32) (int64, error) { - glog.Infof("[DEBUG_OFFSET] GetEarliestOffset called for topic=%s partition=%d", topic, partition) + glog.V(4).Infof("[DEBUG_OFFSET] GetEarliestOffset called for topic=%s partition=%d", topic, partition) // Check if topic exists if !h.TopicExists(topic) { - glog.Infof("[DEBUG_OFFSET] Topic %s does not exist", topic) + glog.V(4).Infof("[DEBUG_OFFSET] Topic %s does not exist", topic) return 0, nil // Empty topic starts at offset 0 } // ALWAYS query SMQ broker directly for earliest offset if h.brokerClient != nil { - glog.Infof("[DEBUG_OFFSET] Querying SMQ broker for earliest offset...") + glog.V(4).Infof("[DEBUG_OFFSET] Querying SMQ broker for earliest offset...") earliestOffset, err := h.brokerClient.GetEarliestOffset(topic, partition) 
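// (The broker's native offset manager is authoritative here; any error is returned to the caller below rather than silently mapped to offset 0.)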
if err != nil { glog.Errorf("[DEBUG_OFFSET] Failed to get earliest offset from broker: %v", err) return 0, err } - glog.Infof("[DEBUG_OFFSET] Got earliest offset from broker: %d", earliestOffset) + glog.V(4).Infof("[DEBUG_OFFSET] Got earliest offset from broker: %d", earliestOffset) return earliestOffset, nil } @@ -399,7 +410,7 @@ func (h *SeaweedMQHandler) CreatePerConnectionBrokerClient() (*BrokerClient, err // Use the first broker address (in production, could use load balancing) brokerAddress := h.brokerAddresses[0] - glog.Infof("[BROKER_CLIENT] Creating per-connection client to %s", brokerAddress) + glog.V(4).Infof("[BROKER_CLIENT] Creating per-connection client to %s", brokerAddress) // Create a new client with the shared filer accessor client, err := NewBrokerClientWithFilerAccessor(brokerAddress, h.filerClientAccessor) @@ -407,7 +418,7 @@ func (h *SeaweedMQHandler) CreatePerConnectionBrokerClient() (*BrokerClient, err return nil, fmt.Errorf("failed to create broker client: %w", err) } - glog.Infof("[BROKER_CLIENT] Successfully created per-connection client") + glog.V(4).Infof("[BROKER_CLIENT] Successfully created per-connection client") return client, nil } @@ -478,6 +489,9 @@ func (h *SeaweedMQHandler) CreateTopicWithSchemas(name string, partitions int32, // Offset management now handled directly by SMQ broker - no initialization needed + // Invalidate cache after successful topic creation + h.InvalidateTopicExistsCache(name) + glog.V(1).Infof("Topic %s created successfully with %d partitions", name, partitions) return nil } @@ -565,19 +579,56 @@ func (h *SeaweedMQHandler) DeleteTopic(name string) error { } // TopicExists checks if a topic exists in SeaweedMQ broker (includes in-memory topics) +// Uses a 5-second cache to reduce broker queries func (h *SeaweedMQHandler) TopicExists(name string) bool { + // Check cache first + h.topicExistsCacheMu.RLock() + if entry, found := h.topicExistsCache[name]; found { + if time.Now().Before(entry.expiresAt) { + h.topicExistsCacheMu.RUnlock() + glog.V(4).Infof("TopicExists cache HIT for %s: %v", name, entry.exists) + return entry.exists + } + } + h.topicExistsCacheMu.RUnlock() + + // Cache miss or expired - query broker + glog.V(4).Infof("TopicExists cache MISS for %s, querying broker", name) + + var exists bool // Check via SeaweedMQ broker (includes in-memory topics) if h.brokerClient != nil { - exists, err := h.brokerClient.TopicExists(name) - if err == nil { - return exists + var err error + exists, err = h.brokerClient.TopicExists(name) + if err != nil { + fmt.Printf("TopicExists: Failed to check topic %s via SMQ broker: %v\n", name, err) + // Don't cache errors + return false } - fmt.Printf("TopicExists: Failed to check topic %s via SMQ broker: %v\n", name, err) + } else { + // Return false if broker is unavailable + fmt.Printf("TopicExists: No broker client available for topic %s\n", name) + return false } - // Return false if broker is unavailable - fmt.Printf("TopicExists: No broker client available for topic %s\n", name) - return false + // Update cache + h.topicExistsCacheMu.Lock() + h.topicExistsCache[name] = &topicExistsCacheEntry{ + exists: exists, + expiresAt: time.Now().Add(h.topicExistsCacheTTL), + } + h.topicExistsCacheMu.Unlock() + + return exists +} + +// InvalidateTopicExistsCache removes a topic from the existence cache +// Should be called after creating or deleting a topic +func (h *SeaweedMQHandler) InvalidateTopicExistsCache(name string) { + h.topicExistsCacheMu.Lock() + delete(h.topicExistsCache, name) + 
h.topicExistsCacheMu.Unlock() + glog.V(4).Infof("Invalidated TopicExists cache for %s", name) } // GetTopicInfo returns information about a topic from filer @@ -1004,9 +1055,11 @@ func NewSeaweedMQBrokerHandler(masters string, filerGroup string, clientHost str masterClient: masterClient, // topics map removed - always read from filer directly // ledgers removed - SMQ broker handles all offset management - brokerAddresses: brokerAddresses, // Store all discovered broker addresses - hwmCache: make(map[string]*hwmCacheEntry), - hwmCacheTTL: 2 * time.Second, // 2 second cache TTL to reduce broker queries + brokerAddresses: brokerAddresses, // Store all discovered broker addresses + hwmCache: make(map[string]*hwmCacheEntry), + hwmCacheTTL: 2 * time.Second, // 2 second cache TTL to reduce broker queries + topicExistsCache: make(map[string]*topicExistsCacheEntry), + topicExistsCacheTTL: 5 * time.Second, // 5 second cache TTL for topic existence }, nil } @@ -1254,14 +1307,12 @@ func NewBrokerClientWithFilerAccessor(brokerAddress string, filerClientAccessor // Close shuts down the broker client and all streams func (bc *BrokerClient) Close() error { - glog.V(0).Infof("šŸ” BrokerClient.Close() called - closing %d publishers and %d subscribers", len(bc.publishers), len(bc.subscribers)) bc.cancel() // Close all publisher streams bc.publishersLock.Lock() for key, session := range bc.publishers { if session.Stream != nil { - glog.V(0).Infof("šŸ” Closing publisher stream for %s", key) _ = session.Stream.CloseSend() } delete(bc.publishers, key) @@ -1271,7 +1322,6 @@ func (bc *BrokerClient) Close() error { // Close all subscriber streams bc.subscribersLock.Lock() for key, session := range bc.subscribers { - glog.V(0).Infof("šŸ” Closing subscriber stream for %s", key) if session.Stream != nil { _ = session.Stream.CloseSend() } @@ -1282,13 +1332,12 @@ func (bc *BrokerClient) Close() error { } bc.subscribersLock.Unlock() - glog.V(0).Infof("šŸ” BrokerClient.Close() complete, closing gRPC connection") return bc.conn.Close() } // GetPartitionRangeInfo gets comprehensive range information from SeaweedMQ broker's native range manager func (bc *BrokerClient) GetPartitionRangeInfo(topic string, partition int32) (*PartitionRangeInfo, error) { - glog.Infof("[DEBUG_OFFSET] GetPartitionRangeInfo called for topic=%s partition=%d", topic, partition) + glog.V(4).Infof("[DEBUG_OFFSET] GetPartitionRangeInfo called for topic=%s partition=%d", topic, partition) if bc.client == nil { return nil, fmt.Errorf("broker client not connected") @@ -1312,12 +1361,12 @@ func (bc *BrokerClient) GetPartitionRangeInfo(topic string, partition int32) (*P Partition: actualPartition, }) if err != nil { - glog.Infof("[DEBUG_OFFSET] Failed to call GetPartitionRangeInfo gRPC: %v", err) + glog.V(4).Infof("[DEBUG_OFFSET] Failed to call GetPartitionRangeInfo gRPC: %v", err) return nil, fmt.Errorf("failed to get partition range info from broker: %v", err) } if resp.Error != "" { - glog.Infof("[DEBUG_OFFSET] Broker returned error: %s", resp.Error) + glog.V(4).Infof("[DEBUG_OFFSET] Broker returned error: %s", resp.Error) return nil, fmt.Errorf("broker error: %s", resp.Error) } @@ -1346,7 +1395,7 @@ func (bc *BrokerClient) GetPartitionRangeInfo(topic string, partition int32) (*P ActiveSubscriptions: resp.ActiveSubscriptions, } - glog.Infof("[DEBUG_OFFSET] Got range info from broker: earliest=%d, latest=%d, hwm=%d, records=%d, ts_range=[%d,%d]", + glog.V(4).Infof("[DEBUG_OFFSET] Got range info from broker: earliest=%d, latest=%d, hwm=%d, records=%d, 
ts_range=[%d,%d]", info.EarliestOffset, info.LatestOffset, info.HighWaterMark, info.RecordCount, info.EarliestTimestampNs, info.LatestTimestampNs) return info, nil @@ -1354,23 +1403,23 @@ func (bc *BrokerClient) GetPartitionRangeInfo(topic string, partition int32) (*P // GetHighWaterMark gets the high water mark for a topic partition func (bc *BrokerClient) GetHighWaterMark(topic string, partition int32) (int64, error) { - glog.Infof("[DEBUG_OFFSET] GetHighWaterMark called for topic=%s partition=%d", topic, partition) + glog.V(4).Infof("[DEBUG_OFFSET] GetHighWaterMark called for topic=%s partition=%d", topic, partition) // Primary approach: Use SeaweedMQ's native range manager via gRPC info, err := bc.GetPartitionRangeInfo(topic, partition) if err != nil { - glog.Infof("[DEBUG_OFFSET] Failed to get offset info from broker, falling back to chunk metadata: %v", err) + glog.V(4).Infof("[DEBUG_OFFSET] Failed to get offset info from broker, falling back to chunk metadata: %v", err) // Fallback to chunk metadata approach highWaterMark, err := bc.getHighWaterMarkFromChunkMetadata(topic, partition) if err != nil { - glog.Infof("[DEBUG_OFFSET] Failed to get high water mark from chunk metadata: %v", err) + glog.V(4).Infof("[DEBUG_OFFSET] Failed to get high water mark from chunk metadata: %v", err) return 0, err } - glog.Infof("[DEBUG_OFFSET] Got high water mark from chunk metadata fallback: %d", highWaterMark) + glog.V(4).Infof("[DEBUG_OFFSET] Got high water mark from chunk metadata fallback: %d", highWaterMark) return highWaterMark, nil } - glog.Infof("[DEBUG_OFFSET] Successfully got high water mark from broker: %d", info.HighWaterMark) + glog.V(4).Infof("[DEBUG_OFFSET] Successfully got high water mark from broker: %d", info.HighWaterMark) return info.HighWaterMark, nil } @@ -1414,7 +1463,7 @@ func (bc *BrokerClient) getOffsetRangeFromChunkMetadata(topic string, partition } if latestVersion == "" { - glog.Infof("[DEBUG_OFFSET] No version directory found for topic %s", topic) + glog.V(4).Infof("[DEBUG_OFFSET] No version directory found for topic %s", topic) return 0, 0, nil } @@ -1449,7 +1498,7 @@ func (bc *BrokerClient) getOffsetRangeFromChunkMetadata(topic string, partition } if partitionDir == "" { - glog.Infof("[DEBUG_OFFSET] No partition directory found for topic %s", topic) + glog.V(4).Infof("[DEBUG_OFFSET] No partition directory found for topic %s", topic) return 0, 0, nil } @@ -1488,7 +1537,7 @@ func (bc *BrokerClient) getOffsetRangeFromChunkMetadata(topic string, partition if maxOffset > highWaterMark { highWaterMark = maxOffset } - glog.Infof("[DEBUG_OFFSET] %s file %s has offset_max=%d", fileType, resp.Entry.Name, maxOffset) + glog.V(4).Infof("[DEBUG_OFFSET] %s file %s has offset_max=%d", fileType, resp.Entry.Name, maxOffset) } // Track minimum offset for earliest offset @@ -1497,7 +1546,7 @@ func (bc *BrokerClient) getOffsetRangeFromChunkMetadata(topic string, partition if earliestOffset == -1 || minOffset < earliestOffset { earliestOffset = minOffset } - glog.Infof("[DEBUG_OFFSET] %s file %s has offset_min=%d", fileType, resp.Entry.Name, minOffset) + glog.V(4).Infof("[DEBUG_OFFSET] %s file %s has offset_min=%d", fileType, resp.Entry.Name, minOffset) } } } @@ -1518,7 +1567,7 @@ func (bc *BrokerClient) getOffsetRangeFromChunkMetadata(topic string, partition earliestOffset = 0 } - glog.Infof("[DEBUG_OFFSET] Offset range for topic %s partition %d: earliest=%d, highWaterMark=%d", topic, partition, earliestOffset, highWaterMark) + glog.V(4).Infof("[DEBUG_OFFSET] Offset range for topic %s 
partition %d: earliest=%d, highWaterMark=%d", topic, partition, earliestOffset, highWaterMark) return earliestOffset, highWaterMark, nil } @@ -1530,23 +1579,23 @@ func (bc *BrokerClient) getHighWaterMarkFromChunkMetadata(topic string, partitio // GetEarliestOffset gets the earliest offset from SeaweedMQ broker's native offset manager func (bc *BrokerClient) GetEarliestOffset(topic string, partition int32) (int64, error) { - glog.Infof("[DEBUG_OFFSET] BrokerClient.GetEarliestOffset called for topic=%s partition=%d", topic, partition) + glog.V(4).Infof("[DEBUG_OFFSET] BrokerClient.GetEarliestOffset called for topic=%s partition=%d", topic, partition) // Primary approach: Use SeaweedMQ's native range manager via gRPC info, err := bc.GetPartitionRangeInfo(topic, partition) if err != nil { - glog.Infof("[DEBUG_OFFSET] Failed to get offset info from broker, falling back to chunk metadata: %v", err) + glog.V(4).Infof("[DEBUG_OFFSET] Failed to get offset info from broker, falling back to chunk metadata: %v", err) // Fallback to chunk metadata approach earliestOffset, err := bc.getEarliestOffsetFromChunkMetadata(topic, partition) if err != nil { - glog.Infof("[DEBUG_OFFSET] Failed to get earliest offset from chunk metadata: %v", err) + glog.V(4).Infof("[DEBUG_OFFSET] Failed to get earliest offset from chunk metadata: %v", err) return 0, err } - glog.Infof("[DEBUG_OFFSET] Got earliest offset from chunk metadata fallback: %d", earliestOffset) + glog.V(4).Infof("[DEBUG_OFFSET] Got earliest offset from chunk metadata fallback: %d", earliestOffset) return earliestOffset, nil } - glog.Infof("[DEBUG_OFFSET] Successfully got earliest offset from broker: %d", info.EarliestOffset) + glog.V(4).Infof("[DEBUG_OFFSET] Successfully got earliest offset from broker: %d", info.EarliestOffset) return info.EarliestOffset, nil } diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 1f7a53ab8..1e1bc69c8 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -61,21 +61,33 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers } return false } - // Cap long-polling to avoid blocking connection shutdowns in tests + // Long-poll when client requests it via MaxWaitTime and there's no data + // Even if MinBytes=0, we should honor MaxWaitTime to reduce polling overhead maxWaitMs := fetchRequest.MaxWaitTime - if maxWaitMs > 1000 { - maxWaitMs = 1000 + // For production, allow longer wait times to reduce CPU usage + // Only cap at 30 seconds to be reasonable + if maxWaitMs > 30000 { + maxWaitMs = 30000 } - shouldLongPoll := fetchRequest.MinBytes > 0 && maxWaitMs > 0 && !hasDataAvailable() && allTopicsExist() + // Long-poll if: (1) client wants to wait (maxWaitMs > 0), (2) no data available, (3) topics exist + // NOTE: We long-poll even if MinBytes=0, since the client specified a wait time + hasData := hasDataAvailable() + topicsExist := allTopicsExist() + shouldLongPoll := maxWaitMs > 0 && !hasData && topicsExist + // Debug Schema Registry polling (disabled for production) + // Uncomment for debugging long-poll behavior + /* + if len(fetchRequest.Topics) > 0 && strings.HasPrefix(fetchRequest.Topics[0].Name, "_schemas") { + glog.V(4).Infof("SR Fetch: maxWaitMs=%d minBytes=%d hasData=%v topicsExist=%v shouldLongPoll=%v", + maxWaitMs, fetchRequest.MinBytes, hasData, topicsExist, shouldLongPoll) + } + */ if shouldLongPoll { start := time.Now() - // Limit polling time to maximum 2 seconds to prevent hanging in CI + // Use the client's requested 
wait time (already capped at 30s) maxPollTime := time.Duration(maxWaitMs) * time.Millisecond - if maxPollTime > 2*time.Second { - maxPollTime = 2 * time.Second - Debug("Limiting fetch polling to 2 seconds to prevent hanging") - } deadline := start.Add(maxPollTime) + glog.V(4).Infof("Fetch long-polling: maxWaitMs=%d, deadline=%v", maxWaitMs, deadline) for time.Now().Before(deadline) { // Use context-aware sleep instead of blocking time.Sleep select { @@ -121,22 +133,22 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers // Topics count - write the actual number of topics in the request // Kafka protocol: we MUST return all requested topics in the response (even with empty data) topicsCount := len(fetchRequest.Topics) - glog.Infof("šŸ” FETCH CORR=%d: Writing topics count=%d at offset=%d, isFlexible=%v", correlationID, topicsCount, len(response), isFlexible) + glog.V(4).Infof("FETCH CORR=%d: Writing topics count=%d at offset=%d, isFlexible=%v", correlationID, topicsCount, len(response), isFlexible) if isFlexible { // Flexible versions use compact array format (count + 1) response = append(response, EncodeUvarint(uint32(topicsCount+1))...) } else { topicsCountBytes := make([]byte, 4) binary.BigEndian.PutUint32(topicsCountBytes, uint32(topicsCount)) - glog.Infof("šŸ” FETCH CORR=%d: topicsCountBytes = %02x %02x %02x %02x", correlationID, topicsCountBytes[0], topicsCountBytes[1], topicsCountBytes[2], topicsCountBytes[3]) + glog.V(4).Infof("FETCH CORR=%d: topicsCountBytes = %02x %02x %02x %02x", correlationID, topicsCountBytes[0], topicsCountBytes[1], topicsCountBytes[2], topicsCountBytes[3]) response = append(response, topicsCountBytes...) - glog.Infof("šŸ” FETCH CORR=%d: After appending topics count, response length=%d, response[10-13]=%02x %02x %02x %02x", + glog.V(4).Infof("FETCH CORR=%d: After appending topics count, response length=%d, response[10-13]=%02x %02x %02x %02x", correlationID, len(response), response[10], response[11], response[12], response[13]) } // Process each requested topic for topicIdx, topic := range fetchRequest.Topics { - glog.Infof("FETCH: Processing topic %d/%d: %s (partitions: %d)", topicIdx+1, len(fetchRequest.Topics), topic.Name, len(topic.Partitions)) + glog.V(4).Infof("FETCH: Processing topic %d/%d: %s (partitions: %d)", topicIdx+1, len(fetchRequest.Topics), topic.Name, len(topic.Partitions)) topicNameBytes := []byte(topic.Name) // Topic name length and name @@ -211,7 +223,7 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers } if strings.HasPrefix(topic.Name, "_schemas") { - glog.Infof("šŸ“ SR FETCH REQUEST: topic=%s partition=%d requestOffset=%d effectiveOffset=%d highWaterMark=%d", + glog.V(4).Infof("SR FETCH REQUEST: topic=%s partition=%d requestOffset=%d effectiveOffset=%d highWaterMark=%d", topic.Name, partition.PartitionID, partition.FetchOffset, effectiveFetchOffset, highWaterMark) } @@ -374,7 +386,7 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers response = append(response, recordBatch...) 
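// totalAppendedRecordBytes accumulates across all topics in the request and is reported once, in the V(4) FETCH RESPONSE SUMMARY line below, rather than per batch.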
totalAppendedRecordBytes += len(recordBatch) // Always log per-topic appended bytes for visibility - glog.Infof("FETCH TOPIC APPENDED: corr=%d topic=%s partition=%d bytes=%d flexible=%v", + glog.V(4).Infof("FETCH TOPIC APPENDED: corr=%d topic=%s partition=%d bytes=%d flexible=%v", correlationID, topic.Name, partition.PartitionID, len(recordBatch), isFlexible) // Tagged fields for flexible versions (v12+) after each partition @@ -401,106 +413,32 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers glog.Errorf("🚨 FETCH CORR=%d: Topics count CORRUPTED! Expected %d, found %d at response[10:14]=%02x %02x %02x %02x", correlationID, topicsCount, actualTopicsCount, response[10], response[11], response[12], response[13]) } else { - glog.Infof("āœ… FETCH CORR=%d: Topics count verified OK: %d at response[10:14]=%02x %02x %02x %02x", + glog.V(4).Infof("FETCH CORR=%d: Topics count verified OK: %d at response[10:14]=%02x %02x %02x %02x", correlationID, topicsCount, response[10], response[11], response[12], response[13]) } } Debug("Fetch v%d response constructed, size: %d bytes (flexible: %v)", apiVersion, len(response), isFlexible) - glog.Infof("FETCH RESPONSE SUMMARY: correlationID=%d topics=%d totalRecordBytes=%d totalResponseBytes=%d", correlationID, topicsCount, totalAppendedRecordBytes, len(response)) + glog.V(4).Infof("FETCH RESPONSE SUMMARY: correlationID=%d topics=%d totalRecordBytes=%d totalResponseBytes=%d", correlationID, topicsCount, totalAppendedRecordBytes, len(response)) - // CRITICAL BYTE ANALYSIS for offset 32 error - if len(response) > 36 { - fmt.Printf("\nšŸ”šŸ”šŸ” CRITICAL: Analyzing bytes around offset 32:\n") - fmt.Printf(" Offset 28-31 (should be after first topic name): %02x %02x %02x %02x\n", - response[28], response[29], response[30], response[31]) - fmt.Printf(" Offset 32-35 (ERROR LOCATION): %02x %02x %02x %02x\n", - response[32], response[33], response[34], response[35]) - fmt.Printf(" Offset 36-39 (should be partition ID or next field): %02x %02x %02x %02x\n", - response[36], response[37], response[38], response[39]) - - // Decode as int32 (what partition count/ID would be) - val32at32 := binary.BigEndian.Uint32(response[32:36]) - fmt.Printf(" Value at offset 32 as int32: %d (0x%08x)\n", val32at32, val32at32) - - // Decode as int16 (what a string length would be) - val16at32 := binary.BigEndian.Uint16(response[32:34]) - fmt.Printf(" Value at offset 32 as int16: %d (0x%04x)\n", val16at32, val16at32) - } - - // HEX DUMP for debugging decode errors: dump first 100 bytes of response to diagnose "invalid length (off=32, len=36)" error - if len(response) > 0 && len(response) <= 200 { - // Full dump for small responses (likely empty/error responses) - fmt.Printf("\nšŸ” FETCH RESPONSE HEX DUMP (FULL - %d bytes):\n", len(response)) - for i := 0; i < len(response); i += 16 { - end := i + 16 - if end > len(response) { - end = len(response) - } - fmt.Printf(" %04d: %02x\n", i, response[i:end]) - } - - // Decode structure for Fetch v7 (non-flexible) - if !isFlexible && len(response) >= 40 { - fmt.Printf(" Decoded structure:\n") - fmt.Printf(" [0-3] Throttle time ms: %d\n", int32(binary.BigEndian.Uint32(response[0:4]))) - fmt.Printf(" [4-7] Num topics: %d\n", int32(binary.BigEndian.Uint32(response[4:8]))) - - if len(response) >= 12 { - topicNameLen := int32(binary.BigEndian.Uint16(response[8:10])) - fmt.Printf(" [8-9] Topic name len: %d\n", topicNameLen) - if topicNameLen > 0 && 10+topicNameLen <= int32(len(response)) { - fmt.Printf(" [10-%d] Topic 
name: %s\n", 10+topicNameLen-1, string(response[10:10+topicNameLen])) - - partitionsOffset := 10 + topicNameLen - if int(partitionsOffset)+4 <= len(response) { - numPartitions := int32(binary.BigEndian.Uint32(response[partitionsOffset : partitionsOffset+4])) - fmt.Printf(" [%d-%d] Num partitions: %d\n", partitionsOffset, partitionsOffset+3, numPartitions) - - // First partition data starts here - partDataOffset := partitionsOffset + 4 - if int(partDataOffset)+32 <= len(response) { - fmt.Printf(" [%d] *** BYTE 32 area (ERROR LOCATION) ***\n", 32) - if 32 < len(response) { - fmt.Printf(" Byte[32] = 0x%02x (%d)\n", response[32], response[32]) - } - if 33 < len(response) { - fmt.Printf(" Byte[33] = 0x%02x (%d)\n", response[33], response[33]) - } - if 34 < len(response) { - fmt.Printf(" Byte[34] = 0x%02x (%d)\n", response[34], response[34]) - } - if 35 < len(response) { - fmt.Printf(" Byte[35] = 0x%02x (%d)\n", response[35], response[35]) - } - } - } - } - } - } - } else if len(response) > 200 { - // Partial dump for large responses - fmt.Printf("\nšŸ” FETCH RESPONSE HEX DUMP (FIRST 64 bytes of %d total):\n", len(response)) - dumpSize := 64 - if len(response) < dumpSize { - dumpSize = len(response) - } - for i := 0; i < dumpSize; i += 16 { - end := i + 16 - if end > dumpSize { - end = dumpSize - } - fmt.Printf(" %04d: %02x\n", i, response[i:end]) - } - - // Always show byte 32 area since that's where the error occurs + // Debug byte analysis (disabled for production to reduce CPU usage) + // Uncomment for debugging protocol issues + /* if len(response) > 36 { - fmt.Printf(" *** BYTE 32 area (ERROR LOCATION) ***\n") - fmt.Printf(" 0032: %02x\n", response[32:36]) + glog.V(4).Infof("FETCH CORR=%d: Offset 32-35: %02x %02x %02x %02x", + correlationID, response[32], response[33], response[34], response[35]) } - } + */ - glog.Infof("āœ… FETCH RESPONSE COMPLETE: correlationID=%d version=%d size=%d bytes", correlationID, apiVersion, len(response)) + // HEX DUMP disabled for production (causes high CPU) + // Uncomment for debugging protocol issues + /* + if len(response) > 0 { + // Debug hex dump code here... 
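(Even a V(4)-guarded log here would still build the dump strings as arguments on every call; keeping the block commented out removes that cost from the hot path entirely.)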
+ } + */ + + glog.V(4).Infof("FETCH RESPONSE COMPLETE: correlationID=%d version=%d size=%d bytes", correlationID, apiVersion, len(response)) return response, nil } diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 40a8cb015..ca80e73df 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -394,7 +394,6 @@ func (h *Handler) GetConnectionContext() *integration.ConnectionContext { // HandleConn processes a single client connection func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { connectionID := fmt.Sprintf("%s->%s", conn.RemoteAddr(), conn.LocalAddr()) - fmt.Printf("šŸ”„šŸ”„šŸ”„ HandleConn START: %s\n", connectionID) Debug("KAFKA 8.0.0 DEBUG: NEW HANDLER CODE ACTIVE - %s", time.Now().Format("15:04:05")) // Record connection metrics @@ -409,35 +408,28 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // CRITICAL: Create per-connection BrokerClient for isolated gRPC streams // This prevents different connections from interfering with each other's Fetch requests - fmt.Printf("šŸ”„šŸ”„šŸ”„ [%s] [BROKER_CLIENT] Creating per-connection BrokerClient\n", connectionID) - glog.Infof("[%s] [BROKER_CLIENT] Creating per-connection BrokerClient", connectionID) + glog.V(4).Infof("[%s] [BROKER_CLIENT] Creating per-connection BrokerClient", connectionID) connBrokerClient, err := h.seaweedMQHandler.CreatePerConnectionBrokerClient() if err != nil { - fmt.Printf("šŸ”„šŸ”„šŸ”„ [%s] [BROKER_CLIENT] Failed: %v\n", connectionID, err) glog.Errorf("[%s] [BROKER_CLIENT] Failed to create per-connection BrokerClient: %v", connectionID, err) return fmt.Errorf("failed to create broker client: %w", err) } h.connContext.BrokerClient = connBrokerClient - fmt.Printf("šŸ”„šŸ”„šŸ”„ [%s] [BROKER_CLIENT] Per-connection BrokerClient created successfully\n", connectionID) - glog.Infof("[%s] [BROKER_CLIENT] Per-connection BrokerClient created successfully", connectionID) + glog.V(4).Infof("[%s] [BROKER_CLIENT] Per-connection BrokerClient created successfully", connectionID) Debug("[%s] NEW CONNECTION ESTABLISHED", connectionID) defer func() { - glog.V(0).Infof("šŸ” [%s] Connection closing, cleaning up BrokerClient", connectionID) Debug("[%s] Connection closing, cleaning up BrokerClient", connectionID) // Close the per-connection broker client if connBrokerClient != nil { - glog.V(0).Infof("šŸ” [%s] Calling BrokerClient.Close()", connectionID) if closeErr := connBrokerClient.Close(); closeErr != nil { - glog.Errorf("šŸ” [%s] Error closing BrokerClient: %v", connectionID, closeErr) Error("[%s] Error closing BrokerClient: %v", connectionID, closeErr) } } RecordDisconnectionMetrics() h.connContext = nil // Clear connection context conn.Close() - glog.V(0).Infof("šŸ” [%s] Connection cleanup complete", connectionID) }() r := bufio.NewReader(conn) @@ -549,8 +541,8 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { defer func() { close(controlChan) close(dataChan) + close(responseChan) // Close BEFORE wg.Wait() so response writer can exit wg.Wait() - close(responseChan) }() for { @@ -622,7 +614,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Successfully read the message size size := binary.BigEndian.Uint32(sizeBytes[:]) Debug("[%s] Read message size header: %d bytes", connectionID, size) - fmt.Printf("šŸ”„ PROTOCOL DEBUG: Read message size: %d bytes\n", size) + // Debug("Read message size: %d bytes", size) if size == 0 || size > 1024*1024 { // 1MB limit // 
Use standardized error for message size limit Debug("[%s] Invalid message size: %d (limit: 1MB)", connectionID, size) @@ -657,8 +649,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) - apiName := getAPIName(apiKey) - fmt.Printf("šŸ”„ PROTOCOL DEBUG: Parsed header - API Key: %d (%s), Version: %d, Correlation: %d\n", apiKey, apiName, apiVersion, correlationID) + // Debug("Parsed header - API Key: %d (%s), Version: %d, Correlation: %d", apiKey, getAPIName(apiKey), apiVersion, correlationID) // Validate API version against what we support Debug("VALIDATING API VERSION: Key=%d, Version=%d", apiKey, apiVersion) @@ -3057,14 +3048,17 @@ func (h *Handler) writeResponseWithHeader(w *bufio.Writer, correlationID uint32, // Write response body fullResponse = append(fullResponse, responseBody...) - // Hex dump for debugging (first 64 bytes) - dumpLen := len(fullResponse) - if dumpLen > 64 { - dumpLen = 64 - } - if apiKey == 1 || apiKey == 19 { // Fetch or CreateTopics - glog.Infof("šŸ” API %d v%d response wire format (first %d bytes):\n%s", apiKey, apiVersion, dumpLen, hexDump(fullResponse[:dumpLen])) - } + // Hex dump for debugging (disabled for production to reduce CPU) + // Uncomment for debugging protocol issues + /* + dumpLen := len(fullResponse) + if dumpLen > 64 { + dumpLen = 64 + } + if apiKey == 1 || apiKey == 19 { // Fetch or CreateTopics + glog.V(4).Infof("API %d v%d response wire format (first %d bytes):\n%s", apiKey, apiVersion, dumpLen, hexDump(fullResponse[:dumpLen])) + } + */ Debug("Wrote API %d response v%d: size=%d, flexible=%t, correlationID=%d, totalBytes=%d", apiKey, apiVersion, totalSize, isFlexible, correlationID, len(fullResponse)) // Write to connection diff --git a/weed/mq/topic/local_topic.go b/weed/mq/topic/local_topic.go index af4544d58..5a5086322 100644 --- a/weed/mq/topic/local_topic.go +++ b/weed/mq/topic/local_topic.go @@ -2,6 +2,7 @@ package topic import ( "sync" + "github.com/seaweedfs/seaweedfs/weed/glog" ) @@ -22,15 +23,15 @@ func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition localTopic.partitionLock.RLock() defer localTopic.partitionLock.RUnlock() - glog.V(0).Infof("šŸ” DEBUG: findPartition searching for %s in %d partitions", partition.String(), len(localTopic.Partitions)) + glog.V(4).Infof("findPartition searching for %s in %d partitions", partition.String(), len(localTopic.Partitions)) for i, localPartition := range localTopic.Partitions { - glog.V(0).Infof("šŸ” DEBUG: Comparing partition[%d]: %s with target %s", i, localPartition.Partition.String(), partition.String()) + glog.V(4).Infof("Comparing partition[%d]: %s with target %s", i, localPartition.Partition.String(), partition.String()) if localPartition.Partition.LogicalEquals(partition) { - glog.V(0).Infof("šŸ” DEBUG: Found matching partition at index %d", i) + glog.V(4).Infof("Found matching partition at index %d", i) return localPartition } } - glog.V(0).Infof("šŸ” DEBUG: No matching partition found for %s", partition.String()) + glog.V(4).Infof("No matching partition found for %s", partition.String()) return nil } func (localTopic *LocalTopic) removePartition(partition Partition) bool { diff --git a/weed/server/filer_grpc_server_dlm.go b/weed/server/filer_grpc_server_dlm.go index 40444e061..f18c824e4 100644 --- a/weed/server/filer_grpc_server_dlm.go +++ b/weed/server/filer_grpc_server_dlm.go @@ -16,7 +16,7 @@ import ( // 
DistributedLock is a grpc handler to handle FilerServer's LockRequest func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRequest) (resp *filer_pb.LockResponse, err error) { - glog.V(0).Infof("šŸ”„ FILER LOCK: Received DistributedLock request - name=%s owner=%s renewToken=%s secondsToLock=%d isMoved=%v", + glog.V(4).Infof("FILER LOCK: Received DistributedLock request - name=%s owner=%s renewToken=%s secondsToLock=%d isMoved=%v", req.Name, req.Owner, req.RenewToken, req.SecondsToLock, req.IsMoved) resp = &filer_pb.LockResponse{} @@ -24,11 +24,11 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe var movedTo pb.ServerAddress expiredAtNs := time.Now().Add(time.Duration(req.SecondsToLock) * time.Second).UnixNano() resp.LockOwner, resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken, req.Owner) - glog.V(0).Infof("šŸ” FILER LOCK: LockWithTimeout result - name=%s lockOwner=%s renewToken=%s movedTo=%s err=%v", + glog.V(4).Infof("FILER LOCK: LockWithTimeout result - name=%s lockOwner=%s renewToken=%s movedTo=%s err=%v", req.Name, resp.LockOwner, resp.RenewToken, movedTo, err) glog.V(4).Infof("lock %s %v %v %v, isMoved=%v %v", req.Name, req.SecondsToLock, req.RenewToken, req.Owner, req.IsMoved, movedTo) if movedTo != "" && movedTo != fs.option.Host && !req.IsMoved { - glog.V(0).Infof("šŸ”„ FILER LOCK: Forwarding to correct filer - from=%s to=%s", fs.option.Host, movedTo) + glog.V(0).Infof("FILER LOCK: Forwarding to correct filer - from=%s to=%s", fs.option.Host, movedTo) err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { secondResp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{ Name: req.Name, @@ -41,9 +41,9 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe resp.RenewToken = secondResp.RenewToken resp.LockOwner = secondResp.LockOwner resp.Error = secondResp.Error - glog.V(0).Infof("āœ… FILER LOCK: Forwarded lock acquired - name=%s renewToken=%s", req.Name, resp.RenewToken) + glog.V(0).Infof("FILER LOCK: Forwarded lock acquired - name=%s renewToken=%s", req.Name, resp.RenewToken) } else { - glog.V(0).Infof("āŒ FILER LOCK: Forward failed - name=%s err=%v", req.Name, err) + glog.V(0).Infof("FILER LOCK: Forward failed - name=%s err=%v", req.Name, err) } return err }) @@ -51,13 +51,13 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe if err != nil { resp.Error = fmt.Sprintf("%v", err) - glog.V(0).Infof("āŒ FILER LOCK: Error - name=%s error=%s", req.Name, resp.Error) + glog.V(0).Infof("FILER LOCK: Error - name=%s error=%s", req.Name, resp.Error) } if movedTo != "" { resp.LockHostMovedTo = string(movedTo) } - glog.V(0).Infof("šŸ“¤ FILER LOCK: Returning response - name=%s renewToken=%s lockOwner=%s error=%s movedTo=%s", + glog.V(4).Infof("FILER LOCK: Returning response - name=%s renewToken=%s lockOwner=%s error=%s movedTo=%s", req.Name, resp.RenewToken, resp.LockOwner, resp.Error, resp.LockHostMovedTo) return resp, nil
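Note on the caching pattern used above: topicExistsCache is a read-through TTL cache with explicit invalidation on topic create/delete, the same shape as the existing hwmCache. A minimal, self-contained sketch of that pattern, using hypothetical names (existsCache, cacheEntry, check) rather than the handler's actual fields:

package main

import (
	"fmt"
	"sync"
	"time"
)

// cacheEntry mirrors topicExistsCacheEntry: a cached answer plus its expiry.
type cacheEntry struct {
	exists    bool
	expiresAt time.Time
}

// existsCache is a hypothetical stand-in for the handler's cache fields.
type existsCache struct {
	mu  sync.RWMutex
	ttl time.Duration
	m   map[string]*cacheEntry
}

// check returns the cached answer while it is fresh, otherwise calls query
// and caches the result. Errors are returned uncached, matching the patch's
// choice not to cache broker failures so transient errors get retried.
func (c *existsCache) check(name string, query func(string) (bool, error)) (bool, error) {
	c.mu.RLock()
	if e, ok := c.m[name]; ok && time.Now().Before(e.expiresAt) {
		c.mu.RUnlock()
		return e.exists, nil // cache hit
	}
	c.mu.RUnlock()

	// Cache miss or expired entry: ask the broker. Two goroutines may race
	// here and both query; last write wins, which is harmless for a TTL cache.
	exists, err := query(name)
	if err != nil {
		return false, err
	}

	c.mu.Lock()
	c.m[name] = &cacheEntry{exists: exists, expiresAt: time.Now().Add(c.ttl)}
	c.mu.Unlock()
	return exists, nil
}

// invalidate drops one key, as InvalidateTopicExistsCache does after
// CreateTopic/DeleteTopic, so the next check sees fresh broker state.
func (c *existsCache) invalidate(name string) {
	c.mu.Lock()
	delete(c.m, name)
	c.mu.Unlock()
}

func main() {
	c := &existsCache{ttl: 5 * time.Second, m: make(map[string]*cacheEntry)}
	broker := func(name string) (bool, error) { return name == "_schemas", nil }
	for i := 0; i < 3; i++ {
		ok, _ := c.check("_schemas", broker) // only the first iteration hits broker
		fmt.Println(ok)
	}
	c.invalidate("_schemas")
}

Collapsing concurrent misses for the same key (for example with golang.org/x/sync/singleflight) would cut duplicate broker round-trips further; the patch accepts the occasional duplicate query, which keeps the locking simple.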