Compare commits

...

3 Commits

Author SHA1 Message Date
Chris Lu
1477eead01 final attempt
on par with 1K sized object, but no so good with large ones

the default http flow control is better than current implementation.
2020-02-15 14:01:37 -08:00
Chris Lu
c7ac94ea9a skip EOF message
now tcp

Concurrency Level:      16
Time taken for tests:   22.963 seconds
Complete requests:      1048576
Failed requests:        0
Total transferred:      1106755384 bytes
Requests per second:    45663.49 [#/sec]
Transfer rate:          47067.47 [Kbytes/sec]

vs normal

Concurrency Level:      16
Time taken for tests:   22.286 seconds
Complete requests:      1048576
Failed requests:        0
Total transferred:      1106754345 bytes
Requests per second:    47050.30 [#/sec]
Transfer rate:          48496.88 [Kbytes/sec]
2020-02-14 22:24:38 -08:00
Chris Lu
be415f4e3c add tcp read
Performance not so good. Could need some optimization.

Concurrency Level:      16
Time taken for tests:   33.575 seconds
Complete requests:      1048576
Failed requests:        0
Total transferred:      1106753375 bytes
Requests per second:    31230.86 [#/sec]
Transfer rate:          32191.03 [Kbytes/sec]

vs  normal http

Concurrency Level:      16
Time taken for tests:   24.829 seconds
Complete requests:      1048576
Failed requests:        0
Total transferred:      1106761259 bytes
Requests per second:    42231.10 [#/sec]
Transfer rate:          43529.78 [Kbytes/sec]
2020-02-14 17:49:07 -08:00
9 changed files with 781 additions and 280 deletions

View File

@@ -7,6 +7,7 @@ import (
"io"
"math"
"math/rand"
"net"
"os"
"runtime"
"runtime/pprof"
@@ -41,7 +42,8 @@ type BenchmarkOptions struct {
maxCpu *int
grpcDialOption grpc.DialOption
masterClient *wdclient.MasterClient
grpcRead *bool
readByGrpc *bool
readByTcp *bool
}
var (
@@ -66,7 +68,8 @@ func init() {
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.grpcRead = cmdBenchmark.Flag.Bool("grpcRead", false, "use grpc API to read")
b.readByGrpc = cmdBenchmark.Flag.Bool("read.grpc", false, "use grpc API to read")
b.readByTcp = cmdBenchmark.Flag.Bool("read.tcp", false, "use tcp API to read")
sharedBytes = make([]byte, 1024)
}
@@ -283,7 +286,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
start := time.Now()
var bytesRead int
var err error
if *b.grpcRead {
if *b.readByGrpc {
volumeServer, err := b.masterClient.LookupVolumeServer(fid)
if err != nil {
s.failed++
@@ -291,6 +294,15 @@ func readFiles(fileIdLineChan chan string, s *stat) {
continue
}
bytesRead, err = grpcFileGet(volumeServer, fid, b.grpcDialOption)
} else if *b.readByTcp {
volumeServer, err := b.masterClient.LookupVolumeServer(fid)
if err != nil {
s.failed++
println("!!!! ", fid, " location not found!!!!!")
continue
}
bytesRead, err = tcpFileGet(volumeServer, fid)
} else {
url, err := b.masterClient.LookupFileId(fid)
if err != nil {
@@ -336,6 +348,38 @@ func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (byte
return
}
func tcpFileGet(volumeServer, fid string) (bytesRead int, err error) {
err = operation.WithVolumeServerTcpConnection(volumeServer, func(conn net.Conn) error {
// println("requesting", fid, "...")
if err := util.WriteMessage(conn, &volume_server_pb.TcpRequestHeader{
Get: &volume_server_pb.FileGetRequest{FileId: fid},
}); err != nil {
return err
}
for {
resp := &volume_server_pb.FileGetResponse{}
// println("reading...")
respErr := util.ReadMessage(conn, resp)
if respErr != nil {
if respErr == io.EOF {
return nil
}
// println("err:", respErr.Error())
return respErr
}
// println("resp size", len(resp.Data))
bytesRead += len(resp.Data)
if resp.IsLast {
return nil
}
}
})
return
}
func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {

View File

@@ -91,6 +91,7 @@ func init() {
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.enableTcp", false, "[experimental] toggle tcp port, running on 20000 + port")
s3Options.filerBucketsPath = cmdServer.Flag.String("s3.filer.dir.buckets", "/buckets", "folder on filer to store all buckets")
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")

View File

@@ -50,6 +50,7 @@ type VolumeServerOptions struct {
memProfile *string
compactionMBPerSecond *int
fileSizeLimitMB *int
enableTcp *bool // temporary toggle
}
func init() {
@@ -71,6 +72,7 @@ func init() {
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
v.enableTcp = cmdVolume.Flag.Bool("enableTcp", false, "[experimental] toggle tcp port, running on 20000 + port")
}
var cmdVolume = &Command{
@@ -168,6 +170,10 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
// starting grpc server
grpcS := v.startGrpcService(volumeServer)
if v.enableTcp != nil && *v.enableTcp {
go v.startTcpServer(volumeServer)
}
// starting public http server
var publicHttpDown httpdown.Server
if v.isSeparatedPublicPort() {
@@ -245,6 +251,29 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe
return grpcS
}
func (v VolumeServerOptions) startTcpServer(vs *weed_server.VolumeServer) {
tcpPort := *v.port + 20000
tcpL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(tcpPort), 0)
if err != nil {
glog.Fatalf("failed to listen on tcp port %d: %v", tcpPort, err)
}
defer tcpL.Close()
for {
c, err := tcpL.Accept()
if err != nil {
glog.V(0).Infof("accept tcp connection: %v", err)
continue
}
go func() {
if err := vs.HandleTcpConnection(c); err != nil {
glog.V(0).Infof("handle tcp remote %s: %v", c.RemoteAddr(), err)
return
}
}()
}
}
func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server {
publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)

View File

@@ -3,13 +3,23 @@ package operation
import (
"context"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"strconv"
"strings"
)
var (
connectionPool = make(map[string]*util.ResourcePool)
connectionPoolLock sync.Mutex
)
func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(context.Context, volume_server_pb.VolumeServerClient) error) error {
@@ -38,6 +48,80 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err
return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
}
func WithVolumeServerTcpConnection(volumeServer string, fn func(conn net.Conn) error) error {
tcpAddress, err := toVolumeServerTcpAddress(volumeServer)
if err != nil {
return err
}
conn, err := getConnection(tcpAddress)
if err != nil {
return err
}
defer releaseConnection(conn, tcpAddress)
err = fn(conn)
return err
}
func getConnection(tcpAddress string) (net.Conn, error) {
connectionPoolLock.Lock()
defer connectionPoolLock.Unlock()
pool, found := connectionPool[tcpAddress]
if !found {
println("creating pool for", tcpAddress)
raddr, err := net.ResolveTCPAddr("tcp", tcpAddress)
if err != nil {
glog.Fatal(err)
}
pool = util.NewResourcePool(16, func() (interface{}, error) {
conn, err := net.DialTCP("tcp", nil, raddr)
if err != nil {
return conn, err
}
conn.SetKeepAlive(true)
conn.SetNoDelay(true)
println("connected", tcpAddress, "=>", conn.LocalAddr().String())
return conn, nil
})
connectionPool[tcpAddress] = pool
}
connObj, err := pool.Get(time.Minute)
if err != nil {
return nil, err
}
// println("get connection", tcpAddress, "=>", conn.LocalAddr().String())
return connObj.(net.Conn), nil
}
func releaseConnection(conn net.Conn, tcpAddress string) {
connectionPoolLock.Lock()
defer connectionPoolLock.Unlock()
pool, found := connectionPool[tcpAddress]
if !found {
println("can not return connection", tcpAddress, "=>", conn.LocalAddr().String())
return
}
pool.Release(conn)
// println("returned connection", tcpAddress, "=>", conn.LocalAddr().String())
}
func toVolumeServerTcpAddress(volumeServer string) (grpcAddress string, err error) {
sepIndex := strings.LastIndex(volumeServer, ":")
port, err := strconv.Atoi(volumeServer[sepIndex+1:])
if err != nil {
glog.Errorf("failed to parse volume server address: %v", volumeServer)
return "", err
}
return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+20000), nil
}
func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(ctx2 context.Context, masterClient master_pb.SeaweedClient) error) error {
ctx := context.Background()

View File

@@ -104,6 +104,10 @@ message DeleteResult {
uint32 version = 5;
}
message TcpRequestHeader {
FileGetRequest get = 1;
}
message FileGetRequest {
string file_id = 1;
bool accept_gzip = 2;
@@ -118,6 +122,7 @@ message FileGetResponse {
bool is_gzipped = 7;
map<string, string> headers = 8;
int32 errorCode = 9;
bool is_last = 10;
}
message Empty {

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,156 @@
package weed_server
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (vs *VolumeServer) HandleTcpConnection(conn net.Conn) error {
defer conn.Close()
for {
// println("handle tcp conn", conn.RemoteAddr())
tcpMessage := &volume_server_pb.TcpRequestHeader{}
if err := util.ReadMessage(conn, tcpMessage); err != nil {
if err == io.EOF {
return nil
}
return fmt.Errorf("read message: %v", err)
}
if tcpMessage.Get != nil {
vs.handleFileGet(conn, tcpMessage.Get)
}
// println("processed", tcpMessage.Get.FileId)
}
}
func (vs *VolumeServer) handleFileGet(conn net.Conn, req *volume_server_pb.FileGetRequest) error {
headResponse := &volume_server_pb.FileGetResponse{}
n := new(needle.Needle)
commaIndex := strings.LastIndex(req.FileId, ",")
vid := req.FileId[:commaIndex]
fid := req.FileId[commaIndex+1:]
volumeId, err := needle.NewVolumeId(vid)
if err != nil {
headResponse.ErrorCode = http.StatusBadRequest
return util.WriteMessage(conn, headResponse)
}
err = n.ParsePath(fid)
if err != nil {
headResponse.ErrorCode = http.StatusBadRequest
return util.WriteMessage(conn, headResponse)
}
hasVolume := vs.store.HasVolume(volumeId)
_, hasEcVolume := vs.store.FindEcVolume(volumeId)
if !hasVolume && !hasEcVolume {
headResponse.ErrorCode = http.StatusMovedPermanently
return util.WriteMessage(conn, headResponse)
}
cookie := n.Cookie
var count int
if hasVolume {
count, err = vs.store.ReadVolumeNeedle(volumeId, n)
} else if hasEcVolume {
count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n)
}
if err != nil || count < 0 {
headResponse.ErrorCode = http.StatusNotFound
return util.WriteMessage(conn, headResponse)
}
if n.Cookie != cookie {
headResponse.ErrorCode = http.StatusNotFound
return util.WriteMessage(conn, headResponse)
}
if n.LastModified != 0 {
headResponse.LastModified = n.LastModified
}
headResponse.Etag = n.Etag()
if n.HasPairs() {
pairMap := make(map[string]string)
err = json.Unmarshal(n.Pairs, &pairMap)
if err != nil {
glog.V(0).Infoln("Unmarshal pairs error:", err)
}
headResponse.Headers = pairMap
}
/*
// skip this, no redirection
if vs.tryHandleChunkedFile(n, filename, w, r) {
return
}
*/
if n.NameSize > 0 {
headResponse.Filename = string(n.Name)
}
mtype := ""
if n.MimeSize > 0 {
mt := string(n.Mime)
if !strings.HasPrefix(mt, "application/octet-stream") {
mtype = mt
}
}
headResponse.ContentType = mtype
headResponse.IsGzipped = n.IsGzipped()
if n.IsGzipped() && req.AcceptGzip {
if n.Data, err = util.UnGzipData(n.Data); err != nil {
glog.V(0).Infof("ungzip %s error: %v", req.FileId, err)
}
}
headResponse.ContentLength = uint32(len(n.Data))
bytesToRead := len(n.Data)
bytesRead := 0
t := headResponse
for bytesRead < bytesToRead {
stopIndex := bytesRead + BufferSizeLimit
if stopIndex > bytesToRead {
stopIndex = bytesToRead
}
if t == nil {
t = &volume_server_pb.FileGetResponse{}
}
t.Data = n.Data[bytesRead:stopIndex]
t.IsLast = stopIndex == bytesToRead
err = util.WriteMessage(conn, t)
t = nil
if err != nil {
return err
}
bytesRead = stopIndex
}
return nil
}

View File

@@ -8,10 +8,14 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
"strings"
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/glog"
)
@@ -312,3 +316,55 @@ func CloseResponse(resp *http.Response) {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}
func WriteMessage(conn net.Conn, message proto.Message) error {
data, err := proto.Marshal(message)
if err != nil {
glog.Fatalf("marshal: %v", err)
}
messageSizeBytes := make([]byte, 4)
Uint32toBytes(messageSizeBytes, uint32(len(data)))
_, err = conn.Write(messageSizeBytes)
if err != nil {
return err
}
_, err = conn.Write(data)
return err
}
func WriteMessageEOF(conn net.Conn) error {
messageSizeBytes := make([]byte, 4)
Uint32toBytes(messageSizeBytes, math.MaxUint32)
_, err := conn.Write(messageSizeBytes)
return err
}
func ReadMessage(conn net.Conn, message proto.Message) error {
messageSizeBuffer := make([]byte, 4)
n, err := conn.Read(messageSizeBuffer)
if err != nil {
if err == io.EOF {
// println("unexpected eof")
return err
}
return fmt.Errorf("read message size byte length: %d %v", n, err)
}
if n != 4 {
return fmt.Errorf("unexpected message size byte length: %d", n)
}
messageSize := BytesToUint32(messageSizeBuffer)
if messageSize == math.MaxUint32 {
// println("marked eof")
return io.EOF
}
messageBytes := make([]byte, messageSize)
readMessageLength, err := conn.Read(messageBytes)
if readMessageLength != int(messageSize) {
return fmt.Errorf("message size:%d, expected:%d", readMessageLength, messageSize)
}
if err := proto.Unmarshal(messageBytes, message); err != nil {
return err
}
return nil
}

98
weed/util/pool.go Normal file
View File

@@ -0,0 +1,98 @@
package util
import (
"errors"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
)
var (
TimeoutErr = errors.New("timeout")
)
// A bufferedChan implemented by a buffered channel
type ResourcePool struct {
sync.Mutex
bufferedChan chan interface{}
poolSizeLimit int
inuse int
newFn func() (interface{}, error)
}
func NewResourcePool(poolSizeLimit int, newFn func() (interface{}, error)) *ResourcePool {
p := &ResourcePool{
poolSizeLimit: poolSizeLimit,
newFn: newFn,
bufferedChan: make(chan interface{}, poolSizeLimit),
}
return p
}
func (p *ResourcePool) Size() int {
p.Lock()
defer p.Unlock()
return len(p.bufferedChan) + p.inuse
}
func (p *ResourcePool) Free() int {
p.Lock()
defer p.Unlock()
return p.poolSizeLimit - p.inuse
}
func (p *ResourcePool) Get(timeout time.Duration) (interface{}, error) {
d, err := p.get(timeout)
if err != nil {
return nil, err
}
if d == nil && p.newFn != nil {
var err error
d, err = p.newFn()
if err != nil {
return nil, err
}
}
p.Lock()
defer p.Unlock()
p.inuse++
return d, nil
}
func (p *ResourcePool) Release(v interface{}) {
p.Lock()
defer p.Unlock()
if p.inuse == 0 {
glog.V(0).Infof("released too many times?")
return
}
p.bufferedChan <- v
p.inuse--
}
func (p *ResourcePool) get(timeout time.Duration) (interface{}, error) {
select {
case v := <-p.bufferedChan:
return v, nil
default:
}
if p.Free() > 0 {
d, err := p.newFn()
if err != nil {
return nil, err
}
return d, nil
}
// wait for an freed item
select {
case v := <-p.bufferedChan:
return v, nil
case <-time.After(timeout):
}
return nil, TimeoutErr
}