diff --git a/weed/command/export.go b/weed/command/export.go index 0f7496472..72fcae9fb 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -14,6 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/types" ) const ( @@ -157,7 +158,7 @@ func runExport(cmd *Command, args []string) bool { type nameParams struct { Name string - Id uint64 + Id types.NeedleId Mime string Key string Ext string diff --git a/weed/command/filer.go b/weed/command/filer.go index f6478c105..cddad4c0d 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -88,15 +88,17 @@ func (fo *FilerOptions) start() { masters := *f.masters + println("*f.dirListingLimit", *f.dirListingLimit) + fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{ Masters: strings.Split(masters, ","), - Collection: *fo.collection, - DefaultReplication: *fo.defaultReplicaPlacement, - RedirectOnRead: *fo.redirectOnRead, - DisableDirListing: *fo.disableDirListing, - MaxMB: *fo.maxMB, - SecretKey: *fo.secretKey, - DirListingLimit: *fo.dirListingLimit, + Collection: *f.collection, + DefaultReplication: *f.defaultReplicaPlacement, + RedirectOnRead: *f.redirectOnRead, + DisableDirListing: *f.disableDirListing, + MaxMB: *f.maxMB, + SecretKey: *f.secretKey, + DirListingLimit: *f.dirListingLimit, }) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) diff --git a/weed/command/fix.go b/weed/command/fix.go index f3103c6c2..32b09fd72 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/types" ) func init() { @@ -54,11 +55,11 @@ func runFix(cmd *Command, args []string) bool { }, false, func(n *storage.Needle, offset int64) error { glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped()) if n.Size > 0 { - pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size) + pe := nm.Put(n.Id, types.Offset(offset/types.NeedlePaddingSize), n.Size) glog.V(2).Infof("saved %d with error %v", n.Size, pe) } else { glog.V(2).Infof("skipping deleted file ...") - return nm.Delete(n.Id, uint32(offset/storage.NeedlePaddingSize)) + return nm.Delete(n.Id, types.Offset(offset/types.NeedlePaddingSize)) } return nil }) diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go index b7a727fc7..5be312450 100644 --- a/weed/operation/sync_volume.go +++ b/weed/operation/sync_volume.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" + . 
"github.com/chrislusf/seaweedfs/weed/storage/types" ) type SyncVolumeResponse struct { @@ -37,14 +38,14 @@ func GetVolumeSyncStatus(server string, vid string) (*SyncVolumeResponse, error) return &ret, nil } -func GetVolumeIdxEntries(server string, vid string, eachEntryFn func(key uint64, offset, size uint32)) error { +func GetVolumeIdxEntries(server string, vid string, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error { values := make(url.Values) values.Add("volume", vid) - line := make([]byte, 16) + line := make([]byte, NeedleEntrySize) err := util.GetBufferStream("http://"+server+"/admin/sync/index", values, line, func(bytes []byte) { - key := util.BytesToUint64(bytes[:8]) - offset := util.BytesToUint32(bytes[8:12]) - size := util.BytesToUint32(bytes[12:16]) + key := BytesToNeedleId(line[:NeedleIdSize]) + offset := BytesToOffset(line[NeedleIdSize:NeedleIdSize+OffsetSize]) + size := util.BytesToUint32(line[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize]) eachEntryFn(key, offset, size) }) if err != nil { diff --git a/weed/server/volume_server_handlers_sync.go b/weed/server/volume_server_handlers_sync.go index df1fde590..38adfe870 100644 --- a/weed/server/volume_server_handlers_sync.go +++ b/weed/server/volume_server_handlers_sync.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/storage/types" ) func (vs *VolumeServer) getVolumeSyncStatusHandler(w http.ResponseWriter, r *http.Request) { @@ -50,13 +51,17 @@ func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *ht } offset := uint32(util.ParseUint64(r.FormValue("offset"), 0)) size := uint32(util.ParseUint64(r.FormValue("size"), 0)) - content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size) + content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*types.NeedlePaddingSize, size) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return } - id := util.ParseUint64(r.FormValue("id"), 0) + id, err := types.ParseNeedleId(r.FormValue("id")) + if err != nil { + writeJsonError(w, r, http.StatusBadRequest, err) + return + } n := new(storage.Needle) n.ParseNeedleHeader(content) if id != n.Id { diff --git a/weed/storage/file_id.go b/weed/storage/file_id.go index 4cfdb16fa..0871bfb25 100644 --- a/weed/storage/file_id.go +++ b/weed/storage/file_id.go @@ -2,40 +2,27 @@ package storage import ( "encoding/hex" - "errors" - "strings" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" + . 
"github.com/chrislusf/seaweedfs/weed/storage/types" ) type FileId struct { VolumeId VolumeId - Key uint64 - Hashcode uint32 + Key NeedleId + Cookie Cookie } func NewFileIdFromNeedle(VolumeId VolumeId, n *Needle) *FileId { - return &FileId{VolumeId: VolumeId, Key: n.Id, Hashcode: n.Cookie} -} -func NewFileId(VolumeId VolumeId, Key uint64, Hashcode uint32) *FileId { - return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode} + return &FileId{VolumeId: VolumeId, Key: n.Id, Cookie: n.Cookie} } -func ParseFileId(fid string) (*FileId, error) { - a := strings.Split(fid, ",") - if len(a) != 2 { - glog.V(1).Infoln("Invalid fid ", fid, ", split length ", len(a)) - return nil, errors.New("Invalid fid " + fid) - } - vid_string, key_hash_string := a[0], a[1] - volumeId, _ := NewVolumeId(vid_string) - key, hash, e := ParseKeyHash(key_hash_string) - return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash}, e + +func NewFileId(VolumeId VolumeId, key uint64, cookie uint32) *FileId { + return &FileId{VolumeId: VolumeId, Key: Uint64ToNeedleId(key), Cookie: Uint32ToCookie(cookie)} } + func (n *FileId) String() string { - bytes := make([]byte, 12) - util.Uint64toBytes(bytes[0:8], n.Key) - util.Uint32toBytes(bytes[8:12], n.Hashcode) + bytes := make([]byte, NeedleIdSize+CookieSize) + NeedleIdToBytes(bytes[0:NeedleIdSize], n.Key) + CookieToBytes(bytes[NeedleIdSize:NeedleIdSize+CookieSize], n.Cookie) nonzero_index := 0 for ; bytes[nonzero_index] == 0; nonzero_index++ { } diff --git a/weed/storage/needle.go b/weed/storage/needle.go index 2ffaff4de..39952a3f9 100644 --- a/weed/storage/needle.go +++ b/weed/storage/needle.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "math" "mime" "net/http" "path" @@ -15,15 +14,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/operation" + . "github.com/chrislusf/seaweedfs/weed/storage/types" ) const ( - NeedleHeaderSize = 16 //should never change this - NeedlePaddingSize = 8 - NeedleChecksumSize = 4 - MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 - TombstoneFileSize = math.MaxUint32 - PairNamePrefix = "Seaweed-" + NeedleChecksumSize = 4 + PairNamePrefix = "Seaweed-" ) /* @@ -31,18 +27,18 @@ const ( * Needle file size is limited to 4GB for now. 
*/ type Needle struct { - Cookie uint32 `comment:"random number to mitigate brute force lookups"` - Id uint64 `comment:"needle id"` - Size uint32 `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"` + Cookie Cookie `comment:"random number to mitigate brute force lookups"` + Id NeedleId `comment:"needle id"` + Size uint32 `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"` DataSize uint32 `comment:"Data size"` //version2 Data []byte `comment:"The actual file data"` - Flags byte `comment:"boolean flags"` //version2 - NameSize uint8 //version2 + Flags byte `comment:"boolean flags"` //version2 + NameSize uint8 //version2 Name []byte `comment:"maximum 256 characters"` //version2 - MimeSize uint8 //version2 + MimeSize uint8 //version2 Mime []byte `comment:"maximum 256 characters"` //version2 - PairsSize uint16 //version2 + PairsSize uint16 //version2 Pairs []byte `comment:"additional name value pairs, json format, maximum 64kB"` LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk Ttl *TTL @@ -213,7 +209,7 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { dotSep := strings.LastIndex(r.URL.Path, ".") fid := r.URL.Path[commaSep+1:] if dotSep > 0 { - fid = r.URL.Path[commaSep+1 : dotSep] + fid = r.URL.Path[commaSep+1: dotSep] } e = n.ParsePath(fid) @@ -222,7 +218,7 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { } func (n *Needle) ParsePath(fid string) (err error) { length := len(fid) - if length <= 8 { + if length <= CookieSize*2 { return fmt.Errorf("Invalid fid: %s", fid) } delta := "" @@ -230,13 +226,13 @@ func (n *Needle) ParsePath(fid string) (err error) { if deltaIndex > 0 { fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:] } - n.Id, n.Cookie, err = ParseKeyHash(fid) + n.Id, n.Cookie, err = ParseNeedleIdCookie(fid) if err != nil { return err } if delta != "" { if d, e := strconv.ParseUint(delta, 10, 64); e == nil { - n.Id += d + n.Id += NeedleId(d) } else { return e } @@ -244,21 +240,21 @@ func (n *Needle) ParsePath(fid string) (err error) { return err } -func ParseKeyHash(key_hash_string string) (uint64, uint32, error) { - if len(key_hash_string) <= 8 { +func ParseNeedleIdCookie(key_hash_string string) (NeedleId, Cookie, error) { + if len(key_hash_string) <= CookieSize*2 { return 0, 0, fmt.Errorf("KeyHash is too short.") } - if len(key_hash_string) > 24 { + if len(key_hash_string) > (NeedleIdSize+CookieSize)*2 { return 0, 0, fmt.Errorf("KeyHash is too long.") } - split := len(key_hash_string) - 8 - key, err := strconv.ParseUint(key_hash_string[:split], 16, 64) + split := len(key_hash_string) - CookieSize*2 + needleId, err := ParseNeedleId(key_hash_string[:split]) if err != nil { - return 0, 0, fmt.Errorf("Parse key error: %v", err) + return 0, 0, fmt.Errorf("Parse needleId error: %v", err) } - hash, err := strconv.ParseUint(key_hash_string[split:], 16, 32) + cookie, err := ParseCookie(key_hash_string[split:]) if err != nil { - return 0, 0, fmt.Errorf("Parse hash error: %v", err) + return 0, 0, fmt.Errorf("Parse cookie error: %v", err) } - return key, uint32(hash), nil + return needleId, cookie, nil } diff --git a/weed/storage/needle/btree_map.go b/weed/storage/needle/btree_map.go index 64c0bacc1..a7c9982ac 100644 --- a/weed/storage/needle/btree_map.go +++ b/weed/storage/needle/btree_map.go @@ -2,6 +2,7 @@ package needle import ( "github.com/google/btree" + . 
"github.com/chrislusf/seaweedfs/weed/storage/types" ) //This map assumes mostly inserting increasing keys @@ -15,7 +16,7 @@ func NewBtreeMap() *BtreeMap { } } -func (cm *BtreeMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) { +func (cm *BtreeMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) { found := cm.tree.ReplaceOrInsert(NeedleValue{key, offset, size}) if found != nil { old := found.(NeedleValue) @@ -24,7 +25,7 @@ func (cm *BtreeMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32 return } -func (cm *BtreeMap) Delete(key Key) (oldSize uint32) { +func (cm *BtreeMap) Delete(key NeedleId) (oldSize uint32) { found := cm.tree.Delete(NeedleValue{key, 0, 0}) if found != nil { old := found.(NeedleValue) @@ -32,7 +33,7 @@ func (cm *BtreeMap) Delete(key Key) (oldSize uint32) { } return } -func (cm *BtreeMap) Get(key Key) (*NeedleValue, bool) { +func (cm *BtreeMap) Get(key NeedleId) (*NeedleValue, bool) { found := cm.tree.Get(NeedleValue{key, 0, 0}) if found != nil { old := found.(NeedleValue) diff --git a/weed/storage/needle/compact_map.go b/weed/storage/needle/compact_map.go index ea2360fa7..7b653d838 100644 --- a/weed/storage/needle/compact_map.go +++ b/weed/storage/needle/compact_map.go @@ -2,27 +2,28 @@ package needle import ( "sync" + . "github.com/chrislusf/seaweedfs/weed/storage/types" ) type CompactSection struct { sync.RWMutex values []NeedleValue - overflow map[Key]NeedleValue - start Key - end Key + overflow map[NeedleId]NeedleValue + start NeedleId + end NeedleId counter int } -func NewCompactSection(start Key) *CompactSection { +func NewCompactSection(start NeedleId) *CompactSection { return &CompactSection{ values: make([]NeedleValue, batch), - overflow: make(map[Key]NeedleValue), + overflow: make(map[NeedleId]NeedleValue), start: start, } } //return old entry size -func (cs *CompactSection) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) { +func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) { cs.Lock() if key > cs.end { cs.end = key @@ -52,7 +53,7 @@ func (cs *CompactSection) Set(key Key, offset, size uint32) (oldOffset, oldSize } //return old entry size -func (cs *CompactSection) Delete(key Key) uint32 { +func (cs *CompactSection) Delete(key NeedleId) uint32 { cs.Lock() ret := uint32(0) if i := cs.binarySearchValues(key); i >= 0 { @@ -68,7 +69,7 @@ func (cs *CompactSection) Delete(key Key) uint32 { cs.Unlock() return ret } -func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) { +func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) { cs.RLock() if v, ok := cs.overflow[key]; ok { cs.RUnlock() @@ -81,7 +82,7 @@ func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) { cs.RUnlock() return nil, false } -func (cs *CompactSection) binarySearchValues(key Key) int { +func (cs *CompactSection) binarySearchValues(key NeedleId) int { l, h := 0, cs.counter-1 if h >= 0 && cs.values[h].Key < key { return -2 @@ -112,7 +113,7 @@ func NewCompactMap() *CompactMap { return &CompactMap{} } -func (cm *CompactMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) { +func (cm *CompactMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) { x := cm.binarySearchCompactSection(key) if x < 0 { //println(x, "creating", len(cm.list), "section, starting", key) @@ -130,21 +131,21 @@ func (cm *CompactMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint } return cm.list[x].Set(key, offset, size) } -func 
(cm *CompactMap) Delete(key Key) uint32 { +func (cm *CompactMap) Delete(key NeedleId) uint32 { x := cm.binarySearchCompactSection(key) if x < 0 { return uint32(0) } return cm.list[x].Delete(key) } -func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) { +func (cm *CompactMap) Get(key NeedleId) (*NeedleValue, bool) { x := cm.binarySearchCompactSection(key) if x < 0 { return nil, false } return cm.list[x].Get(key) } -func (cm *CompactMap) binarySearchCompactSection(key Key) int { +func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int { l, h := 0, len(cm.list)-1 if h < 0 { return -5 diff --git a/weed/storage/needle/compact_map_perf_test.go b/weed/storage/needle/compact_map_perf_test.go index 8a26e7ed3..59dd7f7f0 100644 --- a/weed/storage/needle/compact_map_perf_test.go +++ b/weed/storage/needle/compact_map_perf_test.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" + . "github.com/chrislusf/seaweedfs/weed/storage/types" ) func TestMemoryUsage(t *testing.T) { @@ -29,11 +30,11 @@ func loadNewNeedleMap(file *os.File) { } for count > 0 && e == nil { for i := 0; i < count; i += 16 { - key := util.BytesToUint64(bytes[i : i+8]) - offset := util.BytesToUint32(bytes[i+8 : i+12]) - size := util.BytesToUint32(bytes[i+12 : i+16]) + key := util.BytesToUint64(bytes[i: i+8]) + offset := util.BytesToUint32(bytes[i+8: i+12]) + size := util.BytesToUint32(bytes[i+12: i+16]) if offset > 0 { - m.Set(Key(key), offset, size) + m.Set(NeedleId(key), Offset(offset), size) } else { //delete(m, key) } diff --git a/weed/storage/needle/compact_map_test.go b/weed/storage/needle/compact_map_test.go index 4d574bafe..3c5c90fff 100644 --- a/weed/storage/needle/compact_map_test.go +++ b/weed/storage/needle/compact_map_test.go @@ -2,16 +2,17 @@ package needle import ( "testing" + . 
"github.com/chrislusf/seaweedfs/weed/storage/types" ) func TestIssue52(t *testing.T) { m := NewCompactMap() - m.Set(Key(10002), 10002, 10002) - if element, ok := m.Get(Key(10002)); ok { + m.Set(NeedleId(10002), 10002, 10002) + if element, ok := m.Get(NeedleId(10002)); ok { println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size) } - m.Set(Key(10001), 10001, 10001) - if element, ok := m.Get(Key(10002)); ok { + m.Set(NeedleId(10001), 10001, 10001) + if element, ok := m.Get(NeedleId(10002)); ok { println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size) } else { t.Fatal("key 10002 missing after setting 10001") @@ -21,15 +22,15 @@ func TestIssue52(t *testing.T) { func TestXYZ(t *testing.T) { m := NewCompactMap() for i := uint32(0); i < 100*batch; i += 2 { - m.Set(Key(i), i, i) + m.Set(NeedleId(i), i, i) } for i := uint32(0); i < 100*batch; i += 37 { - m.Delete(Key(i)) + m.Delete(NeedleId(i)) } for i := uint32(0); i < 10*batch; i += 3 { - m.Set(Key(i), i+11, i+5) + m.Set(NeedleId(i), i+11, i+5) } // for i := uint32(0); i < 100; i++ { @@ -39,7 +40,7 @@ func TestXYZ(t *testing.T) { // } for i := uint32(0); i < 10*batch; i++ { - v, ok := m.Get(Key(i)) + v, ok := m.Get(NeedleId(i)) if i%3 == 0 { if !ok { t.Fatal("key", i, "missing!") @@ -59,7 +60,7 @@ func TestXYZ(t *testing.T) { } for i := uint32(10 * batch); i < 100*batch; i++ { - v, ok := m.Get(Key(i)) + v, ok := m.Get(NeedleId(i)) if i%37 == 0 { if ok && v.Size > 0 { t.Fatal("key", i, "should have been deleted needle value", v) diff --git a/weed/storage/needle/needle_value.go b/weed/storage/needle/needle_value.go index 137ab0814..8fd7b1b1c 100644 --- a/weed/storage/needle/needle_value.go +++ b/weed/storage/needle/needle_value.go @@ -1,9 +1,8 @@ package needle import ( - "strconv" - "github.com/google/btree" + . "github.com/chrislusf/seaweedfs/weed/storage/types" ) const ( @@ -11,8 +10,8 @@ const ( ) type NeedleValue struct { - Key Key - Offset uint32 `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G + Key NeedleId + Offset Offset `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G Size uint32 `comment:"Size of the data portion"` } @@ -20,9 +19,3 @@ func (this NeedleValue) Less(than btree.Item) bool { that := than.(NeedleValue) return this.Key < that.Key } - -type Key uint64 - -func (k Key) String() string { - return strconv.FormatUint(uint64(k), 10) -} diff --git a/weed/storage/needle/needle_value_map.go b/weed/storage/needle/needle_value_map.go index 81f41b235..9da257443 100644 --- a/weed/storage/needle/needle_value_map.go +++ b/weed/storage/needle/needle_value_map.go @@ -1,8 +1,12 @@ package needle +import ( + . "github.com/chrislusf/seaweedfs/weed/storage/types" +) + type NeedleValueMap interface { - Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) - Delete(key Key) uint32 - Get(key Key) (*NeedleValue, bool) + Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) + Delete(key NeedleId) uint32 + Get(key NeedleId) (*NeedleValue, bool) Visit(visit func(NeedleValue) error) error } diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go index 16ce458e7..ce2079e89 100644 --- a/weed/storage/needle_map.go +++ b/weed/storage/needle_map.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/chrislusf/seaweedfs/weed/storage/needle" + . 
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -19,21 +20,17 @@ const ( NeedleMapBtree ) -const ( - NeedleIndexSize = 16 -) - type NeedleMapper interface { - Put(key uint64, offset uint32, size uint32) error - Get(key uint64) (element *needle.NeedleValue, ok bool) - Delete(key uint64, offset uint32) error + Put(key NeedleId, offset Offset, size uint32) error + Get(key NeedleId) (element *needle.NeedleValue, ok bool) + Delete(key NeedleId, offset Offset) error Close() Destroy() error ContentSize() uint64 DeletedSize() uint64 FileCount() int DeletedCount() int - MaxFileKey() uint64 + MaxFileKey() NeedleId IndexFileSize() uint64 IndexFileContent() ([]byte, error) IndexFileName() string @@ -58,17 +55,17 @@ func (nm *baseNeedleMapper) IndexFileName() string { return nm.indexFile.Name() } -func idxFileEntry(bytes []byte) (key uint64, offset uint32, size uint32) { - key = util.BytesToUint64(bytes[:8]) - offset = util.BytesToUint32(bytes[8:12]) - size = util.BytesToUint32(bytes[12:16]) +func IdxFileEntry(bytes []byte) (key NeedleId, offset Offset, size uint32) { + key = BytesToNeedleId(bytes[:NeedleIdSize]) + offset = BytesToOffset(bytes[NeedleIdSize:NeedleIdSize+OffsetSize]) + size = util.BytesToUint32(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize]) return } -func (nm *baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error { - bytes := make([]byte, 16) - util.Uint64toBytes(bytes[0:8], key) - util.Uint32toBytes(bytes[8:12], offset) - util.Uint32toBytes(bytes[12:16], size) +func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size uint32) error { + bytes := make([]byte, NeedleIdSize+OffsetSize+SizeSize) + NeedleIdToBytes(bytes[0:NeedleIdSize], key) + OffsetToBytes(bytes[NeedleIdSize:NeedleIdSize+OffsetSize], offset) + util.Uint32toBytes(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], size) nm.indexFileAccessLock.Lock() defer nm.indexFileAccessLock.Unlock() diff --git a/weed/storage/needle_map_boltdb.go b/weed/storage/needle_map_boltdb.go index 021dd6aa9..08897e55f 100644 --- a/weed/storage/needle_map_boltdb.go +++ b/weed/storage/needle_map_boltdb.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/needle" + . 
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -64,7 +65,7 @@ func generateBoltDbFile(dbFileName string, indexFile *os.File) error { return err } defer db.Close() - return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error { + return WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size uint32) error { if offset > 0 && size != TombstoneFileSize { boltDbWrite(db, key, offset, size) } else { @@ -74,10 +75,11 @@ func generateBoltDbFile(dbFileName string, indexFile *os.File) error { }) } -func (m *BoltDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) { - var offset, size uint32 - bytes := make([]byte, 8) - util.Uint64toBytes(bytes, key) +func (m *BoltDbNeedleMap) Get(key NeedleId) (element *needle.NeedleValue, ok bool) { + var offset Offset + var size uint32 + bytes := make([]byte, NeedleIdSize) + NeedleIdToBytes(bytes, key) err := m.db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket(boltdbBucket) if bucket == nil { @@ -86,13 +88,13 @@ func (m *BoltDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) data := bucket.Get(bytes) - if len(data) != 8 { + if len(data) != OffsetSize+SizeSize { glog.V(0).Infof("wrong data length: %d", len(data)) return fmt.Errorf("wrong data length: %d", len(data)) } - offset = util.BytesToUint32(data[0:4]) - size = util.BytesToUint32(data[4:8]) + offset = BytesToOffset(data[0:OffsetSize]) + size = util.BytesToUint32(data[OffsetSize:OffsetSize+SizeSize]) return nil }) @@ -100,10 +102,10 @@ func (m *BoltDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) if err != nil { return nil, false } - return &needle.NeedleValue{Key: needle.Key(key), Offset: offset, Size: size}, true + return &needle.NeedleValue{Key: NeedleId(key), Offset: offset, Size: size}, true } -func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { +func (m *BoltDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error { var oldSize uint32 if oldNeedle, ok := m.Get(key); ok { oldSize = oldNeedle.Size @@ -117,27 +119,29 @@ func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { } func boltDbWrite(db *bolt.DB, - key uint64, offset uint32, size uint32) error { - bytes := make([]byte, 16) - util.Uint64toBytes(bytes[0:8], key) - util.Uint32toBytes(bytes[8:12], offset) - util.Uint32toBytes(bytes[12:16], size) + key NeedleId, offset Offset, size uint32) error { + + bytes := make([]byte, NeedleIdSize+OffsetSize+SizeSize) + NeedleIdToBytes(bytes[0:NeedleIdSize], key) + OffsetToBytes(bytes[NeedleIdSize:NeedleIdSize+OffsetSize], offset) + util.Uint32toBytes(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], size) + return db.Update(func(tx *bolt.Tx) error { bucket, err := tx.CreateBucketIfNotExists(boltdbBucket) if err != nil { return err } - err = bucket.Put(bytes[0:8], bytes[8:16]) + err = bucket.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize]) if err != nil { return err } return nil }) } -func boltDbDelete(db *bolt.DB, key uint64) error { - bytes := make([]byte, 8) - util.Uint64toBytes(bytes, key) +func boltDbDelete(db *bolt.DB, key NeedleId) error { + bytes := make([]byte, NeedleIdSize) + NeedleIdToBytes(bytes, key) return db.Update(func(tx *bolt.Tx) error { bucket, err := tx.CreateBucketIfNotExists(boltdbBucket) if err != nil { @@ -152,7 +156,7 @@ func boltDbDelete(db *bolt.DB, key uint64) error { }) } -func (m *BoltDbNeedleMap) Delete(key uint64, offset uint32) error { +func (m 
*BoltDbNeedleMap) Delete(key NeedleId, offset Offset) error { if oldNeedle, ok := m.Get(key); ok { m.logDelete(oldNeedle.Size) } diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index c3e474033..1af88e545 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/needle" + . "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" ) @@ -62,7 +63,7 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error { return err } defer db.Close() - return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error { + return WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size uint32) error { if offset > 0 && size != TombstoneFileSize { levelDbWrite(db, key, offset, size) } else { @@ -72,19 +73,19 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error { }) } -func (m *LevelDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) { - bytes := make([]byte, 8) - util.Uint64toBytes(bytes, key) +func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle.NeedleValue, ok bool) { + bytes := make([]byte, NeedleIdSize) + NeedleIdToBytes(bytes[0:NeedleIdSize], key) data, err := m.db.Get(bytes, nil) - if err != nil || len(data) != 8 { + if err != nil || len(data) != OffsetSize+SizeSize { return nil, false } - offset := util.BytesToUint32(data[0:4]) - size := util.BytesToUint32(data[4:8]) - return &needle.NeedleValue{Key: needle.Key(key), Offset: offset, Size: size}, true + offset := BytesToOffset(data[0:OffsetSize]) + size := util.BytesToUint32(data[OffsetSize:OffsetSize+SizeSize]) + return &needle.NeedleValue{Key: NeedleId(key), Offset: offset, Size: size}, true } -func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { +func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error { var oldSize uint32 if oldNeedle, ok := m.Get(key); ok { oldSize = oldNeedle.Size @@ -98,23 +99,25 @@ func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { } func levelDbWrite(db *leveldb.DB, - key uint64, offset uint32, size uint32) error { - bytes := make([]byte, 16) - util.Uint64toBytes(bytes[0:8], key) - util.Uint32toBytes(bytes[8:12], offset) - util.Uint32toBytes(bytes[12:16], size) - if err := db.Put(bytes[0:8], bytes[8:16], nil); err != nil { + key NeedleId, offset Offset, size uint32) error { + + bytes := make([]byte, NeedleIdSize+OffsetSize+SizeSize) + NeedleIdToBytes(bytes[0:NeedleIdSize], key) + OffsetToBytes(bytes[NeedleIdSize:NeedleIdSize+OffsetSize], offset) + util.Uint32toBytes(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], size) + + if err := db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil { return fmt.Errorf("failed to write leveldb: %v", err) } return nil } -func levelDbDelete(db *leveldb.DB, key uint64) error { - bytes := make([]byte, 8) - util.Uint64toBytes(bytes, key) +func levelDbDelete(db *leveldb.DB, key NeedleId) error { + bytes := make([]byte, NeedleIdSize) + NeedleIdToBytes(bytes, key) return db.Delete(bytes, nil) } -func (m *LevelDbNeedleMap) Delete(key uint64, offset uint32) error { +func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error { if oldNeedle, ok := m.Get(key); ok { m.logDelete(oldNeedle.Size) } diff --git 
a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index cfdffe3d6..690ddd737 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/needle" + . "github.com/chrislusf/seaweedfs/weed/storage/types" ) type NeedleMap struct { @@ -45,21 +46,21 @@ func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error) { } func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { - e := WalkIndexFile(file, func(key uint64, offset, size uint32) error { + e := WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error { if key > nm.MaximumFileKey { nm.MaximumFileKey = key } if offset > 0 && size != TombstoneFileSize { nm.FileCounter++ nm.FileByteCounter = nm.FileByteCounter + uint64(size) - oldOffset, oldSize := nm.m.Set(needle.Key(key), offset, size) + oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size) // glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) if oldOffset > 0 && oldSize != TombstoneFileSize { nm.DeletionCounter++ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } } else { - oldSize := nm.m.Delete(needle.Key(key)) + oldSize := nm.m.Delete(NeedleId(key)) // glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) nm.DeletionCounter++ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) @@ -72,21 +73,22 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { // walks through the index file, calls fn function with each key, offset, size // stops with the error returned by the fn function -func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error { +func WalkIndexFile(r *os.File, fn func(key NeedleId, offset Offset, size uint32) error) error { var readerOffset int64 - bytes := make([]byte, NeedleIndexSize*RowsToRead) + bytes := make([]byte, NeedleEntrySize*RowsToRead) count, e := r.ReadAt(bytes, readerOffset) glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) readerOffset += int64(count) var ( - key uint64 - offset, size uint32 - i int + key NeedleId + offset Offset + size uint32 + i int ) for count > 0 && e == nil || e == io.EOF { - for i = 0; i+NeedleIndexSize <= count; i += NeedleIndexSize { - key, offset, size = idxFileEntry(bytes[i: i+NeedleIndexSize]) + for i = 0; i+NeedleEntrySize <= count; i += NeedleEntrySize { + key, offset, size = IdxFileEntry(bytes[i: i+NeedleEntrySize]) if e = fn(key, offset, size); e != nil { return e } @@ -101,17 +103,17 @@ func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e return e } -func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error { - _, oldSize := nm.m.Set(needle.Key(key), offset, size) +func (nm *NeedleMap) Put(key NeedleId, offset Offset, size uint32) error { + _, oldSize := nm.m.Set(NeedleId(key), offset, size) nm.logPut(key, oldSize, size) return nm.appendToIndexFile(key, offset, size) } -func (nm *NeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) { - element, ok = nm.m.Get(needle.Key(key)) +func (nm *NeedleMap) Get(key NeedleId) (element *needle.NeedleValue, ok bool) { + element, ok = nm.m.Get(NeedleId(key)) return } -func (nm *NeedleMap) Delete(key uint64, offset uint32) error { - deletedBytes := nm.m.Delete(needle.Key(key)) +func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error 
{ + deletedBytes := nm.m.Delete(NeedleId(key)) nm.logDelete(deletedBytes) return nm.appendToIndexFile(key, offset, TombstoneFileSize) } diff --git a/weed/storage/needle_map_metric.go b/weed/storage/needle_map_metric.go index f43b29c59..793a9ea10 100644 --- a/weed/storage/needle_map_metric.go +++ b/weed/storage/needle_map_metric.go @@ -5,15 +5,15 @@ import ( "os" "github.com/willf/bloom" "github.com/chrislusf/seaweedfs/weed/glog" - "encoding/binary" + . "github.com/chrislusf/seaweedfs/weed/storage/types" ) type mapMetric struct { - DeletionCounter int `json:"DeletionCounter"` - FileCounter int `json:"FileCounter"` - DeletionByteCounter uint64 `json:"DeletionByteCounter"` - FileByteCounter uint64 `json:"FileByteCounter"` - MaximumFileKey uint64 `json:"MaxFileKey"` + DeletionCounter int `json:"DeletionCounter"` + FileCounter int `json:"FileCounter"` + DeletionByteCounter uint64 `json:"DeletionByteCounter"` + FileByteCounter uint64 `json:"FileByteCounter"` + MaximumFileKey NeedleId `json:"MaxFileKey"` } func (mm *mapMetric) logDelete(deletedByteCount uint32) { @@ -21,7 +21,7 @@ func (mm *mapMetric) logDelete(deletedByteCount uint32) { mm.DeletionCounter++ } -func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) { +func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) { if key > mm.MaximumFileKey { mm.MaximumFileKey = key } @@ -45,23 +45,22 @@ func (mm mapMetric) FileCount() int { func (mm mapMetric) DeletedCount() int { return mm.DeletionCounter } -func (mm mapMetric) MaxFileKey() uint64 { +func (mm mapMetric) MaxFileKey() NeedleId { return mm.MaximumFileKey } func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) { mm = &mapMetric{} var bf *bloom.BloomFilter - buf := make([]byte, 8) + buf := make([]byte, NeedleIdSize) err = reverseWalkIndexFile(r, func(entryCount int64) { bf = bloom.NewWithEstimates(uint(entryCount), 0.001) - }, func(key uint64, offset, size uint32) error { + }, func(key NeedleId, offset Offset, size uint32) error { if key > mm.MaximumFileKey { mm.MaximumFileKey = key } - - binary.BigEndian.PutUint64(buf, key) + NeedleIdToBytes(buf, key) if size != TombstoneFileSize { mm.FileByteCounter += uint64(size) } @@ -82,23 +81,23 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) { return } -func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key uint64, offset, size uint32) error) error { +func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key NeedleId, offset Offset, size uint32) error) error { fi, err := r.Stat() if err != nil { return fmt.Errorf("file %s stat error: %v", r.Name(), err) } fileSize := fi.Size() - if fileSize%NeedleIndexSize != 0 { + if fileSize%NeedleEntrySize != 0 { return fmt.Errorf("unexpected file %s size: %d", r.Name(), fileSize) } - initFn(fileSize / NeedleIndexSize) + initFn(fileSize / NeedleEntrySize) - bytes := make([]byte, NeedleIndexSize) - for readerOffset := fileSize - NeedleIndexSize; readerOffset >= 0; readerOffset -= NeedleIndexSize { + bytes := make([]byte, NeedleEntrySize) + for readerOffset := fileSize - NeedleEntrySize; readerOffset >= 0; readerOffset -= NeedleEntrySize { count, e := r.ReadAt(bytes, readerOffset) glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) - key, offset, size := idxFileEntry(bytes) + key, offset, size := IdxFileEntry(bytes) if e = fn(key, offset, size); e != nil { return e } diff --git a/weed/storage/needle_read_write.go 
b/weed/storage/needle_read_write.go index 4241f0758..a62539afc 100644 --- a/weed/storage/needle_read_write.go +++ b/weed/storage/needle_read_write.go @@ -6,6 +6,7 @@ import ( "io" "os" + . "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -43,27 +44,27 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i } switch version { case Version1: - header := make([]byte, NeedleHeaderSize) - util.Uint32toBytes(header[0:4], n.Cookie) - util.Uint64toBytes(header[4:12], n.Id) + header := make([]byte, NeedleEntrySize) + CookieToBytes(header[0:CookieSize], n.Cookie) + NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id) n.Size = uint32(len(n.Data)) size = n.Size - util.Uint32toBytes(header[12:16], n.Size) + util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) if _, err = w.Write(header); err != nil { return } if _, err = w.Write(n.Data); err != nil { return } - actualSize = NeedleHeaderSize + int64(n.Size) - padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) + actualSize = NeedleEntrySize + int64(n.Size) + padding := NeedlePaddingSize - ((NeedleEntrySize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) - _, err = w.Write(header[0 : NeedleChecksumSize+padding]) + _, err = w.Write(header[0: NeedleChecksumSize+padding]) return case Version2: - header := make([]byte, NeedleHeaderSize) - util.Uint32toBytes(header[0:4], n.Cookie) - util.Uint64toBytes(header[4:12], n.Id) + header := make([]byte, NeedleEntrySize) + CookieToBytes(header[0:CookieSize], n.Cookie) + NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id) n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime)) if n.DataSize > 0 { n.Size = 4 + n.DataSize + 1 @@ -86,7 +87,7 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i n.Size = 0 } size = n.DataSize - util.Uint32toBytes(header[12:16], n.Size) + util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) if _, err = w.Write(header); err != nil { return } @@ -122,7 +123,7 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i } if n.HasLastModifiedDate() { util.Uint64toBytes(header[0:8], n.LastModified) - if _, err = w.Write(header[8-LastModifiedBytesLength : 8]); err != nil { + if _, err = w.Write(header[8-LastModifiedBytesLength: 8]); err != nil { return } } @@ -142,9 +143,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i } } } - padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) + padding := NeedlePaddingSize - ((NeedleEntrySize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) - _, err = w.Write(header[0 : NeedleChecksumSize+padding]) + _, err = w.Write(header[0: NeedleChecksumSize+padding]) return n.DataSize, getActualSize(n.Size), err } @@ -152,7 +153,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i } func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, err error) { - return getBytesForFileBlock(r, offset, int(getActualSize(size))) + dataSlice = make([]byte, int(getActualSize(size))) + _, err = r.ReadAt(dataSlice, offset) + return 
dataSlice, err } func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) { @@ -166,14 +169,14 @@ func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version } switch version { case Version1: - n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size] + n.Data = bytes[NeedleEntrySize: NeedleEntrySize+size] case Version2: - n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)]) + n.readNeedleDataVersion2(bytes[NeedleEntrySize: NeedleEntrySize+int(n.Size)]) } if size == 0 { return nil } - checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) + checksum := util.BytesToUint32(bytes[NeedleEntrySize+size: NeedleEntrySize+size+NeedleChecksumSize]) newChecksum := NewCRC(n.Data) if checksum != newChecksum.Value() { return errors.New("CRC error! Data On Disk Corrupted") @@ -181,22 +184,24 @@ func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version n.Checksum = newChecksum return nil } + func (n *Needle) ParseNeedleHeader(bytes []byte) { - n.Cookie = util.BytesToUint32(bytes[0:4]) - n.Id = util.BytesToUint64(bytes[4:12]) - n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize]) + n.Cookie = BytesToCookie(bytes[0:CookieSize]) + n.Id = BytesToNeedleId(bytes[CookieSize:CookieSize+NeedleIdSize]) + n.Size = util.BytesToUint32(bytes[CookieSize+NeedleIdSize:NeedleEntrySize]) } + func (n *Needle) readNeedleDataVersion2(bytes []byte) { index, lenBytes := 0, len(bytes) if index < lenBytes { - n.DataSize = util.BytesToUint32(bytes[index : index+4]) + n.DataSize = util.BytesToUint32(bytes[index: index+4]) index = index + 4 if int(n.DataSize)+index > lenBytes { // this if clause is due to bug #87 and #93, fixed in v0.69 // remove this clause later return } - n.Data = bytes[index : index+int(n.DataSize)] + n.Data = bytes[index: index+int(n.DataSize)] index = index + int(n.DataSize) n.Flags = bytes[index] index = index + 1 @@ -204,25 +209,25 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) { if index < lenBytes && n.HasName() { n.NameSize = uint8(bytes[index]) index = index + 1 - n.Name = bytes[index : index+int(n.NameSize)] + n.Name = bytes[index: index+int(n.NameSize)] index = index + int(n.NameSize) } if index < lenBytes && n.HasMime() { n.MimeSize = uint8(bytes[index]) index = index + 1 - n.Mime = bytes[index : index+int(n.MimeSize)] + n.Mime = bytes[index: index+int(n.MimeSize)] index = index + int(n.MimeSize) } if index < lenBytes && n.HasLastModifiedDate() { - n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength]) + n.LastModified = util.BytesToUint64(bytes[index: index+LastModifiedBytesLength]) index = index + LastModifiedBytesLength } if index < lenBytes && n.HasTtl() { - n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength]) + n.Ttl = LoadTTLFromBytes(bytes[index: index+TtlBytesLength]) index = index + TtlBytesLength } if index < lenBytes && n.HasPairs() { - n.PairsSize = util.BytesToUint16(bytes[index : index+2]) + n.PairsSize = util.BytesToUint16(bytes[index: index+2]) index += 2 end := index + int(n.PairsSize) n.Pairs = bytes[index:end] @@ -230,25 +235,25 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) { } } -func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) { +func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength int64, err error) { n = new(Needle) if version == Version1 || version == Version2 { - bytes := 
make([]byte, NeedleHeaderSize) + bytes := make([]byte, NeedleEntrySize) var count int count, err = r.ReadAt(bytes, offset) if count <= 0 || err != nil { return nil, 0, err } n.ParseNeedleHeader(bytes) - padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize) - bodyLength = n.Size + NeedleChecksumSize + padding + padding := NeedlePaddingSize - ((n.Size + NeedleEntrySize + NeedleChecksumSize) % NeedlePaddingSize) + bodyLength = int64(n.Size) + NeedleChecksumSize + int64(padding) } return } //n should be a needle already read the header //the input stream will read until next file entry -func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength uint32) (err error) { +func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength int64) (err error) { if bodyLength <= 0 { return nil } diff --git a/weed/storage/needle_test.go b/weed/storage/needle_test.go index c05afda2f..f2b578aa6 100644 --- a/weed/storage/needle_test.go +++ b/weed/storage/needle_test.go @@ -26,7 +26,7 @@ func TestParseKeyHash(t *testing.T) { } for _, tc := range testcases { - if id, cookie, err := ParseKeyHash(tc.KeyHash); err != nil && !tc.Err { + if id, cookie, err := ParseNeedleIdCookie(tc.KeyHash); err != nil && !tc.Err { t.Fatalf("Parse %s error: %v", tc.KeyHash, err) } else if err == nil && tc.Err { t.Fatalf("Parse %s expected error got nil", tc.KeyHash) @@ -40,6 +40,6 @@ func BenchmarkParseKeyHash(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - ParseKeyHash("4ed44ed44ed44ed4c8116e41") + ParseNeedleIdCookie("4ed44ed44ed44ed4c8116e41") } } diff --git a/weed/storage/store.go b/weed/storage/store.go index 84ed1951d..ef055ee59 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -5,6 +5,7 @@ import ( "strconv" "strings" + . 
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" ) @@ -160,7 +161,7 @@ func (s *Store) SetRack(rack string) { func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { var volumeMessages []*master_pb.VolumeInformationMessage maxVolumeCount := 0 - var maxFileKey uint64 + var maxFileKey NeedleId for _, location := range s.Locations { maxVolumeCount = maxVolumeCount + location.MaxVolumeCount location.Lock() @@ -199,7 +200,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { Port: uint32(s.Port), PublicUrl: s.PublicUrl, MaxVolumeCount: uint32(maxVolumeCount), - MaxFileKey: maxFileKey, + MaxFileKey: NeedleIdToUint64(maxFileKey), DataCenter: s.dataCenter, Rack: s.rack, Volumes: volumeMessages, diff --git a/weed/storage/types/needle_types.go b/weed/storage/types/needle_types.go new file mode 100644 index 000000000..6e092f100 --- /dev/null +++ b/weed/storage/types/needle_types.go @@ -0,0 +1,79 @@ +package types + +import ( + "math" + "github.com/chrislusf/seaweedfs/weed/util" + "strconv" + "fmt" +) + +type NeedleId uint64 +type Offset uint32 +type Cookie uint32 + +const ( + NeedleIdSize = 8 + OffsetSize = 4 + SizeSize = 4 // uint32 size + NeedleEntrySize = NeedleIdSize + OffsetSize + SizeSize + NeedlePaddingSize = 8 + MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 + TombstoneFileSize = math.MaxUint32 + CookieSize = 4 +) + +func NeedleIdToBytes(bytes []byte, needleId NeedleId) { + util.Uint64toBytes(bytes, uint64(needleId)) +} + +// NeedleIdToUint64 used to send max needle id to master +func NeedleIdToUint64(needleId NeedleId) uint64 { + return uint64(needleId) +} + +func Uint64ToNeedleId(needleId uint64) (NeedleId) { + return NeedleId(needleId) +} + +func BytesToNeedleId(bytes []byte) (NeedleId) { + return NeedleId(util.BytesToUint64(bytes)) +} + +func CookieToBytes(bytes []byte, cookie Cookie) { + util.Uint32toBytes(bytes, uint32(cookie)) +} +func Uint32ToCookie(cookie uint32) (Cookie) { + return Cookie(cookie) +} + +func BytesToCookie(bytes []byte) (Cookie) { + return Cookie(util.BytesToUint32(bytes[0:4])) +} + +func OffsetToBytes(bytes []byte, offset Offset) { + util.Uint32toBytes(bytes, uint32(offset)) +} + +func BytesToOffset(bytes []byte) (Offset) { + return Offset(util.BytesToUint32(bytes[0:4])) +} + +func (k NeedleId) String() string { + return strconv.FormatUint(uint64(k), 10) +} + +func ParseNeedleId(idString string) (NeedleId, error) { + key, err := strconv.ParseUint(idString, 16, 64) + if err != nil { + return 0, fmt.Errorf("needle id %s format error: %v", idString, err) + } + return NeedleId(key), nil +} + +func ParseCookie(cookieString string) (Cookie, error) { + cookie, err := strconv.ParseUint(cookieString, 16, 32) + if err != nil { + return 0, fmt.Errorf("needle cookie %s format error: %v", cookieString, err) + } + return Cookie(cookie), nil +} diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 5603a878b..c928ae9a2 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -4,12 +4,13 @@ import ( "fmt" "os" + . 
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) func getActualSize(size uint32) int64 { - padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize) - return NeedleHeaderSize + int64(size) + NeedleChecksumSize + int64(padding) + padding := NeedlePaddingSize - ((NeedleEntrySize + size + NeedleChecksumSize) % NeedlePaddingSize) + return NeedleEntrySize + int64(size) + NeedleChecksumSize + int64(padding) } func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error { @@ -22,10 +23,10 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error { return nil } var lastIdxEntry []byte - if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleIndexSize); e != nil { + if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleEntrySize); e != nil { return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) } - key, offset, size := idxFileEntry(lastIdxEntry) + key, offset, size := IdxFileEntry(lastIdxEntry) if offset == 0 || size == TombstoneFileSize { return nil } @@ -38,7 +39,7 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error { func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) { if indexSize, err = util.GetFileSize(indexFile); err == nil { - if indexSize%NeedleIndexSize != 0 { + if indexSize%NeedleEntrySize != 0 { err = fmt.Errorf("index file's size is %d bytes, maybe corrupted", indexSize) } } @@ -50,12 +51,12 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err err = fmt.Errorf("offset %d for index file is invalid", offset) return } - bytes = make([]byte, NeedleIndexSize) + bytes = make([]byte, NeedleEntrySize) _, err = indexFile.ReadAt(bytes, offset) return } -func verifyNeedleIntegrity(datFile *os.File, v Version, offset int64, key uint64, size uint32) error { +func verifyNeedleIntegrity(datFile *os.File, v Version, offset int64, key NeedleId, size uint32) error { n := new(Needle) err := n.ReadData(datFile, offset, size, v) if err != nil { diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 6572ea6c7..dc43c488a 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -9,6 +9,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/glog" + . "github.com/chrislusf/seaweedfs/weed/storage/types" ) // isFileUnchanged checks whether this needle to write is same as last one. 
@@ -109,7 +110,7 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) { nv, ok := v.nm.Get(n.Id) if !ok || int64(nv.Offset)*NeedlePaddingSize < offset { - if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil { + if err = v.nm.Put(n.Id, Offset(offset/NeedlePaddingSize), n.Size); err != nil { glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) } } @@ -134,7 +135,7 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) { if err != nil { return size, err } - if err := v.nm.Delete(n.Id, uint32(offset/NeedlePaddingSize)); err != nil { + if err := v.nm.Delete(n.Id, Offset(offset/NeedlePaddingSize)); err != nil { return size, err } n.Data = nil @@ -197,7 +198,7 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId, } for n != nil { if readNeedleBody { - if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil { + if err = n.ReadNeedleBody(v.dataFile, version, offset+NeedleEntrySize, rest); err != nil { glog.V(0).Infof("cannot read needle body: %v", err) //err = fmt.Errorf("cannot read needle body: %v", err) //return @@ -207,9 +208,9 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId, // fixed in v0.69 // remove this whole "if" clause later, long after 0.69 oldRest, oldSize := rest, n.Size - padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize) + padding := NeedlePaddingSize - ((n.Size + NeedleEntrySize + NeedleChecksumSize) % NeedlePaddingSize) n.Size = 0 - rest = n.Size + NeedleChecksumSize + padding + rest = int64(n.Size + NeedleChecksumSize + padding) if rest%NeedlePaddingSize != 0 { rest += (NeedlePaddingSize - rest%NeedlePaddingSize) } @@ -219,7 +220,7 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId, if err = visitNeedle(n, offset); err != nil { glog.V(0).Infof("visit needle error: %v", err) } - offset += int64(NeedleHeaderSize) + int64(rest) + offset += NeedleEntrySize + rest glog.V(4).Infof("==> new entry offset %d", offset) if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil { if err == io.EOF { diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go index b934fc59d..e808f888f 100644 --- a/weed/storage/volume_sync.go +++ b/weed/storage/volume_sync.go @@ -12,6 +12,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/storage/needle" + . 
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -93,7 +94,7 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.Compact if needleValue.Key == 0 { return nil } - if _, ok := slaveMap.Get(uint64(needleValue.Key)); ok { + if _, ok := slaveMap.Get(needleValue.Key); ok { return nil // skip intersection } delta = append(delta, needleValue) @@ -147,12 +148,12 @@ func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m *needle.Compac } total := 0 - err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) { + err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key NeedleId, offset Offset, size uint32) { // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size) if offset > 0 && size != TombstoneFileSize { - m.Set(needle.Key(key), offset, size) + m.Set(NeedleId(key), offset, size) } else { - m.Delete(needle.Key(key)) + m.Delete(NeedleId(key)) } total++ }) @@ -179,9 +180,9 @@ func (v *Volume) IndexFileContent() ([]byte, error) { } // removeNeedle removes one needle by needle key -func (v *Volume) removeNeedle(key needle.Key) { +func (v *Volume) removeNeedle(key NeedleId) { n := new(Needle) - n.Id = uint64(key) + n.Id = key v.deleteNeedle(n) } @@ -208,7 +209,7 @@ func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string, return fmt.Errorf("Appending volume %d error: %v", v.Id, err) } // println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size) - v.nm.Put(uint64(needleValue.Key), uint32(offset/NeedlePaddingSize), needleValue.Size) + v.nm.Put(needleValue.Key, Offset(offset/NeedlePaddingSize), needleValue.Size) return nil }) } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 98037066f..34b4cfe0d 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -5,6 +5,7 @@ import ( "os" "time" + . 
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -122,17 +123,17 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI } type keyField struct { - offset uint32 + offset Offset size uint32 } - incrementedHasUpdatedIndexEntry := make(map[uint64]keyField) + incrementedHasUpdatedIndexEntry := make(map[NeedleId]keyField) - for idx_offset := indexSize - NeedleIndexSize; uint64(idx_offset) >= v.lastCompactIndexOffset; idx_offset -= NeedleIndexSize { + for idx_offset := indexSize - NeedleEntrySize; uint64(idx_offset) >= v.lastCompactIndexOffset; idx_offset -= NeedleEntrySize { var IdxEntry []byte if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil { return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err) } - key, offset, size := idxFileEntry(IdxEntry) + key, offset, size := IdxFileEntry(IdxEntry) glog.V(4).Infof("key %d offset %d size %d", key, offset, size) if _, found := incrementedHasUpdatedIndexEntry[key]; !found { incrementedHasUpdatedIndexEntry[key] = keyField{ @@ -170,11 +171,11 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI return fmt.Errorf("oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d", oldDatFileName, oldDatCompactRevision, newDatFileName, newDatCompactRevision) } - idx_entry_bytes := make([]byte, 16) + idx_entry_bytes := make([]byte, NeedleIdSize+OffsetSize+SizeSize) for key, incre_idx_entry := range incrementedHasUpdatedIndexEntry { - util.Uint64toBytes(idx_entry_bytes[0:8], key) - util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset) - util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size) + NeedleIdToBytes(idx_entry_bytes[0:NeedleIdSize], key) + OffsetToBytes(idx_entry_bytes[NeedleIdSize:NeedleIdSize+OffsetSize], incre_idx_entry.offset) + util.Uint32toBytes(idx_entry_bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], incre_idx_entry.size) var offset int64 if offset, err = dst.Seek(0, 2); err != nil { @@ -255,7 +256,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca nv, ok := v.nm.Get(n.Id) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { - if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { + if err = nm.Put(n.Id, Offset(new_offset/NeedlePaddingSize), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } if _, _, err := n.Append(dst, v.Version()); err != nil { @@ -296,7 +297,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { dst.Write(v.SuperBlock.Bytes()) new_offset := int64(v.SuperBlock.BlockSize()) - WalkIndexFile(oldIndexFile, func(key uint64, offset, size uint32) error { + WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error { if offset == 0 || size == TombstoneFileSize { return nil } @@ -315,7 +316,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) if nv.Offset == offset && nv.Size > 0 { - if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { + if err = nm.Put(n.Id, Offset(new_offset/NeedlePaddingSize), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } if _, _, err = n.Append(dst, 
v.Version()); err != nil {
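
For reference, the fid handling above (ParseNeedleIdCookie, ParseNeedleId, ParseCookie) treats everything after the volume id as hex: the trailing CookieSize*2 = 8 digits are the cookie, and the remainder (at most NeedleIdSize*2 = 16 digits) is the needle id. Below is a minimal, self-contained sketch of that convention; parseFid and the local type declarations are illustrative stand-ins under those assumptions, not functions from the patch.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Hypothetical stand-ins mirroring the new types in this patch.
type NeedleId uint64
type Cookie uint32

const (
	NeedleIdSize = 8
	CookieSize   = 4
)

// parseFid splits a file id such as "3,01637037d6" the way ParseNeedleIdCookie
// does: the last CookieSize*2 hex digits are the cookie, everything before them
// (up to NeedleIdSize*2 digits) is the needle id.
func parseFid(fid string) (volume string, id NeedleId, cookie Cookie, err error) {
	parts := strings.SplitN(fid, ",", 2)
	if len(parts) != 2 {
		return "", 0, 0, fmt.Errorf("invalid fid %q", fid)
	}
	volume, keyCookie := parts[0], parts[1]
	if len(keyCookie) <= CookieSize*2 || len(keyCookie) > (NeedleIdSize+CookieSize)*2 {
		return "", 0, 0, fmt.Errorf("key+cookie %q has bad length", keyCookie)
	}
	split := len(keyCookie) - CookieSize*2
	key, err := strconv.ParseUint(keyCookie[:split], 16, 64)
	if err != nil {
		return "", 0, 0, fmt.Errorf("parse needle id: %v", err)
	}
	ck, err := strconv.ParseUint(keyCookie[split:], 16, 32)
	if err != nil {
		return "", 0, 0, fmt.Errorf("parse cookie: %v", err)
	}
	return volume, NeedleId(key), Cookie(ck), nil
}

func main() {
	v, id, cookie, err := parseFid("3,01637037d6")
	fmt.Printf("%s %d %08x %v\n", v, id, cookie, err) // 3 1 637037d6 <nil>
}

Note that ParsePath in needle.go additionally accepts an optional "_<delta>" suffix that is added onto the parsed needle id; the sketch above ignores that case.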