mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-11-24 08:46:54 +08:00
"golang.org/x/exp/slices" => "slices" and go fmt
This commit is contained in:
@@ -5,11 +5,11 @@ import (
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
@@ -7,11 +7,11 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"strings"
|
||||
"time"
|
||||
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
@@ -50,7 +50,7 @@ func (p *TopicPublisher) FinishPublish() error {
|
||||
inputBuffer.Enqueue(&mq_pb.DataMessage{
|
||||
TsNs: time.Now().UnixNano(),
|
||||
Ctrl: &mq_pb.ControlMessage{
|
||||
IsClose: true,
|
||||
IsClose: true,
|
||||
PublisherName: p.config.PublisherName,
|
||||
},
|
||||
})
|
||||
|
||||
@@ -95,8 +95,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
|
||||
followerCount: 1,
|
||||
assignments: []*mq_pb.BrokerPartitionAssignment{
|
||||
{
|
||||
LeaderBroker: "",
|
||||
Partition: &mq_pb.Partition{},
|
||||
LeaderBroker: "",
|
||||
Partition: &mq_pb.Partition{},
|
||||
FollowerBroker: "localhost:2",
|
||||
},
|
||||
},
|
||||
@@ -110,8 +110,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
|
||||
followerCount: 1,
|
||||
assignments: []*mq_pb.BrokerPartitionAssignment{
|
||||
{
|
||||
LeaderBroker: "localhost:1",
|
||||
Partition: &mq_pb.Partition{},
|
||||
LeaderBroker: "localhost:1",
|
||||
Partition: &mq_pb.Partition{},
|
||||
FollowerBroker: "",
|
||||
},
|
||||
},
|
||||
@@ -125,8 +125,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
|
||||
followerCount: 1,
|
||||
assignments: []*mq_pb.BrokerPartitionAssignment{
|
||||
{
|
||||
LeaderBroker: "localhost:1",
|
||||
Partition: &mq_pb.Partition{},
|
||||
LeaderBroker: "localhost:1",
|
||||
Partition: &mq_pb.Partition{},
|
||||
FollowerBroker: "localhost:200",
|
||||
},
|
||||
},
|
||||
@@ -140,8 +140,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
|
||||
followerCount: 1,
|
||||
assignments: []*mq_pb.BrokerPartitionAssignment{
|
||||
{
|
||||
LeaderBroker: "localhost:100",
|
||||
Partition: &mq_pb.Partition{},
|
||||
LeaderBroker: "localhost:100",
|
||||
Partition: &mq_pb.Partition{},
|
||||
FollowerBroker: "localhost:200",
|
||||
},
|
||||
},
|
||||
@@ -155,8 +155,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
|
||||
followerCount: 3,
|
||||
assignments: []*mq_pb.BrokerPartitionAssignment{
|
||||
{
|
||||
LeaderBroker: "localhost:1",
|
||||
Partition: &mq_pb.Partition{},
|
||||
LeaderBroker: "localhost:1",
|
||||
Partition: &mq_pb.Partition{},
|
||||
FollowerBroker: "localhost:2",
|
||||
},
|
||||
},
|
||||
@@ -184,8 +184,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
|
||||
followerCount: 3,
|
||||
assignments: []*mq_pb.BrokerPartitionAssignment{
|
||||
{
|
||||
LeaderBroker: "localhost:1",
|
||||
Partition: &mq_pb.Partition{},
|
||||
LeaderBroker: "localhost:1",
|
||||
Partition: &mq_pb.Partition{},
|
||||
FollowerBroker: "localhost:2",
|
||||
},
|
||||
},
|
||||
|
||||
@@ -16,8 +16,8 @@ type ConsumerGroupInstance struct {
|
||||
|
||||
func NewConsumerGroupInstance(instanceId string, maxPartitionCount int32) *ConsumerGroupInstance {
|
||||
return &ConsumerGroupInstance{
|
||||
InstanceId: ConsumerGroupInstanceId(instanceId),
|
||||
ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1),
|
||||
InstanceId: ConsumerGroupInstanceId(instanceId),
|
||||
ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1),
|
||||
MaxPartitionCount: maxPartitionCount,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,9 +84,9 @@ type TimestampStatus struct {
|
||||
|
||||
// RingBuffer represents a circular buffer to hold timestamps.
|
||||
type RingBuffer struct {
|
||||
buffer []*TimestampStatus
|
||||
head int
|
||||
size int
|
||||
buffer []*TimestampStatus
|
||||
head int
|
||||
size int
|
||||
maxTimestamp int64
|
||||
maxAllAckedTs int64
|
||||
}
|
||||
@@ -111,7 +111,7 @@ func (rb *RingBuffer) EnflightTimestamp(timestamp int64) {
|
||||
if rb.size < len(rb.buffer) {
|
||||
rb.size++
|
||||
} else {
|
||||
newBuf := newBuffer(2*len(rb.buffer))
|
||||
newBuf := newBuffer(2 * len(rb.buffer))
|
||||
for i := 0; i < rb.size; i++ {
|
||||
newBuf[i] = rb.buffer[(rb.head+len(rb.buffer)-rb.size+i)%len(rb.buffer)]
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ import "sync"
|
||||
|
||||
type LocalTopic struct {
|
||||
Topic
|
||||
Partitions []*LocalPartition
|
||||
Partitions []*LocalPartition
|
||||
partitionLock sync.RWMutex
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user