mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-10-07 23:54:22 +08:00
For offset-based reads, ignore startFileName
This commit is contained in:
@@ -91,6 +91,7 @@ start: build-gateway ## Start the infrastructure services (without load test)
|
||||
seaweedfs-filer \
|
||||
seaweedfs-mq-broker \
|
||||
kafka-gateway \
|
||||
schema-registry-init \
|
||||
schema-registry
|
||||
@echo "$(GREEN)Infrastructure started$(NC)"
|
||||
@echo "Waiting for services to be ready..."
|
||||
|
@@ -12,10 +12,28 @@ x-seaweedfs-build: &seaweedfs-build
|
||||
services:
|
||||
# Schema Registry (for Avro/Protobuf support)
|
||||
# Using host networking to connect to localhost:9093 (where our gateway advertises)
|
||||
# WORKAROUND: Schema Registry hangs on empty _schemas topic during bootstrap
|
||||
# Pre-create the topic first to avoid "wait to catch up" hang
|
||||
schema-registry-init:
|
||||
image: confluentinc/cp-kafka:8.0.0
|
||||
container_name: loadtest-schema-registry-init
|
||||
networks:
|
||||
- kafka-loadtest-net
|
||||
depends_on:
|
||||
kafka-gateway:
|
||||
condition: service_healthy
|
||||
command: >
|
||||
bash -c "
|
||||
echo 'Creating _schemas topic...';
|
||||
kafka-topics --create --topic _schemas --partitions 1 --replication-factor 1 \
|
||||
--bootstrap-server kafka-gateway:9093 --if-not-exists || exit 0;
|
||||
echo '_schemas topic created successfully';
|
||||
"
|
||||
|
||||
schema-registry:
|
||||
image: confluentinc/cp-schema-registry:8.0.0
|
||||
container_name: loadtest-schema-registry
|
||||
restart: on-failure:5
|
||||
restart: on-failure:3
|
||||
ports:
|
||||
- "8081:8081"
|
||||
environment:
|
||||
@@ -50,6 +68,8 @@ services:
|
||||
retries: 10
|
||||
start_period: 30s
|
||||
depends_on:
|
||||
schema-registry-init:
|
||||
condition: service_completed_successfully
|
||||
kafka-gateway:
|
||||
condition: service_healthy
|
||||
networks:
|
||||
|
@@ -3920,57 +3920,12 @@ func (h *Handler) createTopicWithSchemaSupport(topicName string, partitions int3
|
||||
// createTopicWithDefaultFlexibleSchema creates a topic with a flexible default schema
|
||||
// that can handle both Avro and JSON messages when schema management is enabled
|
||||
func (h *Handler) createTopicWithDefaultFlexibleSchema(topicName string, partitions int32) error {
|
||||
// For system topics like _schemas, create both key and value fields
|
||||
// Schema Registry messages have structured keys and values
|
||||
var keySchema, valueSchema *schema_pb.RecordType
|
||||
// CRITICAL FIX: System topics like _schemas should be PLAIN Kafka topics without schema management
|
||||
// Schema Registry uses _schemas to STORE schemas, so it can't have schema management itself
|
||||
// This was causing issues with Schema Registry bootstrap
|
||||
|
||||
if topicName == "_schemas" {
|
||||
// _schemas topic needs both key and value fields
|
||||
// Key contains metadata (magicByte, keytype, subject, version)
|
||||
keySchema = &schema_pb.RecordType{
|
||||
Fields: []*schema_pb.Field{
|
||||
{
|
||||
Name: "key",
|
||||
Type: &schema_pb.Type{
|
||||
Kind: &schema_pb.Type_ScalarType{
|
||||
ScalarType: schema_pb.ScalarType_BYTES,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Value contains schema data
|
||||
valueSchema = &schema_pb.RecordType{
|
||||
Fields: []*schema_pb.Field{
|
||||
{
|
||||
Name: "value",
|
||||
Type: &schema_pb.Type{
|
||||
Kind: &schema_pb.Type_ScalarType{
|
||||
ScalarType: schema_pb.ScalarType_BYTES,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
} else {
|
||||
// For other system topics, use flexible schema with just value
|
||||
valueSchema = &schema_pb.RecordType{
|
||||
Fields: []*schema_pb.Field{
|
||||
{
|
||||
Name: "value",
|
||||
Type: &schema_pb.Type{
|
||||
Kind: &schema_pb.Type_ScalarType{
|
||||
ScalarType: schema_pb.ScalarType_BYTES,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Create topic with the schemas
|
||||
return h.seaweedMQHandler.CreateTopicWithSchemas(topicName, partitions, keySchema, valueSchema)
|
||||
glog.V(0).Infof("Creating system topic %s as PLAIN topic (no schema management)", topicName)
|
||||
return h.seaweedMQHandler.CreateTopic(topicName, partitions)
|
||||
}
|
||||
|
||||
// fetchSchemaForTopic attempts to fetch schema information for a topic from Schema Registry
|
||||
|
@@ -164,6 +164,11 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
|
||||
var startOffset int64
|
||||
if isOffsetBased {
|
||||
startOffset = startPosition.Offset
|
||||
// CRITICAL FIX: For offset-based reads, ignore startFileName (which is based on Time)
|
||||
// and list all files from the beginning to find the right offset
|
||||
startFileName = ""
|
||||
glog.V(0).Infof("📖 DISK READ START: topic=%s partition=%s startOffset=%d (using empty startFileName for offset-based read)",
|
||||
t.Name, p, startOffset)
|
||||
}
|
||||
|
||||
// OPTIMIZATION: For offset-based reads, collect all files with their offset ranges first
|
||||
@@ -173,6 +178,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
|
||||
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
// First pass: collect all relevant files with their metadata
|
||||
glog.V(0).Infof("📁 DISK READ: Listing directory %s for offset %d startFileName=%q", partitionDir, startOffset, startFileName)
|
||||
return filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
|
||||
|
||||
if entry.IsDirectory {
|
||||
@@ -190,6 +196,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
|
||||
|
||||
// OPTIMIZATION: For offset-based reads, check if this file contains the requested offset
|
||||
if isOffsetBased {
|
||||
glog.V(0).Infof("📄 DISK READ: Found file %s", entry.Name)
|
||||
// Check if file has offset range metadata
|
||||
if minOffsetBytes, hasMin := entry.Extended["offset_min"]; hasMin && len(minOffsetBytes) == 8 {
|
||||
if maxOffsetBytes, hasMax := entry.Extended["offset_max"]; hasMax && len(maxOffsetBytes) == 8 {
|
||||
@@ -222,15 +229,25 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
|
||||
|
||||
// Add file to candidates for processing
|
||||
candidateFiles = append(candidateFiles, entry)
|
||||
glog.V(0).Infof("✅ DISK READ: Added candidate file %s (total=%d)", entry.Name, len(candidateFiles))
|
||||
return nil
|
||||
|
||||
}, startFileName, true, math.MaxInt32)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("❌ DISK READ: Failed to list directory %s: %v", partitionDir, err)
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(0).Infof("📊 DISK READ: Found %d candidate files for topic=%s partition=%s offset=%d",
|
||||
len(candidateFiles), t.Name, p, startOffset)
|
||||
|
||||
if len(candidateFiles) == 0 {
|
||||
glog.V(0).Infof("⚠️ DISK READ: No files found in %s", partitionDir)
|
||||
return startPosition, isDone, nil
|
||||
}
|
||||
|
||||
// OPTIMIZATION: For offset-based reads with many files, use binary search to find start file
|
||||
if isOffsetBased && len(candidateFiles) > 10 {
|
||||
// Binary search to find the first file that might contain our offset
|
||||
|
Reference in New Issue
Block a user