diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index a35659818..3652d7add 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -144,7 +144,7 @@ func isPeersChanged(dir string, self string, peers []string) (oldPeers []string,
 	sort.Strings(peers)
 	sort.Strings(oldPeers)
 
-	return oldPeers, reflect.DeepEqual(peers, oldPeers)
+	return oldPeers, !reflect.DeepEqual(peers, oldPeers)
 }
 
@@ -165,7 +165,7 @@ func (s *RaftServer) Join(peers []string) error {
 		target := fmt.Sprintf("http://%s/cluster/join", strings.TrimSpace(m))
 		glog.V(0).Infoln("Attempting to connect to:", target)
 
-		err = postFollowingOneRedirect(target, "application/json", &b)
+		err = postFollowingOneRedirect(target, "application/json", b)
 
 		if err != nil {
 			glog.V(0).Infoln("Post returned error: ", err.Error())
@@ -182,9 +182,9 @@ func (s *RaftServer) Join(peers []string) error {
 }
 
 // a workaround because http POST following redirection misses request body
-func postFollowingOneRedirect(target string, contentType string, b *bytes.Buffer) error {
+func postFollowingOneRedirect(target string, contentType string, b bytes.Buffer) error {
 	backupReader := bytes.NewReader(b.Bytes())
-	resp, err := http.Post(target, contentType, b)
+	resp, err := http.Post(target, contentType, &b)
 	if err != nil {
 		return err
 	}
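
Note: two separate fixes above. `isPeersChanged` used to return `reflect.DeepEqual(peers, oldPeers)`, i.e. `true` when the peer list was *unchanged*, so the negation makes the result match the function name. And `postFollowingOneRedirect` exists because Go's HTTP client does not replay a POST body when it follows a redirect. A minimal sketch of that workaround pattern, under the assumption that one manual re-POST is enough (`postOnceFollowingRedirect` and its redirect handling are hypothetical, not the patch's exact body):

```go
package main

import (
	"bytes"
	"net/http"
)

// postOnceFollowingRedirect keeps a replayable copy of the body, disables
// automatic redirect following (which would drop the body), and re-POSTs
// the backup reader to the redirect target once.
func postOnceFollowingRedirect(target, contentType string, b bytes.Buffer) error {
	backup := bytes.NewReader(b.Bytes()) // replayable copy of the body
	client := &http.Client{
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			return http.ErrUseLastResponse // hand the 3xx back to us untouched
		},
	}
	resp, err := client.Post(target, contentType, &b)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 && resp.StatusCode < 400 {
		loc, err := resp.Location()
		if err != nil {
			return err
		}
		resp2, err := client.Post(loc.String(), contentType, backup)
		if err != nil {
			return err
		}
		defer resp2.Body.Close()
	}
	return nil
}

func main() {
	var body bytes.Buffer
	body.WriteString(`{"name":"node1"}`)
	_ = postOnceFollowingRedirect("http://localhost:9333/cluster/join", "application/json", body)
}
```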
error %s", name, e) } } } } +} + +func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrentFlag bool) { + var concurrency int + if concurrentFlag { + //You could choose a better optimized concurency value after testing at your environment + concurrency = 10 + } else { + concurrency = 1 + } + + task_queue := make(chan os.FileInfo, 10*concurrency) + go func() { + if dirs, err := ioutil.ReadDir(l.Directory); err == nil { + for _, dir := range dirs { + task_queue <- dir + } + } + close(task_queue) + }() + + var wg sync.WaitGroup + var mutex sync.RWMutex + for workerNum := 0; workerNum < concurrency; workerNum++ { + wg.Add(1) + go func() { + defer wg.Done() + for dir := range task_queue { + l.loadExistingVolume(dir, needleMapKind, &mutex) + } + }() + } + wg.Wait() + +} + +func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { + l.Lock() + defer l.Unlock() + + l.concurrentLoadingVolumes(needleMapKind, true) + glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount) } func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) { + l.Lock() + defer l.Unlock() + for k, v := range l.volumes { if v.Collection == collection { e = l.deleteVolumeById(k) @@ -71,3 +119,35 @@ func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) { delete(l.volumes, vid) return } + +func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume) { + l.Lock() + defer l.Unlock() + + l.volumes[vid] = volume +} + +func (l *DiskLocation) FindVolume(vid VolumeId) (*Volume, bool) { + l.RLock() + defer l.RUnlock() + + v, ok := l.volumes[vid] + return v, ok +} + +func (l *DiskLocation) VolumesLen() int { + l.RLock() + defer l.RUnlock() + + return len(l.volumes) +} + +func (l *DiskLocation) Close() { + l.Lock() + defer l.Unlock() + + for _, v := range l.volumes { + v.Close() + } + return +} diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index f2f4835df..195d8bdbc 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -48,7 +48,7 @@ func LoadNeedleMap(file *os.File) (*NeedleMap, error) { } return nil }) - glog.V(1).Infoln("max file key:", nm.MaximumFileKey) + glog.V(1).Infof("max file key: %d for file: %s", nm.MaximumFileKey, file.Name()) return nm, e } diff --git a/weed/storage/store.go b/weed/storage/store.go index d6c7172e7..614c87ace 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -143,7 +143,7 @@ func (s *Store) DeleteCollection(collection string) (e error) { func (s *Store) findVolume(vid VolumeId) *Volume { for _, location := range s.Locations { - if v, found := location.volumes[vid]; found { + if v, found := location.FindVolume(vid); found { return v } } @@ -152,7 +152,7 @@ func (s *Store) findVolume(vid VolumeId) *Volume { func (s *Store) findFreeLocation() (ret *DiskLocation) { max := 0 for _, location := range s.Locations { - currentFreeCount := location.MaxVolumeCount - len(location.volumes) + currentFreeCount := location.MaxVolumeCount - location.VolumesLen() if currentFreeCount > max { max = currentFreeCount ret = location @@ -168,7 +168,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleM glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil { - 
diff --git a/weed/storage/store.go b/weed/storage/store.go
index d6c7172e7..614c87ace 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -143,7 +143,7 @@ func (s *Store) DeleteCollection(collection string) (e error) {
 
 func (s *Store) findVolume(vid VolumeId) *Volume {
 	for _, location := range s.Locations {
-		if v, found := location.volumes[vid]; found {
+		if v, found := location.FindVolume(vid); found {
 			return v
 		}
 	}
@@ -152,7 +152,7 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
 	max := 0
 	for _, location := range s.Locations {
-		currentFreeCount := location.MaxVolumeCount - len(location.volumes)
+		currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
 		if currentFreeCount > max {
 			max = currentFreeCount
 			ret = location
@@ -168,7 +168,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleM
 		glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
 			location.Directory, vid, collection, replicaPlacement, ttl)
 		if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
-			location.volumes[vid] = volume
+			location.SetVolume(vid, volume)
 			return nil
 		} else {
 			return err
@@ -180,6 +180,7 @@
 func (s *Store) Status() []*VolumeInfo {
 	var stats []*VolumeInfo
 	for _, location := range s.Locations {
+		location.RLock()
 		for k, v := range location.volumes {
 			s := &VolumeInfo{
 				Id:  VolumeId(k),
 				Ttl: v.Ttl}
 			stats = append(stats, s)
 		}
+		location.RUnlock()
 	}
 	sortVolumeInfos(stats)
 	return stats
@@ -219,6 +221,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
 	var maxFileKey uint64
 	for _, location := range s.Locations {
 		maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
+		location.Lock()
 		for k, v := range location.volumes {
 			if maxFileKey < v.nm.MaxFileKey() {
 				maxFileKey = v.nm.MaxFileKey()
@@ -246,6 +249,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
 			}
 		}
+		location.Unlock()
 	}
 
 	joinMessage := &operation.JoinMessage{
@@ -290,9 +294,7 @@
 }
 
 func (s *Store) Close() {
 	for _, location := range s.Locations {
-		for _, v := range location.volumes {
-			v.Close()
-		}
+		location.Close()
 	}
 }
 func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 801dfe267..dfd623eaa 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -23,6 +23,9 @@ type Volume struct {
 
 	dataFileAccessLock sync.Mutex
 	lastModifiedTime   uint64 //unix time in seconds
+
+	lastCompactIndexOffset uint64
+	lastCompactRevision    uint16
 }
 
 func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
@@ -92,9 +95,9 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool {
 	if v.Ttl == nil || v.Ttl.Minutes() == 0 {
 		return false
 	}
-	glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
+	glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
 	livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
-	glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+	glog.V(1).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
 	if int64(v.Ttl.Minutes()) < livedMinutes {
 		return true
 	}
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index d424010f1..48f707594 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -21,7 +21,6 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error {
 		return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
 	}
 	key, offset, size := idxFileEntry(lastIdxEntry)
-	//deleted index entry could not point to deleted needle
 	if offset == 0 {
 		return nil
 	}
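
Note: the two fields added to `Volume` are the bookkeeping for the whole change: `Compact()` snapshots the index file size and the superblock's compact revision, and `makeupDiff` later treats every index entry appended past that offset as a write that raced with compaction. A toy illustration of the arithmetic, assuming 16-byte index entries (8-byte key, 4-byte offset, 4-byte size) as used by the code below; the helper name is hypothetical:

```go
package main

import "fmt"

const needleIndexSize = 16 // bytes per .idx entry (assumed, mirrors NeedleIndexSize)

// racedEntryCount says how many index entries were appended after the
// compaction snapshot was taken; these are the ones makeupDiff must replay.
func racedEntryCount(idxSizeNow, lastCompactIndexOffset uint64) uint64 {
	if idxSizeNow <= lastCompactIndexOffset {
		return 0 // no concurrent writes; nothing to make up
	}
	return (idxSizeNow - lastCompactIndexOffset) / needleIndexSize
}

func main() {
	// snapshot taken at 96 bytes (6 entries); index has since grown to 144 bytes
	fmt.Println(racedEntryCount(144, 96)) // 3 entries to replay
}
```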
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx") } @@ -38,14 +41,28 @@ func (v *Volume) commitCompact() error { glog.V(3).Infof("Got Committing lock...") v.nm.Close() _ = v.dataFile.Close() - makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx") + var e error - if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { - return e - } - if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { - return e + if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil { + glog.V(0).Infof("makeupDiff in commitCompact failed %v", e) + e = os.Remove(v.FileName() + ".cpd") + if e != nil { + return e + } + e = os.Remove(v.FileName() + ".cpx") + if e != nil { + return e + } + } else { + var e error + if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { + return e + } + if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { + return e + } } + //glog.V(3).Infof("Pretending to be vacuuming...") //time.Sleep(20 * time.Second) glog.V(3).Infof("Loading Commit file...") @@ -55,7 +72,138 @@ func (v *Volume) commitCompact() error { return nil } -func makeupDiff(newDatFile, newIdxFile, oldDatFile, oldIdxFile string) (err error) { +func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) { + if _, err = file.Seek(0, 0); err != nil { + return 0, fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err) + } + header := make([]byte, SuperBlockSize) + if _, e := file.Read(header); e != nil { + return 0, fmt.Errorf("cannot read file %s 's super block: %v", file.Name(), e) + } + superBlock, err := ParseSuperBlock(header) + if err != nil { + return 0, err + } + return superBlock.CompactRevision, nil +} + +func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) { + var indexSize int64 + + oldIdxFile, err := os.Open(oldIdxFileName) + defer oldIdxFile.Close() + + oldDatFile, err := os.Open(oldDatFileName) + defer oldDatFile.Close() + + if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil { + return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err) + } + if indexSize == 0 || uint64(indexSize) <= v.lastCompactIndexOffset { + return nil + } + + oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatFile) + if err != nil { + return + } + if oldDatCompactRevision != v.lastCompactRevision { + return fmt.Errorf("current old dat file's compact revision %d is not the expected one %d", oldDatCompactRevision, v.lastCompactRevision) + } + + type keyField struct { + offset uint32 + size uint32 + } + incrementedHasUpdatedIndexEntry := make(map[uint64]keyField) + + for idx_offset := indexSize - NeedleIndexSize; uint64(idx_offset) >= v.lastCompactIndexOffset; idx_offset -= NeedleIndexSize { + var IdxEntry []byte + if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil { + return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err) + } + key, offset, size := idxFileEntry(IdxEntry) + if _, found := incrementedHasUpdatedIndexEntry[key]; !found { + incrementedHasUpdatedIndexEntry[key] = keyField{ + offset: offset, + size: size, + } + } + } + + if len(incrementedHasUpdatedIndexEntry) > 0 { + var ( + dst, idx *os.File + ) + if dst, err = 
+	if len(incrementedHasUpdatedIndexEntry) > 0 {
+		var (
+			dst, idx *os.File
+		)
+		if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil {
+			return
+		}
+		defer dst.Close()
+
+		if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil {
+			return
+		}
+		defer idx.Close()
+
+		var newDatCompactRevision uint16
+		newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst)
+		if err != nil {
+			return
+		}
+		if oldDatCompactRevision+1 != newDatCompactRevision {
+			return fmt.Errorf("oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d", oldDatFileName, oldDatCompactRevision, newDatFileName, newDatCompactRevision)
+		}
+
+		idx_entry_bytes := make([]byte, 16)
+		for key, incre_idx_entry := range incrementedHasUpdatedIndexEntry {
+			util.Uint64toBytes(idx_entry_bytes[0:8], key)
+			util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset)
+			util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size)
+
+			var offset int64
+			if offset, err = dst.Seek(0, 2); err != nil {
+				glog.V(0).Infof("failed to seek the end of file: %v", err)
+				return
+			}
+			// ensure writing starts from an aligned position
+			if offset%NeedlePaddingSize != 0 {
+				offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
+				if offset, err = dst.Seek(offset, 0); err != nil {
+					glog.V(0).Infof("failed to align in datafile %s: %v", dst.Name(), err)
+					return
+				}
+			}
+
+			// updated needle
+			if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 {
+				// even if this needle is in the memory cache, the needle_bytes
+				// read back from the old dat file are correct
+				var needle_bytes []byte
+				needle_bytes, _, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
+				if err != nil {
+					return
+				}
+				if _, err = dst.Write(needle_bytes); err != nil {
+					return
+				}
+				util.Uint32toBytes(idx_entry_bytes[8:12], uint32(offset/NeedlePaddingSize))
+			} else { // deleted needle
+				// fakeDelNeedle's default Data field is nil
+				fakeDelNeedle := new(Needle)
+				fakeDelNeedle.Id = key
+				fakeDelNeedle.Cookie = 0x12345678
+				_, err = fakeDelNeedle.Append(dst, v.Version())
+				if err != nil {
+					return
+				}
+				util.Uint32toBytes(idx_entry_bytes[8:12], uint32(0))
+			}
+
+			if _, err := idx.Seek(0, 2); err != nil {
+				return fmt.Errorf("cannot seek end of indexfile %s: %v",
+					newIdxFileName, err)
+			}
+			if _, err = idx.Write(idx_entry_bytes); err != nil {
+				return fmt.Errorf("cannot write indexfile %s: %v", newIdxFileName, err)
+			}
+		}
+	}
+
+	return nil
 }
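
Note: the alignment dance in `makeupDiff` follows from the on-disk convention that the 4-byte `.idx` offset field stores a needle's position in `NeedlePaddingSize` units, not bytes — which is also why the byte offset is divided before being written back. A toy sketch of the convention (the constant's value here is an assumption for illustration; the real one lives in weed/storage):

```go
package main

import "fmt"

const needlePaddingSize = 8 // assumed value for this sketch

// storedOffset converts a byte offset in the .dat file into the unit
// actually persisted in the 4-byte .idx offset field.
func storedOffset(byteOffset int64) uint32 {
	return uint32(byteOffset / needlePaddingSize)
}

// alignUp rounds a byte offset up to the next needle boundary, mirroring
// the alignment step before a needle is appended.
func alignUp(byteOffset int64) int64 {
	if r := byteOffset % needlePaddingSize; r != 0 {
		return byteOffset + (needlePaddingSize - r)
	}
	return byteOffset
}

func main() {
	fmt.Println(alignUp(21))      // 24
	fmt.Println(storedOffset(24)) // 3
}
```

Storing offsets in padding units is also what lets a 32-bit field address volumes well beyond 4GB (with 8-byte padding, up to 32GB).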
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
new file mode 100644
index 000000000..c2fac6ce8
--- /dev/null
+++ b/weed/storage/volume_vacuum_test.go
@@ -0,0 +1,55 @@
+package storage
+
+import (
+	"testing"
+)
+
+/*
+makeupDiff test steps
+1. launch the weed server in your local/dev environment (the master option
+"garbageThreshold" and the volume option "max" should be set to specific values
+that make preparing the test prerequisites easier)
+   a) ./weed master -garbageThreshold=0.99 -mdir=./m
+   b) ./weed volume -dir=./data -max=1 -mserver=localhost:9333 -port=8080
+2. upload 4 different files; you can call dir/assign to get 4 different fids
+   a) upload file A with fid a
+   b) upload file B with fid b
+   c) upload file C with fid c
+   d) upload file D with fid d
+3. update files A and C
+   a) modify file A and upload file A with fid a
+   b) modify file C and upload file C with fid c
+   c) record the current 1.idx file size (the lastCompactIndexOffset value)
+4. compact the data file
+   a) run curl http://localhost:8080/admin/vacuum/compact?volumeId=1
+   b) verify that 1.cpd and 1.cpx are created under the volume directory
+5. update file B and delete file D
+   a) modify file B and upload file B with fid b
+   b) delete file D with fid d
+6. now you can run the UT case below; it should pass
+7. commit the compaction manually
+   a) mv 1.cpd 1.dat
+   b) mv 1.cpx 1.idx
+8. restart the volume server
+9. now you should get the updated files A, B, and C, and file D should be gone
+*/
+
+func TestMakeDiff(t *testing.T) {
+
+	v := new(Volume)
+	//lastCompactIndexOffset value is the index file size before step 4
+	v.lastCompactIndexOffset = 96
+	v.SuperBlock.version = 0x2
+	/*
+		err := v.makeupDiff(
+			"/yourpath/1.cpd",
+			"/yourpath/1.cpx",
+			"/yourpath/1.dat",
+			"/yourpath/1.idx")
+		if err != nil {
+			t.Errorf("makeupDiff err is %v", err)
+		} else {
+			t.Log("makeupDiff Succeeded")
+		}
+	*/
+}
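
Note: since the case above only works after the manual walkthrough, one possible refinement (sketched here as an assumption, not part of the patch) is a variant that skips itself when the fixture files are absent, so the body can stay uncommented:

```go
package storage

import (
	"os"
	"testing"
)

// TestMakeDiffManual is a hypothetical variant of TestMakeDiff: it runs the
// commented-out body above, but skips unless the manually prepared fixture
// files from the walkthrough exist.
func TestMakeDiffManual(t *testing.T) {
	prefix := "/yourpath/1" // placeholder path, as in the walkthrough
	if _, err := os.Stat(prefix + ".cpx"); os.IsNotExist(err) {
		t.Skip("compaction fixtures not prepared; see the steps above")
	}
	v := new(Volume)
	v.lastCompactIndexOffset = 96 // the 1.idx size recorded in step 3c
	v.SuperBlock.version = 0x2
	if err := v.makeupDiff(prefix+".cpd", prefix+".cpx", prefix+".dat", prefix+".idx"); err != nil {
		t.Errorf("makeupDiff err is %v", err)
	}
}
```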