
Sync the author's updates

pull/439/head
chulin committed 9 years ago
commit 04d9591e1d
Changed files:
  1. weed/server/raft_server.go (8 changed lines)
  2. weed/storage/disk_location.go (122 changed lines)
  3. weed/storage/needle_map_memory.go (2 changed lines)
  4. weed/storage/store.go (14 changed lines)
  5. weed/storage/volume.go (7 changed lines)
  6. weed/storage/volume_checking.go (1 changed line)
  7. weed/storage/volume_vacuum.go (164 changed lines)
  8. weed/storage/volume_vacuum_test.go (55 changed lines)

weed/server/raft_server.go (8 changed lines)

@@ -144,7 +144,7 @@ func isPeersChanged(dir string, self string, peers []string) (oldPeers []string,
 	sort.Strings(peers)
 	sort.Strings(oldPeers)
-	return oldPeers, reflect.DeepEqual(peers, oldPeers)
+	return oldPeers, !reflect.DeepEqual(peers, oldPeers)
 }
@@ -165,7 +165,7 @@ func (s *RaftServer) Join(peers []string) error {
 		target := fmt.Sprintf("http://%s/cluster/join", strings.TrimSpace(m))
 		glog.V(0).Infoln("Attempting to connect to:", target)
-		err = postFollowingOneRedirect(target, "application/json", &b)
+		err = postFollowingOneRedirect(target, "application/json", b)
 		if err != nil {
 			glog.V(0).Infoln("Post returned error: ", err.Error())
@@ -182,9 +182,9 @@ func (s *RaftServer) Join(peers []string) error {
 }

 // a workaround because http POST following redirection misses request body
-func postFollowingOneRedirect(target string, contentType string, b *bytes.Buffer) error {
+func postFollowingOneRedirect(target string, contentType string, b bytes.Buffer) error {
 	backupReader := bytes.NewReader(b.Bytes())
-	resp, err := http.Post(target, contentType, b)
+	resp, err := http.Post(target, contentType, &b)
 	if err != nil {
 		return err
 	}
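
Why the signature changed from *bytes.Buffer to bytes.Buffer: http.Post drains whatever reader it is given, and the Go HTTP clients of this era did not replay a POST body after a redirect, which is what the workaround comment refers to. Passing the buffer by value and taking a bytes.Reader over the same backing bytes before the first POST preserves an untouched copy for a manual second attempt. A minimal sketch of the pattern, assuming only the standard library (the redirect handling below is illustrative, not the exact upstream function body; newer Go clients follow such redirects themselves, converting POST to GET):

package sketch

import (
	"bytes"
	"io"
	"io/ioutil"
	"net/http"
)

// Sketch: POST once; if the server answers 301, POST again to the
// Location target using an untouched reader over the same bytes.
func postFollowingOneRedirectSketch(target, contentType string, b bytes.Buffer) error {
	backupReader := bytes.NewReader(b.Bytes()) // snapshot taken before Post drains b
	resp, err := http.Post(target, contentType, &b)
	if err != nil {
		return err
	}
	io.Copy(ioutil.Discard, resp.Body)
	resp.Body.Close()

	if resp.StatusCode == http.StatusMovedPermanently {
		if location := resp.Header.Get("Location"); location != "" {
			resp2, err2 := http.Post(location, contentType, backupReader)
			if err2 != nil {
				return err2
			}
			resp2.Body.Close()
		}
	}
	return nil
}

The trick works because reading from a bytes.Buffer advances its offset without invalidating the slice that b.Bytes() returned, so the backup reader still sees the full payload.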

weed/storage/disk_location.go (122 changed lines)

@@ -2,7 +2,9 @@ package storage

 import (
 	"io/ioutil"
+	"os"
 	"strings"
+	"sync"

 	"github.com/chrislusf/seaweedfs/weed/glog"
 )
@@ -11,6 +13,7 @@ type DiskLocation struct {
 	Directory      string
 	MaxVolumeCount int
 	volumes        map[VolumeId]*Volume
+	sync.RWMutex
 }
@@ -19,35 +22,80 @@ func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
 	return location
 }

-func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
-	if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
-		for _, dir := range dirs {
-			name := dir.Name()
-			if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
-				collection := ""
-				base := name[:len(name)-len(".dat")]
-				i := strings.LastIndex(base, "_")
-				if i > 0 {
-					collection, base = base[0:i], base[i+1:]
-				}
-				if vid, err := NewVolumeId(base); err == nil {
-					if l.volumes[vid] == nil {
-						if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
-							l.volumes[vid] = v
-							glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
-						} else {
-							glog.V(0).Infof("new volume %s error %s", name, e)
-						}
-					}
-				}
-			}
-		}
-	}
+func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleMapType, mutex *sync.RWMutex) {
+	name := dir.Name()
+	if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
+		collection := ""
+		base := name[:len(name)-len(".dat")]
+		i := strings.LastIndex(base, "_")
+		if i > 0 {
+			collection, base = base[0:i], base[i+1:]
+		}
+		if vid, err := NewVolumeId(base); err == nil {
+			mutex.RLock()
+			_, found := l.volumes[vid]
+			mutex.RUnlock()
+			if !found {
+				if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
+					mutex.Lock()
+					l.volumes[vid] = v
+					mutex.Unlock()
+					glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
+				} else {
+					glog.V(0).Infof("new volume %s error %s", name, e)
+				}
+			}
+		}
+	}
+}
+
+func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrentFlag bool) {
+	var concurrency int
+	if concurrentFlag {
+		//you can pick a better-tuned concurrency value after testing in your environment
+		concurrency = 10
+	} else {
+		concurrency = 1
+	}
+
+	task_queue := make(chan os.FileInfo, 10*concurrency)
+	go func() {
+		if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
+			for _, dir := range dirs {
+				task_queue <- dir
+			}
+		}
+		close(task_queue)
+	}()
+
+	var wg sync.WaitGroup
+	var mutex sync.RWMutex
+	for workerNum := 0; workerNum < concurrency; workerNum++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for dir := range task_queue {
+				l.loadExistingVolume(dir, needleMapKind, &mutex)
+			}
+		}()
+	}
+	wg.Wait()
+}
+
+func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
+	l.Lock()
+	defer l.Unlock()
+
+	l.concurrentLoadingVolumes(needleMapKind, true)
+
 	glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
 }

 func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
+	l.Lock()
+	defer l.Unlock()
+
 	for k, v := range l.volumes {
 		if v.Collection == collection {
 			e = l.deleteVolumeById(k)
@@ -71,3 +119,35 @@ func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) {
 	delete(l.volumes, vid)
 	return
 }
+
+func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume) {
+	l.Lock()
+	defer l.Unlock()
+
+	l.volumes[vid] = volume
+}
+
+func (l *DiskLocation) FindVolume(vid VolumeId) (*Volume, bool) {
+	l.RLock()
+	defer l.RUnlock()
+
+	v, ok := l.volumes[vid]
+	return v, ok
+}
+
+func (l *DiskLocation) VolumesLen() int {
+	l.RLock()
+	defer l.RUnlock()
+
+	return len(l.volumes)
+}
+
+func (l *DiskLocation) Close() {
+	l.Lock()
+	defer l.Unlock()
+
+	for _, v := range l.volumes {
+		v.Close()
+	}
+	return
+}
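
concurrentLoadingVolumes is a conventional bounded worker pool: one producer goroutine enqueues every directory entry into a buffered channel and closes it, and N workers range over the channel until it drains; the 10*concurrency buffer lets the producer run ahead without unbounded memory growth. Note that the workers synchronize on a local sync.RWMutex passed in by pointer rather than on the DiskLocation's embedded lock, since loadExistingVolumes already holds that lock for the whole load and Go mutexes are not reentrant, so reusing it inside the workers would deadlock. The same shape, reduced to a self-contained sketch over a generic task type (all names here are illustrative):

package main

import (
	"fmt"
	"sync"
)

// processAll mirrors the producer/worker-pool shape of
// concurrentLoadingVolumes: a buffered channel as the task queue,
// one producer that closes it, and a WaitGroup to join the workers.
func processAll(tasks []string, concurrency int, handle func(string)) {
	queue := make(chan string, 10*concurrency) // producer can run ahead of workers

	go func() {
		for _, t := range tasks {
			queue <- t
		}
		close(queue) // lets every worker's range loop terminate
	}()

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range queue {
				handle(t)
			}
		}()
	}
	wg.Wait()
}

func main() {
	processAll([]string{"1.dat", "2.dat", "3.dat"}, 2, func(name string) {
		fmt.Println("loading", name)
	})
}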

weed/storage/needle_map_memory.go (2 changed lines)

@@ -48,7 +48,7 @@ func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
 		}
 		return nil
 	})
-	glog.V(1).Infoln("max file key:", nm.MaximumFileKey)
+	glog.V(1).Infof("max file key: %d for file: %s", nm.MaximumFileKey, file.Name())
 	return nm, e
 }

weed/storage/store.go (14 changed lines)

@@ -143,7 +143,7 @@ func (s *Store) DeleteCollection(collection string) (e error) {

 func (s *Store) findVolume(vid VolumeId) *Volume {
 	for _, location := range s.Locations {
-		if v, found := location.volumes[vid]; found {
+		if v, found := location.FindVolume(vid); found {
 			return v
 		}
 	}
@@ -152,7 +152,7 @@ func (s *Store) findVolume(vid VolumeId) *Volume {
 func (s *Store) findFreeLocation() (ret *DiskLocation) {
 	max := 0
 	for _, location := range s.Locations {
-		currentFreeCount := location.MaxVolumeCount - len(location.volumes)
+		currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
 		if currentFreeCount > max {
 			max = currentFreeCount
 			ret = location
@@ -168,7 +168,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleM
 		glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
 			location.Directory, vid, collection, replicaPlacement, ttl)
 		if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
-			location.volumes[vid] = volume
+			location.SetVolume(vid, volume)
 			return nil
 		} else {
 			return err
@@ -180,6 +180,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleM
 func (s *Store) Status() []*VolumeInfo {
 	var stats []*VolumeInfo
 	for _, location := range s.Locations {
+		location.RLock()
 		for k, v := range location.volumes {
 			s := &VolumeInfo{
 				Id: VolumeId(k),
@@ -194,6 +195,7 @@ func (s *Store) Status() []*VolumeInfo {
 				Ttl: v.Ttl}
 			stats = append(stats, s)
 		}
+		location.RUnlock()
 	}
 	sortVolumeInfos(stats)
 	return stats
@@ -219,6 +221,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
 	var maxFileKey uint64
 	for _, location := range s.Locations {
 		maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
+		location.Lock()
 		for k, v := range location.volumes {
 			if maxFileKey < v.nm.MaxFileKey() {
 				maxFileKey = v.nm.MaxFileKey()
@@ -246,6 +249,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
 				}
 			}
 		}
+		location.Unlock()
 	}

 	joinMessage := &operation.JoinMessage{
@@ -290,9 +294,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
 }

 func (s *Store) Close() {
 	for _, location := range s.Locations {
-		for _, v := range location.volumes {
-			v.Close()
-		}
+		location.Close()
 	}
 }

 func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
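
All of the store-side changes route map access through the new DiskLocation accessors instead of touching location.volumes directly; only the whole-map loops (Status, SendHeartbeatToMaster) take the lock themselves so each iteration is atomic, and the heartbeat takes the write lock, likely because that scan can also remove expired volumes. The accessor pattern, reduced to a toy self-contained type (names here are illustrative, not from the codebase):

package main

import (
	"fmt"
	"sync"
)

// registry: every map operation holds the embedded RWMutex,
// and callers never touch the map directly.
type registry struct {
	sync.RWMutex
	items map[int]string
}

func (r *registry) Set(k int, v string) {
	r.Lock()
	defer r.Unlock()
	r.items[k] = v
}

func (r *registry) Find(k int) (string, bool) {
	r.RLock()
	defer r.RUnlock()
	v, ok := r.items[k]
	return v, ok
}

func (r *registry) Len() int {
	r.RLock()
	defer r.RUnlock()
	return len(r.items)
}

func main() {
	r := &registry{items: make(map[int]string)}
	r.Set(1, "volume-1")
	if v, ok := r.Find(1); ok {
		fmt.Println(v, r.Len())
	}
}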

weed/storage/volume.go (7 changed lines)

@@ -23,6 +23,9 @@ type Volume struct {
 	dataFileAccessLock sync.Mutex
 	lastModifiedTime   uint64 //unix time in seconds

+	lastCompactIndexOffset uint64
+	lastCompactRevision    uint16
 }

 func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
@@ -92,9 +95,9 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool {
 	if v.Ttl == nil || v.Ttl.Minutes() == 0 {
 		return false
 	}
-	glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
+	glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
 	livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
-	glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+	glog.V(1).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
 	if int64(v.Ttl.Minutes()) < livedMinutes {
 		return true
 	}
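
The two demoted log lines sit inside expired(), which runs for every volume on every periodic check, so V(0) made them unconditionally noisy; at V(1) they appear only under higher verbosity. The expiry test itself is plain integer arithmetic on unix seconds, restated here as a self-contained check (the TTL is reduced to a minute count; the real code calls v.Ttl.Minutes()):

package main

import (
	"fmt"
	"time"
)

// expired restates the check in Volume.expired: a volume is expired
// once it has lived longer than its TTL, measured in whole minutes.
func expired(lastModifiedTime uint64, ttlMinutes int64) bool {
	if ttlMinutes == 0 {
		return false // no TTL configured: the volume never expires
	}
	livedMinutes := (time.Now().Unix() - int64(lastModifiedTime)) / 60
	return ttlMinutes < livedMinutes
}

func main() {
	twoHoursAgo := uint64(time.Now().Add(-2 * time.Hour).Unix())
	fmt.Println(expired(twoHoursAgo, 60))  // true: lived ~120m, TTL 60m
	fmt.Println(expired(twoHoursAgo, 180)) // false: lived ~120m, TTL 180m
}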

weed/storage/volume_checking.go (1 changed line)

@@ -21,7 +21,6 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error {
 		return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
 	}
 	key, offset, size := idxFileEntry(lastIdxEntry)
-	//deleted index entry could not point to deleted needle
 	if offset == 0 {
 		return nil
 	}

weed/storage/volume_vacuum.go (164 changed lines)

@@ -6,6 +6,7 @@ import (
 	"time"

 	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/util"
 )

 func (v *Volume) garbageLevel() float64 {
@@ -20,7 +21,9 @@ func (v *Volume) Compact() error {
 	//glog.V(3).Infof("Got Compaction lock...")
 	filePath := v.FileName()
-	glog.V(3).Infof("creating copies for volume %d ...", v.Id)
+	v.lastCompactIndexOffset = v.nm.IndexFileSize()
+	v.lastCompactRevision = v.SuperBlock.CompactRevision
+	glog.V(3).Infof("creating copies for volume %d, last offset %d...", v.Id, v.lastCompactIndexOffset)
 	return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
 }
@@ -38,14 +41,28 @@ func (v *Volume) commitCompact() error {
 	glog.V(3).Infof("Got Committing lock...")
 	v.nm.Close()
 	_ = v.dataFile.Close()
-	makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx")
 	var e error
-	if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
-		return e
-	}
-	if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
-		return e
+	if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
+		glog.V(0).Infof("makeupDiff in commitCompact failed %v", e)
+		e = os.Remove(v.FileName() + ".cpd")
+		if e != nil {
+			return e
+		}
+		e = os.Remove(v.FileName() + ".cpx")
+		if e != nil {
+			return e
+		}
+	} else {
+		var e error
+		if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
+			return e
+		}
+		if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
+			return e
+		}
 	}

 	//glog.V(3).Infof("Pretending to be vacuuming...")
 	//time.Sleep(20 * time.Second)
 	glog.V(3).Infof("Loading Commit file...")
@@ -55,7 +72,138 @@ func (v *Volume) commitCompact() error {
 	return nil
 }

-func makeupDiff(newDatFile, newIdxFile, oldDatFile, oldIdxFile string) (err error) {
+func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) {
+	if _, err = file.Seek(0, 0); err != nil {
+		return 0, fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err)
+	}
+	header := make([]byte, SuperBlockSize)
+	if _, e := file.Read(header); e != nil {
+		return 0, fmt.Errorf("cannot read file %s 's super block: %v", file.Name(), e)
+	}
+	superBlock, err := ParseSuperBlock(header)
+	if err != nil {
+		return 0, err
+	}
+	return superBlock.CompactRevision, nil
+}
+
+func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) {
+	var indexSize int64
+
+	oldIdxFile, err := os.Open(oldIdxFileName)
+	defer oldIdxFile.Close()
+
+	oldDatFile, err := os.Open(oldDatFileName)
+	defer oldDatFile.Close()
+
+	if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil {
+		return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
+	}
+	if indexSize == 0 || uint64(indexSize) <= v.lastCompactIndexOffset {
+		return nil
+	}
+
+	oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatFile)
+	if err != nil {
+		return
+	}
+	if oldDatCompactRevision != v.lastCompactRevision {
+		return fmt.Errorf("current old dat file's compact revision %d is not the expected one %d", oldDatCompactRevision, v.lastCompactRevision)
+	}
+
+	type keyField struct {
+		offset uint32
+		size   uint32
+	}
+	incrementedHasUpdatedIndexEntry := make(map[uint64]keyField)
+
+	for idx_offset := indexSize - NeedleIndexSize; uint64(idx_offset) >= v.lastCompactIndexOffset; idx_offset -= NeedleIndexSize {
+		var IdxEntry []byte
+		if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil {
+			return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err)
+		}
+		key, offset, size := idxFileEntry(IdxEntry)
+		if _, found := incrementedHasUpdatedIndexEntry[key]; !found {
+			incrementedHasUpdatedIndexEntry[key] = keyField{
+				offset: offset,
+				size:   size,
+			}
+		}
+	}
+
+	if len(incrementedHasUpdatedIndexEntry) > 0 {
+		var (
+			dst, idx *os.File
+		)
+		if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil {
+			return
+		}
+		defer dst.Close()
+
+		if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil {
+			return
+		}
+		defer idx.Close()
+
+		var newDatCompactRevision uint16
+		newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst)
+		if err != nil {
+			return
+		}
+		if oldDatCompactRevision+1 != newDatCompactRevision {
+			return fmt.Errorf("oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d", oldDatFileName, oldDatCompactRevision, newDatFileName, newDatCompactRevision)
+		}
+
+		idx_entry_bytes := make([]byte, 16)
+		for key, incre_idx_entry := range incrementedHasUpdatedIndexEntry {
+			util.Uint64toBytes(idx_entry_bytes[0:8], key)
+			util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset)
+			util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size)
+
+			var offset int64
+			if offset, err = dst.Seek(0, 2); err != nil {
+				glog.V(0).Infof("failed to seek the end of file: %v", err)
+				return
+			}
+			//ensure writes start from an aligned position
+			if offset%NeedlePaddingSize != 0 {
+				offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
+				if offset, err = dst.Seek(offset, 0); err != nil {
+					glog.V(0).Infof("failed to align in datafile %s: %v", dst.Name(), err)
+					return
+				}
+			}
+
+			//updated needle
+			if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 {
+				//even if the needle cache in memory is hit, the needle bytes read here are correct
+				var needle_bytes []byte
+				needle_bytes, _, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
+				if err != nil {
+					return
+				}
+				dst.Write(needle_bytes)
+				util.Uint32toBytes(idx_entry_bytes[8:12], uint32(offset/NeedlePaddingSize))
+			} else { //deleted needle
+				//fakeDelNeedle's default Data field is nil
+				fakeDelNeedle := new(Needle)
+				fakeDelNeedle.Id = key
+				fakeDelNeedle.Cookie = 0x12345678
+				_, err = fakeDelNeedle.Append(dst, v.Version())
+				if err != nil {
+					return
+				}
+				util.Uint32toBytes(idx_entry_bytes[8:12], uint32(0))
+			}
+
+			if _, err := idx.Seek(0, 2); err != nil {
+				return fmt.Errorf("cannot seek end of indexfile %s: %v",
+					newIdxFileName, err)
+			}
+			_, err = idx.Write(idx_entry_bytes)
+		}
+	}
+
 	return nil
 }
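
In outline, the new makeupDiff replays whatever was written while compaction ran: it checks the compact-revision chain (the old .dat must still carry the revision recorded when Compact started, and the new .cpd must be exactly one revision ahead), walks the old .idx backward from its end down to lastCompactIndexOffset keeping only the newest entry per key, then appends each surviving entry to the .cpd/.cpx pair as either the needle blob or a tombstone with offset 0. Every index record it writes is the fixed 16-byte entry built with util.Uint64toBytes/Uint32toBytes. A standalone sketch of that layout using encoding/binary (big-endian matches the util helpers; NeedlePaddingSize is 8 in this codebase, so offsets are stored divided by 8):

package main

import (
	"encoding/binary"
	"fmt"
)

// Assumed constant: NeedlePaddingSize in this codebase.
const needlePaddingSize = 8

// encodeIdxEntry builds the 16-byte .idx record: 8-byte needle key,
// 4-byte offset in padding units, 4-byte size, all big-endian.
func encodeIdxEntry(key uint64, byteOffset int64, size uint32) []byte {
	entry := make([]byte, 16)
	binary.BigEndian.PutUint64(entry[0:8], key)
	binary.BigEndian.PutUint32(entry[8:12], uint32(byteOffset/needlePaddingSize))
	binary.BigEndian.PutUint32(entry[12:16], size)
	return entry
}

// decodeIdxEntry is the inverse, corresponding to idxFileEntry in the diff.
func decodeIdxEntry(entry []byte) (key uint64, byteOffset int64, size uint32) {
	key = binary.BigEndian.Uint64(entry[0:8])
	byteOffset = int64(binary.BigEndian.Uint32(entry[8:12])) * needlePaddingSize
	size = binary.BigEndian.Uint32(entry[12:16])
	return
}

func main() {
	entry := encodeIdxEntry(0x1234, 4096, 200)
	k, off, sz := decodeIdxEntry(entry)
	fmt.Printf("key=%x offset=%d size=%d\n", k, off, sz) // key=1234 offset=4096 size=200
}

Storing offsets in 8-byte padding units is what lets a 4-byte field address volume files up to 32 GiB, and it is why makeupDiff divides the new end-of-file offset by NeedlePaddingSize before storing it in entry bytes 8 to 12.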

weed/storage/volume_vacuum_test.go (55 changed lines)

@@ -0,0 +1,55 @@
+package storage
+
+import (
+	"testing"
+)
+
+/*
+makeupDiff test steps
+1. Launch a weed server in your local/dev environment. (The master option
+   "garbageThreshold" and the volume option "max" should be set to values
+   that make preparing the test prerequisites easier.)
+   a) ./weed master -garbageThreshold=0.99 -mdir=./m
+   b) ./weed volume -dir=./data -max=1 -mserver=localhost:9333 -port=8080
+2. Upload 4 different files; you can call dir/assign to get 4 different fids.
+   a) upload file A with fid a
+   b) upload file B with fid b
+   c) upload file C with fid c
+   d) upload file D with fid d
+3. Update files A and C.
+   a) modify file A and upload file A with fid a
+   b) modify file C and upload file C with fid c
+   c) record the current size of 1.idx (this is the lastCompactIndexOffset value)
+4. Compact the data file.
+   a) run curl http://localhost:8080/admin/vacuum/compact?volumeId=1
+   b) verify that 1.cpd and 1.cpx are created under the volume directory
+5. Update file B and delete file D.
+   a) modify file B and upload file B with fid b
+   b) delete file D with fid d
+6. Now you can run the unit test below; it should pass.
+7. Commit the compaction manually.
+   a) mv 1.cpd 1.dat
+   b) mv 1.cpx 1.idx
+8. Restart the volume server.
+9. Now you should get the updated files A, B, and C.
+*/
+func TestMakeDiff(t *testing.T) {
+
+	v := new(Volume)
+	//lastCompactIndexOffset value is the index file size before step 4
+	v.lastCompactIndexOffset = 96
+	v.SuperBlock.version = 0x2
+	/*
+		err := v.makeupDiff(
+			"/yourpath/1.cpd",
+			"/yourpath/1.cpx",
+			"/yourpath/1.dat",
+			"/yourpath/1.idx")
+		if err != nil {
+			t.Errorf("makeupDiff err is %v", err)
+		} else {
+			t.Log("makeupDiff Succeeded")
+		}
+	*/
+}