verify adding columes should work well

This commit is contained in:
Chris Lu
2012-09-13 01:33:47 -07:00
parent 9e0f8d9f0d
commit 6892842021
4 changed files with 66 additions and 34 deletions

View File

@@ -11,7 +11,7 @@ func TestXYZ(t *testing.T) {
dn := topology.NewDataNode("server1") dn := topology.NewDataNode("server1")
dn.Ip = "localhost" dn.Ip = "localhost"
dn.Port = 8080 dn.Port = 8080
vid, _:= storage.NewVolumeId("5") vid, _:= storage.NewVolumeId("6")
out := AllocateVolume(dn,vid,storage.Copy00) out := AllocateVolume(dn,vid,storage.Copy00)
log.Println(out) log.Println(out)
} }

View File

@@ -3,6 +3,7 @@ package storage
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"io/ioutil"
"log" "log"
"net/url" "net/url"
"pkg/util" "pkg/util"
@@ -22,6 +23,7 @@ func NewStore(port int, publicUrl, dirname string, volumeListString string) (s *
s = &Store{Port: port, PublicUrl: publicUrl, dir: dirname} s = &Store{Port: port, PublicUrl: publicUrl, dir: dirname}
s.volumes = make(map[VolumeId]*Volume) s.volumes = make(map[VolumeId]*Volume)
s.loadExistingVolumes()
s.AddVolume(volumeListString, "00") s.AddVolume(volumeListString, "00")
log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes", volumeListString) log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes", volumeListString)
@@ -48,7 +50,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) (e er
} }
for id := start; id <= end; id++ { for id := start; id <= end; id++ {
if err := s.addVolume(VolumeId(id), NewReplicationType(replicationType)); err != nil { if err := s.addVolume(VolumeId(id), NewReplicationType(replicationType)); err != nil {
e = err e = err
} }
} }
} }
@@ -59,10 +61,27 @@ func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error {
if s.volumes[vid] != nil { if s.volumes[vid] != nil {
return errors.New("Volume Id " + vid.String() + " already exists!") return errors.New("Volume Id " + vid.String() + " already exists!")
} }
log.Println("In dir", s.dir, "adds volume = ", vid, ", replicationType =", replicationType) log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType)
s.volumes[vid] = NewVolume(s.dir, vid, replicationType) s.volumes[vid] = NewVolume(s.dir, vid, replicationType)
return nil return nil
} }
func (s *Store) loadExistingVolumes() {
if dirs, err := ioutil.ReadDir(s.dir); err == nil {
for _, dir := range dirs {
name := dir.Name()
if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
base := name[:len(name)-len(".dat")]
if vid, err := NewVolumeId(base); err == nil {
if s.volumes[vid] == nil {
v := NewVolume(s.dir, vid, CopyNil)
s.volumes[vid] = v
log.Println("In dir", s.dir, "reads volume = ", vid, ", replicationType =", v.replicaType)
}
}
}
}
}
}
func (s *Store) Status() *[]*VolumeInfo { func (s *Store) Status() *[]*VolumeInfo {
stats := new([]*VolumeInfo) stats := new([]*VolumeInfo)
for k, v := range s.volumes { for k, v := range s.volumes {

View File

@@ -18,23 +18,27 @@ type Volume struct {
dataFile *os.File dataFile *os.File
nm *NeedleMap nm *NeedleMap
replicaType ReplicationType replicaType ReplicationType
accessLock sync.Mutex accessLock sync.Mutex
//transient //transient
locations []string locations []string
} }
func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) { func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) {
var e error var e error
v = &Volume{dir: dirname, Id: id, replicaType:replicationType} v = &Volume{dir: dirname, Id: id, replicaType: replicationType}
fileName := id.String() fileName := id.String()
v.dataFile, e = os.OpenFile(path.Join(v.dir, fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644) v.dataFile, e = os.OpenFile(path.Join(v.dir, fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644)
if e != nil { if e != nil {
log.Fatalf("New Volume [ERROR] %s\n", e) log.Fatalf("New Volume [ERROR] %s\n", e)
} }
v.maybeWriteSuperBlock() if replicationType == CopyNil {
v.readSuperBlock()
} else {
v.maybeWriteSuperBlock()
}
indexFile, ie := os.OpenFile(path.Join(v.dir, fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644) indexFile, ie := os.OpenFile(path.Join(v.dir, fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644)
if ie != nil { if ie != nil {
log.Fatalf("Write Volume Index [ERROR] %s\n", ie) log.Fatalf("Write Volume Index [ERROR] %s\n", ie)
@@ -58,10 +62,18 @@ func (v *Volume) maybeWriteSuperBlock() {
stat, _ := v.dataFile.Stat() stat, _ := v.dataFile.Stat()
if stat.Size() == 0 { if stat.Size() == 0 {
header := make([]byte, SuperBlockSize) header := make([]byte, SuperBlockSize)
header[0] = byte(v.replicaType) header[0] = 1
header[1] = byte(v.replicaType)
v.dataFile.Write(header) v.dataFile.Write(header)
} }
} }
func (v *Volume) readSuperBlock() {
v.dataFile.Seek(0, 0)
header := make([]byte, SuperBlockSize)
if _, error := v.dataFile.Read(header); error == nil {
v.replicaType = ReplicationType(header[1])
}
}
func (v *Volume) write(n *Needle) uint32 { func (v *Volume) write(n *Needle) uint32 {
v.accessLock.Lock() v.accessLock.Lock()

View File

@@ -16,37 +16,38 @@ const (
Copy11 = ReplicationType(11) // 3 copies, 2 on different racks and local data center, 1 on different data center Copy11 = ReplicationType(11) // 3 copies, 2 on different racks and local data center, 1 on different data center
Copy20 = ReplicationType(20) // 3 copies, each on dffereint data center Copy20 = ReplicationType(20) // 3 copies, each on dffereint data center
LengthRelicationType = 5 LengthRelicationType = 5
CopyNil = ReplicationType(255) // nil value
) )
func NewReplicationType(t string) ReplicationType { func NewReplicationType(t string) ReplicationType {
switch t { switch t {
case "00": case "00":
return Copy00 return Copy00
case "01": case "01":
return Copy01 return Copy01
case "10": case "10":
return Copy10 return Copy10
case "11": case "11":
return Copy11 return Copy11
case "20": case "20":
return Copy20 return Copy20
} }
return Copy00 return Copy00
} }
func (r *ReplicationType) String() string { func (r *ReplicationType) String() string {
switch *r { switch *r {
case Copy00: case Copy00:
return "00" return "00"
case Copy01: case Copy01:
return "01" return "01"
case Copy10: case Copy10:
return "10" return "10"
case Copy11: case Copy11:
return "11" return "11"
case Copy20: case Copy20:
return "20" return "20"
} }
return "00" return "00"
} }
func GetReplicationLevelIndex(v *VolumeInfo) int { func GetReplicationLevelIndex(v *VolumeInfo) int {