diff --git a/go/operation/sync_volume.go b/go/operation/sync_volume.go new file mode 100644 index 000000000..54944a64e --- /dev/null +++ b/go/operation/sync_volume.go @@ -0,0 +1,54 @@ +package operation + +import ( + "encoding/json" + "fmt" + "net/url" + + "github.com/chrislusf/seaweedfs/go/glog" + "github.com/chrislusf/seaweedfs/go/util" +) + +type SyncVolumeResponse struct { + Replication string `json:"Replication,omitempty"` + Ttl string `json:"Ttl,omitempty"` + TailOffset uint64 `json:"TailOffset,omitempty"` + CompactRevision uint16 `json:"CompactRevision,omitempty"` + IdxFileSize uint64 `json:"IdxFileSize,omitempty"` + Error string `json:"error,omitempty"` +} + +func GetVolumeSyncStatus(server string, vid string) (*SyncVolumeResponse, error) { + values := make(url.Values) + values.Add("volume", vid) + jsonBlob, err := util.Post("http://"+server+"/admin/sync/status", values) + glog.V(2).Info("sync volume result :", string(jsonBlob)) + if err != nil { + return nil, err + } + var ret SyncVolumeResponse + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + if ret.Error != "" { + return nil, fmt.Errorf("Volume %s get sync status error: %s", vid, ret.Error) + } + return &ret, nil +} + +func GetVolumeIdxEntries(server string, vid string, eachEntryFn func(key uint64, offset, size uint32)) error { + values := make(url.Values) + values.Add("volume", vid) + line := make([]byte, 16) + err := util.GetBufferStream("http://"+server+"/admin/sync/index", values, line, func(bytes []byte) { + key := util.BytesToUint64(bytes[:8]) + offset := util.BytesToUint32(bytes[8:12]) + size := util.BytesToUint32(bytes[12:16]) + eachEntryFn(key, offset, size) + }) + if err != nil { + return err + } + return nil +} diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index 03499fda4..814789616 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -2,7 +2,9 @@ package storage import ( "fmt" + "io/ioutil" "os" + "sync" "github.com/chrislusf/seaweedfs/go/util" ) @@ -26,29 +28,66 @@ type NeedleMapper interface { FileCount() int DeletedCount() int MaxFileKey() uint64 + IndexFileSize() uint64 + IndexFileContent() ([]byte, error) + IndexFileName() string } -type mapMetric struct { - DeletionCounter int `json:"DeletionCounter"` - FileCounter int `json:"FileCounter"` - DeletionByteCounter uint64 `json:"DeletionByteCounter"` - FileByteCounter uint64 `json:"FileByteCounter"` - MaximumFileKey uint64 `json:"MaxFileKey"` +type baseNeedleMapper struct { + indexFile *os.File + indexFileAccessLock sync.Mutex + + mapMetric } -func appendToIndexFile(indexFile *os.File, - key uint64, offset uint32, size uint32) error { +func (nm baseNeedleMapper) IndexFileSize() uint64 { + stat, err := nm.indexFile.Stat() + if err == nil { + return uint64(stat.Size()) + } + return 0 +} + +func (nm baseNeedleMapper) IndexFileName() string { + return nm.indexFile.Name() +} + +func idxFileEntry(bytes []byte) (key uint64, offset uint32, size uint32) { + key = util.BytesToUint64(bytes[:8]) + offset = util.BytesToUint32(bytes[8:12]) + size = util.BytesToUint32(bytes[12:16]) + return +} +func (nm baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error { bytes := make([]byte, 16) util.Uint64toBytes(bytes[0:8], key) util.Uint32toBytes(bytes[8:12], offset) util.Uint32toBytes(bytes[12:16], size) - if _, err := indexFile.Seek(0, 2); err != nil { + + nm.indexFileAccessLock.Lock() + defer nm.indexFileAccessLock.Unlock() + if _, err := nm.indexFile.Seek(0, 2); err != nil { return fmt.Errorf("cannot seek end of indexfile %s: %v", - indexFile.Name(), err) + nm.indexFile.Name(), err) } - _, err := indexFile.Write(bytes) + _, err := nm.indexFile.Write(bytes) return err } +func (nm baseNeedleMapper) IndexFileContent() ([]byte, error) { + nm.indexFileAccessLock.Lock() + defer nm.indexFileAccessLock.Unlock() + return ioutil.ReadFile(nm.indexFile.Name()) +} + +type mapMetric struct { + indexFile *os.File + + DeletionCounter int `json:"DeletionCounter"` + FileCounter int `json:"FileCounter"` + DeletionByteCounter uint64 `json:"DeletionByteCounter"` + FileByteCounter uint64 `json:"FileByteCounter"` + MaximumFileKey uint64 `json:"MaxFileKey"` +} func (mm *mapMetric) logDelete(deletedByteCount uint32) { mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount) @@ -66,3 +105,19 @@ func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) { mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize) } } + +func (mm mapMetric) ContentSize() uint64 { + return mm.FileByteCounter +} +func (mm mapMetric) DeletedSize() uint64 { + return mm.DeletionByteCounter +} +func (mm mapMetric) FileCount() int { + return mm.FileCounter +} +func (mm mapMetric) DeletedCount() int { + return mm.DeletionCounter +} +func (mm mapMetric) MaxFileKey() uint64 { + return mm.MaximumFileKey +} diff --git a/go/storage/needle_map_boltdb.go b/go/storage/needle_map_boltdb.go index bed6e58d1..e95c016bb 100644 --- a/go/storage/needle_map_boltdb.go +++ b/go/storage/needle_map_boltdb.go @@ -12,15 +12,15 @@ import ( type BoltDbNeedleMap struct { dbFileName string - indexFile *os.File db *bolt.DB - mapMetric + baseNeedleMapper } var boltdbBucket = []byte("weed") func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error) { - m = &BoltDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName} + m = &BoltDbNeedleMap{dbFileName: dbFileName} + m.indexFile = indexFile if !isBoltDbFresh(dbFileName, indexFile) { glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) generateBoltDbFile(dbFileName, indexFile) @@ -101,7 +101,7 @@ func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { } m.logPut(key, oldSize, size) // write to index file first - if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil { + if err := m.appendToIndexFile(key, offset, size); err != nil { return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err) } return boltDbWrite(m.db, key, offset, size) @@ -148,7 +148,7 @@ func (m *BoltDbNeedleMap) Delete(key uint64) error { m.logDelete(oldNeedle.Size) } // write to index file first - if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil { + if err := m.appendToIndexFile(key, 0, 0); err != nil { return err } return boltDbDelete(m.db, key) @@ -163,19 +163,3 @@ func (m *BoltDbNeedleMap) Destroy() error { os.Remove(m.indexFile.Name()) return os.Remove(m.dbFileName) } - -func (m *BoltDbNeedleMap) ContentSize() uint64 { - return m.FileByteCounter -} -func (m *BoltDbNeedleMap) DeletedSize() uint64 { - return m.DeletionByteCounter -} -func (m *BoltDbNeedleMap) FileCount() int { - return m.FileCounter -} -func (m *BoltDbNeedleMap) DeletedCount() int { - return m.DeletionCounter -} -func (m *BoltDbNeedleMap) MaxFileKey() uint64 { - return m.MaximumFileKey -} diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go index 65377a8a8..47f63e3ae 100644 --- a/go/storage/needle_map_leveldb.go +++ b/go/storage/needle_map_leveldb.go @@ -12,13 +12,13 @@ import ( type LevelDbNeedleMap struct { dbFileName string - indexFile *os.File db *leveldb.DB - mapMetric + baseNeedleMapper } func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error) { - m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName} + m = &LevelDbNeedleMap{dbFileName: dbFileName} + m.indexFile = indexFile if !isLevelDbFresh(dbFileName, indexFile) { glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) generateLevelDbFile(dbFileName, indexFile) @@ -89,7 +89,7 @@ func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { } m.logPut(key, oldSize, size) // write to index file first - if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil { + if err := m.appendToIndexFile(key, offset, size); err != nil { return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err) } return levelDbWrite(m.db, key, offset, size) @@ -117,7 +117,7 @@ func (m *LevelDbNeedleMap) Delete(key uint64) error { m.logDelete(oldNeedle.Size) } // write to index file first - if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil { + if err := m.appendToIndexFile(key, 0, 0); err != nil { return err } return levelDbDelete(m.db, key) @@ -132,19 +132,3 @@ func (m *LevelDbNeedleMap) Destroy() error { os.Remove(m.indexFile.Name()) return os.Remove(m.dbFileName) } - -func (m *LevelDbNeedleMap) ContentSize() uint64 { - return m.FileByteCounter -} -func (m *LevelDbNeedleMap) DeletedSize() uint64 { - return m.DeletionByteCounter -} -func (m *LevelDbNeedleMap) FileCount() int { - return m.FileCounter -} -func (m *LevelDbNeedleMap) DeletedCount() int { - return m.DeletionCounter -} -func (m *LevelDbNeedleMap) MaxFileKey() uint64 { - return m.MaximumFileKey -} diff --git a/go/storage/needle_map_memory.go b/go/storage/needle_map_memory.go index f0c30c9f2..2b1fc1b54 100644 --- a/go/storage/needle_map_memory.go +++ b/go/storage/needle_map_memory.go @@ -5,21 +5,19 @@ import ( "os" "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/util" ) type NeedleMap struct { - indexFile *os.File - m CompactMap + m CompactMap - mapMetric + baseNeedleMapper } func NewNeedleMap(file *os.File) *NeedleMap { nm := &NeedleMap{ - m: NewCompactMap(), - indexFile: file, + m: NewCompactMap(), } + nm.indexFile = file return nm } @@ -70,9 +68,7 @@ func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e for count > 0 && e == nil || e == io.EOF { for i = 0; i+16 <= count; i += 16 { - key = util.BytesToUint64(bytes[i : i+8]) - offset = util.BytesToUint32(bytes[i+8 : i+12]) - size = util.BytesToUint32(bytes[i+12 : i+16]) + key, offset, size = idxFileEntry(bytes[i : i+16]) if e = fn(key, offset, size); e != nil { return e } @@ -90,7 +86,7 @@ func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error { oldSize := nm.m.Set(Key(key), offset, size) nm.logPut(key, oldSize, size) - return appendToIndexFile(nm.indexFile, key, offset, size) + return nm.appendToIndexFile(key, offset, size) } func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { element, ok = nm.m.Get(Key(key)) @@ -99,7 +95,7 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { func (nm *NeedleMap) Delete(key uint64) error { deletedBytes := nm.m.Delete(Key(key)) nm.logDelete(deletedBytes) - return appendToIndexFile(nm.indexFile, key, 0, 0) + return nm.appendToIndexFile(key, 0, 0) } func (nm *NeedleMap) Close() { _ = nm.indexFile.Close() @@ -108,19 +104,3 @@ func (nm *NeedleMap) Destroy() error { nm.Close() return os.Remove(nm.indexFile.Name()) } -func (nm NeedleMap) ContentSize() uint64 { - return nm.FileByteCounter -} -func (nm NeedleMap) DeletedSize() uint64 { - return nm.DeletionByteCounter -} -func (nm NeedleMap) FileCount() int { - return nm.FileCounter -} -func (nm NeedleMap) DeletedCount() int { - return nm.DeletionCounter -} - -func (nm NeedleMap) MaxFileKey() uint64 { - return nm.MaximumFileKey -} diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index 20d3a093f..9a9f63ddb 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -135,49 +135,37 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { return 0, fmt.Errorf("Unsupported Version! (%d)", version) } -func (n *Needle) Read(r *os.File, offset int64, size uint32, version Version) (ret int, err error) { +func ReadNeedleBlob(r *os.File, offset int64, size uint32) (bytes []byte, err error) { + padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize) + bytes = make([]byte, NeedleHeaderSize+size+NeedleChecksumSize+padding) + _, err = r.ReadAt(bytes, offset) + return +} + +func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) { + bytes, err := ReadNeedleBlob(r, offset, size) + if err != nil { + return err + } + n.ParseNeedleHeader(bytes) + if n.Size != size { + return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size) + } switch version { case Version1: - bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) - if ret, err = r.ReadAt(bytes, offset); err != nil { - return - } - n.readNeedleHeader(bytes) n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size] - checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) - newChecksum := NewCRC(n.Data) - if checksum != newChecksum.Value() { - return 0, errors.New("CRC error! Data On Disk Corrupted") - } - n.Checksum = newChecksum - return case Version2: - if size == 0 { - return 0, nil - } - bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) - if ret, err = r.ReadAt(bytes, offset); err != nil { - return - } - if ret != int(NeedleHeaderSize+size+NeedleChecksumSize) { - return 0, errors.New("File Entry Not Found") - } - n.readNeedleHeader(bytes) - if n.Size != size { - return 0, fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size) - } n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)]) - checksum := util.BytesToUint32(bytes[NeedleHeaderSize+n.Size : NeedleHeaderSize+n.Size+NeedleChecksumSize]) - newChecksum := NewCRC(n.Data) - if checksum != newChecksum.Value() { - return 0, errors.New("CRC Found Data On Disk Corrupted") - } - n.Checksum = newChecksum - return } - return 0, fmt.Errorf("Unsupported Version! (%d)", version) + checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) + newChecksum := NewCRC(n.Data) + if checksum != newChecksum.Value() { + return errors.New("CRC error! Data On Disk Corrupted") + } + n.Checksum = newChecksum + return nil } -func (n *Needle) readNeedleHeader(bytes []byte) { +func (n *Needle) ParseNeedleHeader(bytes []byte) { n.Cookie = util.BytesToUint32(bytes[0:4]) n.Id = util.BytesToUint64(bytes[4:12]) n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize]) @@ -228,7 +216,7 @@ func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bod if count <= 0 || err != nil { return nil, 0, err } - n.readNeedleHeader(bytes) + n.ParseNeedleHeader(bytes) padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize) bodyLength = n.Size + NeedleChecksumSize + padding } diff --git a/go/storage/store.go b/go/storage/store.go index 54a6f9c69..425675c8b 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -371,9 +371,9 @@ func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { } return 0, nil } -func (s *Store) Read(i VolumeId, n *Needle) (int, error) { +func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) { if v := s.findVolume(i); v != nil { - return v.read(n) + return v.readNeedle(n) } return 0, fmt.Errorf("Volume %v not found!", i) } diff --git a/go/storage/volume.go b/go/storage/volume.go index e4cebea7c..0e6cadecc 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -54,6 +54,9 @@ func (v *Volume) FileName() (fileName string) { } return } +func (v *Volume) DataFile() *os.File { + return v.dataFile +} func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error { var e error fileName := v.FileName() @@ -152,7 +155,7 @@ func (v *Volume) isFileUnchanged(n *Needle) bool { nv, ok := v.nm.Get(n.Id) if ok && nv.Offset > 0 { oldNeedle := new(Needle) - _, err := oldNeedle.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) if err != nil { glog.V(0).Infof("Failed to check updated file %v", err) return false @@ -180,6 +183,30 @@ func (v *Volume) Destroy() (err error) { return } +// AppendBlob append a blob to end of the data file, used in replication +func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { + if v.readOnly { + err = fmt.Errorf("%s is read-only", v.dataFile.Name()) + return + } + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + if offset, err = v.dataFile.Seek(0, 2); err != nil { + glog.V(0).Infof("failed to seek the end of file: %v", err) + return + } + //ensure file writing starting from aligned positions + if offset%NeedlePaddingSize != 0 { + offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) + if offset, err = v.dataFile.Seek(offset, 0); err != nil { + glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err) + return + } + } + v.dataFile.Write(b) + return +} + func (v *Volume) write(n *Needle) (size uint32, err error) { glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String()) if v.readOnly { @@ -250,17 +277,19 @@ func (v *Volume) delete(n *Needle) (uint32, error) { return 0, nil } -func (v *Volume) read(n *Needle) (int, error) { +// read fills in Needle content by looking up n.Id from NeedleMapper +func (v *Volume) readNeedle(n *Needle) (int, error) { nv, ok := v.nm.Get(n.Id) if !ok || nv.Offset == 0 { return -1, errors.New("Not Found") } - bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) if err != nil { - return bytesRead, err + return 0, err } + bytesRead := len(n.Data) if !n.HasTtl() { - return bytesRead, err + return bytesRead, nil } ttlMinutes := n.Ttl.Minutes() if ttlMinutes == 0 { diff --git a/go/storage/volume_sync.go b/go/storage/volume_sync.go new file mode 100644 index 000000000..2c72d62f0 --- /dev/null +++ b/go/storage/volume_sync.go @@ -0,0 +1,213 @@ +package storage + +import ( + "fmt" + "io" + "io/ioutil" + "net/url" + "os" + "sort" + "strconv" + + "github.com/chrislusf/seaweedfs/go/glog" + "github.com/chrislusf/seaweedfs/go/operation" + "github.com/chrislusf/seaweedfs/go/util" +) + +// The volume sync with a master volume via 2 steps: +// 1. The slave checks master side to find subscription checkpoint +// to setup the replication. +// 2. The slave receives the updates from master + +/* +Assume the slave volume needs to follow the master volume. + +The master volume could be compacted, and could be many files ahead of +slave volume. + +Step 1: +The slave volume will ask the master volume for a snapshot +of (existing file entries, last offset, number of compacted times). + +For each entry x in master existing file entries: + if x does not exist locally: + add x locally + +For each entry y in local slave existing file entries: + if y does not exist on master: + delete y locally + +Step 2: +After this, use the last offset and number of compacted times to request +the master volume to send a new file, and keep looping. If the number of +compacted times is changed, go back to step 1 (very likely this can be +optimized more later). + +*/ + +func (v *Volume) Synchronize(volumeServer string) (err error) { + var lastCompactRevision uint16 = 0 + var compactRevision uint16 = 0 + var masterMap CompactMap + for i := 0; i < 3; i++ { + if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil { + return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err) + } + if lastCompactRevision != compactRevision && lastCompactRevision != 0 { + if err = v.Compact(); err != nil { + return fmt.Errorf("Compact Volume before synchronizing %v", err) + } + if err = v.commitCompact(); err != nil { + return fmt.Errorf("Commit Compact before synchronizing %v", err) + } + } + lastCompactRevision = compactRevision + if err = v.trySynchronizing(volumeServer, masterMap, compactRevision); err == nil { + return + } + } + return +} + +type ByOffset []NeedleValue + +func (a ByOffset) Len() int { return len(a) } +func (a ByOffset) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset } + +// trySynchronizing sync with remote volume server incrementally by +// make up the local and remote delta. +func (v *Volume) trySynchronizing(volumeServer string, masterMap CompactMap, compactRevision uint16) error { + slaveIdxFile, err := os.Open(v.nm.IndexFileName()) + if err != nil { + return fmt.Errorf("Open volume %d index file: %v", v.Id, err) + } + defer slaveIdxFile.Close() + slaveMap, err := LoadNeedleMap(slaveIdxFile) + if err != nil { + return fmt.Errorf("Load volume %d index file: %v", v.Id, err) + } + var delta []NeedleValue + if err := masterMap.Visit(func(needleValue NeedleValue) error { + if needleValue.Key == 0 { + return nil + } + if _, ok := slaveMap.Get(uint64(needleValue.Key)); ok { + return nil // skip intersection + } + delta = append(delta, needleValue) + return nil + }); err != nil { + return fmt.Errorf("Add master entry: %v", err) + } + if err := slaveMap.m.Visit(func(needleValue NeedleValue) error { + if needleValue.Key == 0 { + return nil + } + if _, ok := masterMap.Get(needleValue.Key); ok { + return nil // skip intersection + } + needleValue.Size = 0 + delta = append(delta, needleValue) + return nil + }); err != nil { + return fmt.Errorf("Remove local entry: %v", err) + } + + // simulate to same ordering of remote .dat file needle entries + sort.Sort(ByOffset(delta)) + + // make up the delta + fetchCount := 0 + volumeDataContentHandlerUrl := "http://" + volumeServer + "/admin/sync/data" + for _, needleValue := range delta { + if needleValue.Size == 0 { + // remove file entry from local + v.removeNeedle(needleValue.Key) + continue + } + // add master file entry to local data file + if err := v.fetchNeedle(volumeDataContentHandlerUrl, needleValue, compactRevision); err != nil { + glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err) + return err + } + fetchCount++ + } + glog.V(1).Infof("Fetched %d needles from %s", fetchCount, volumeServer) + return nil +} + +func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, lastOffset uint64, compactRevision uint16, err error) { + m = NewCompactMap() + + syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, vid.String()) + if err != nil { + return m, 0, 0, err + } + + total := 0 + err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) { + // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size) + if offset != 0 && size != 0 { + m.Set(Key(key), offset, size) + } else { + m.Delete(Key(key)) + } + total++ + }) + + glog.V(2).Infof("server %s volume %d, entries %d, last offset %d, revision %d", volumeServer, vid, total, syncStatus.TailOffset, syncStatus.CompactRevision) + return m, syncStatus.TailOffset, syncStatus.CompactRevision, err + +} + +func (v *Volume) GetVolumeSyncStatus() operation.SyncVolumeResponse { + var syncStatus = operation.SyncVolumeResponse{} + if stat, err := v.dataFile.Stat(); err == nil { + syncStatus.TailOffset = uint64(stat.Size()) + } + syncStatus.IdxFileSize = v.nm.IndexFileSize() + syncStatus.CompactRevision = v.SuperBlock.CompactRevision + syncStatus.Ttl = v.SuperBlock.Ttl.String() + syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String() + return syncStatus +} + +func (v *Volume) IndexFileContent() ([]byte, error) { + return v.nm.IndexFileContent() +} + +// removeNeedle removes one needle by needle key +func (v *Volume) removeNeedle(key Key) { + n := new(Needle) + n.Id = uint64(key) + v.delete(n) +} + +// fetchNeedle fetches a remote volume needle by vid, id, offset +// The compact revision is checked first in case the remote volume +// is compacted and the offset is invalid any more. +func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string, + needleValue NeedleValue, compactRevision uint16) error { + // add master file entry to local data file + values := make(url.Values) + values.Add("revision", strconv.Itoa(int(compactRevision))) + values.Add("volume", v.Id.String()) + values.Add("id", needleValue.Key.String()) + values.Add("offset", strconv.FormatUint(uint64(needleValue.Offset), 10)) + values.Add("size", strconv.FormatUint(uint64(needleValue.Size), 10)) + glog.V(4).Infof("Fetch %+v", needleValue) + return util.GetUrlStream(volumeDataContentHandlerUrl, values, func(r io.Reader) error { + b, err := ioutil.ReadAll(r) + if err != nil { + return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err) + } + offset, err := v.AppendBlob(b) + if err != nil { + return fmt.Errorf("Appending volume %d error: %v", v.Id, err) + } + // println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size) + v.nm.Put(uint64(needleValue.Key), uint32(offset/NeedlePaddingSize), needleValue.Size) + return nil + }) +} diff --git a/go/util/http_util.go b/go/util/http_util.go index 10e781140..7854302ab 100644 --- a/go/util/http_util.go +++ b/go/util/http_util.go @@ -3,6 +3,7 @@ package util import ( "bytes" "fmt" + "io" "io/ioutil" "net/http" "net/url" @@ -84,6 +85,43 @@ func Delete(url string, jwt security.EncodedJwt) error { return nil } +func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error { + r, err := client.PostForm(url, values) + if err != nil { + return err + } + defer r.Body.Close() + if r.StatusCode != 200 { + return fmt.Errorf("%s: %s", url, r.Status) + } + bufferSize := len(allocatedBytes) + for { + n, err := r.Body.Read(allocatedBytes) + if n == bufferSize { + eachBuffer(allocatedBytes) + } + if err != nil { + if err == io.EOF { + return nil + } + return err + } + } + return nil +} + +func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error { + r, err := client.PostForm(url, values) + if err != nil { + return err + } + defer r.Body.Close() + if r.StatusCode != 200 { + return fmt.Errorf("%s: %s", url, r.Status) + } + return readFn(r.Body) +} + func DownloadUrl(fileUrl string) (filename string, content []byte, e error) { response, err := client.Get(fileUrl) if err != nil { diff --git a/go/weed/backup.go b/go/weed/backup.go new file mode 100644 index 000000000..5e51a8b03 --- /dev/null +++ b/go/weed/backup.go @@ -0,0 +1,90 @@ +package main + +import ( + "fmt" + + "github.com/chrislusf/seaweedfs/go/operation" + "github.com/chrislusf/seaweedfs/go/storage" +) + +var ( + s BackupOptions +) + +type BackupOptions struct { + master *string + collection *string + dir *string + volumeId *int +} + +func init() { + cmdBackup.Run = runBackup // break init cycle + s.master = cmdBackup.Flag.String("server", "localhost:9333", "SeaweedFS master location") + s.collection = cmdBackup.Flag.String("collection", "", "collection name") + s.dir = cmdBackup.Flag.String("dir", ".", "directory to store volume data files") + s.volumeId = cmdBackup.Flag.Int("volumeId", -1, "a volume id. The volume .dat and .idx files should already exist in the dir.") +} + +var cmdBackup = &Command{ + UsageLine: "backup -dir=. -volumeId=234 -server=localhost:9333", + Short: "incrementally backup a volume to local folder", + Long: `Incrementally backup volume data. + + It is expected that you use this inside a script, to loop through + all possible volume ids that needs to be backup to local folder. + + The volume id does not need to exist locally or even remotely. + This will help to backup future new volumes. + + Usually backing up is just copying the .dat (and .idx) files. + But it's tricky to incremententally copy the differences. + + The complexity comes when there are multiple addition, deletion and compaction. + This tool will handle them correctly and efficiently, avoiding unnecessary data transporation. + `, +} + +func runBackup(cmd *Command, args []string) bool { + if *s.volumeId == -1 { + return false + } + vid := storage.VolumeId(*s.volumeId) + + // find volume location, replication, ttl info + lookup, err := operation.Lookup(*s.master, vid.String()) + if err != nil { + fmt.Printf("Error looking up volume %d: %v\n", vid, err) + return true + } + volumeServer := lookup.Locations[0].Url + + stats, err := operation.GetVolumeSyncStatus(volumeServer, vid.String()) + if err != nil { + fmt.Printf("Error get volume %d status: %v\n", vid, err) + return true + } + ttl, err := storage.ReadTTL(stats.Ttl) + if err != nil { + fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err) + return true + } + replication, err := storage.NewReplicaPlacementFromString(stats.Replication) + if err != nil { + fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err) + return true + } + + v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl) + if err != nil { + fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) + return true + } + + if err := v.Synchronize(volumeServer); err != nil { + fmt.Printf("Error synchronizing volume %d: %v\n", vid, err) + return true + } + + return true +} diff --git a/go/weed/compact.go b/go/weed/compact.go index a599f5d64..673b96901 100644 --- a/go/weed/compact.go +++ b/go/weed/compact.go @@ -7,7 +7,6 @@ import ( func init() { cmdCompact.Run = runCompact // break init cycle - cmdCompact.IsDebug = cmdCompact.Flag.Bool("debug", false, "enable debug mode") } var cmdCompact = &Command{ diff --git a/go/weed/weed.go b/go/weed/weed.go index b3dd61616..49fe17eaa 100644 --- a/go/weed/weed.go +++ b/go/weed/weed.go @@ -21,6 +21,7 @@ var server *string var commands = []*Command{ cmdBenchmark, + cmdBackup, cmdCompact, cmdFix, cmdServer, diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index 6e0a7f536..703536c7a 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -51,6 +51,9 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, adminMux.HandleFunc("/admin/vacuum/compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler)) adminMux.HandleFunc("/admin/vacuum/commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler)) adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler)) + adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler)) + adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) + adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) diff --git a/go/weed/weed_server/volume_server_handlers_read.go b/go/weed/weed_server/volume_server_handlers_read.go index a8156e85b..86dfee560 100644 --- a/go/weed/weed_server/volume_server_handlers_read.go +++ b/go/weed/weed_server/volume_server_handlers_read.go @@ -47,7 +47,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } cookie := n.Cookie - count, e := vs.store.Read(volumeId, n) + count, e := vs.store.ReadVolumeNeedle(volumeId, n) glog.V(4).Infoln("read bytes", count, "error", e) if e != nil || count <= 0 { glog.V(0).Infoln("read error:", e, r.URL.Path) diff --git a/go/weed/weed_server/volume_server_handlers_sync.go b/go/weed/weed_server/volume_server_handlers_sync.go new file mode 100644 index 000000000..c650e5f53 --- /dev/null +++ b/go/weed/weed_server/volume_server_handlers_sync.go @@ -0,0 +1,86 @@ +package weed_server + +import ( + "fmt" + "net/http" + + "github.com/chrislusf/seaweedfs/go/glog" + "github.com/chrislusf/seaweedfs/go/storage" + "github.com/chrislusf/seaweedfs/go/util" +) + +func (vs *VolumeServer) getVolumeSyncStatusHandler(w http.ResponseWriter, r *http.Request) { + v, err := vs.getVolume("volume", r) + if v == nil { + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + syncStat := v.GetVolumeSyncStatus() + if syncStat.Error != "" { + writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Get Volume %d status error: %s", v.Id, syncStat.Error)) + glog.V(2).Infoln("getVolumeSyncStatusHandler volume =", r.FormValue("volume"), ", error =", err) + } else { + writeJsonQuiet(w, r, http.StatusOK, syncStat) + } +} + +func (vs *VolumeServer) getVolumeIndexContentHandler(w http.ResponseWriter, r *http.Request) { + v, err := vs.getVolume("volume", r) + if v == nil { + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + content, err := v.IndexFileContent() + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + w.Write(content) +} + +func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *http.Request) { + v, err := vs.getVolume("volume", r) + if v == nil { + writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("Not Found volume: %v", err)) + return + } + if int(v.SuperBlock.CompactRevision) != util.ParseInt(r.FormValue("revision"), 0) { + writeJsonError(w, r, http.StatusExpectationFailed, fmt.Errorf("Requested Volume Revision is %s, but current revision is %d", r.FormValue("revision"), v.SuperBlock.CompactRevision)) + return + } + offset := uint32(util.ParseUint64(r.FormValue("offset"), 0)) + size := uint32(util.ParseUint64(r.FormValue("size"), 0)) + content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + + id := util.ParseUint64(r.FormValue("id"), 0) + n := new(storage.Needle) + n.ParseNeedleHeader(content) + if id != n.Id { + writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id)) + return + } + + w.Write(content) +} + +func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) { + volumeIdString := r.FormValue(volumeParameterName) + if volumeIdString == "" { + err := fmt.Errorf("Empty Volume Id: Need to pass in %s=the_volume_id.", volumeParameterName) + return nil, err + } + vid, err := storage.NewVolumeId(volumeIdString) + if err != nil { + err = fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) + return nil, err + } + v := vs.store.GetVolume(vid) + if v == nil { + return nil, fmt.Errorf("Not Found Volume Id %s: %d", volumeIdString, vid) + } + return v, nil +} diff --git a/go/weed/weed_server/volume_server_handlers_write.go b/go/weed/weed_server/volume_server_handlers_write.go index fdede5562..2f7e79ce9 100644 --- a/go/weed/weed_server/volume_server_handlers_write.go +++ b/go/weed/weed_server/volume_server_handlers_write.go @@ -53,7 +53,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { glog.V(2).Infoln("deleting", n) cookie := n.Cookie - count, ok := vs.store.Read(volumeId, n) + count, ok := vs.store.ReadVolumeNeedle(volumeId, n) if ok != nil { m := make(map[string]uint32) @@ -94,7 +94,7 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques n.ParsePath(id_cookie) glog.V(4).Infoln("batch deleting", n) cookie := n.Cookie - if _, err := vs.store.Read(volumeId, n); err != nil { + if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil { ret = append(ret, operation.DeleteResult{Fid: fid, Error: err.Error()}) continue }