Browse Source

refactoring: add dedicated types for needle id and offset

This makes it possible to adjust the underlying type sizes later.
pull/680/head
Chris Lu 7 years ago
parent
commit
d4d7ced922
  1. 3
      weed/command/export.go
  2. 16
      weed/command/filer.go
  3. 5
      weed/command/fix.go
  4. 11
      weed/operation/sync_volume.go
  5. 9
      weed/server/volume_server_handlers_sync.go
  6. 35
      weed/storage/file_id.go
  7. 50
      weed/storage/needle.go
  8. 7
      weed/storage/needle/btree_map.go
  9. 27
      weed/storage/needle/compact_map.go
  10. 9
      weed/storage/needle/compact_map_perf_test.go
  11. 19
      weed/storage/needle/compact_map_test.go
  12. 13
      weed/storage/needle/needle_value.go
  13. 10
      weed/storage/needle/needle_value_map.go
  14. 31
      weed/storage/needle_map.go
  15. 44
      weed/storage/needle_map_boltdb.go
  16. 41
      weed/storage/needle_map_leveldb.go
  17. 34
      weed/storage/needle_map_memory.go
  18. 35
      weed/storage/needle_map_metric.go
  19. 71
      weed/storage/needle_read_write.go
  20. 4
      weed/storage/needle_test.go
  21. 5
      weed/storage/store.go
  22. 79
      weed/storage/types/needle_types.go
  23. 15
      weed/storage/volume_checking.go
  24. 13
      weed/storage/volume_read_write.go
  25. 15
      weed/storage/volume_sync.go
  26. 23
      weed/storage/volume_vacuum.go

3
weed/command/export.go

@ -14,6 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/types"
) )
const ( const (
@ -157,7 +158,7 @@ func runExport(cmd *Command, args []string) bool {
type nameParams struct { type nameParams struct {
Name string Name string
Id uint64
Id types.NeedleId
Mime string Mime string
Key string Key string
Ext string Ext string

16
weed/command/filer.go

@ -88,15 +88,17 @@ func (fo *FilerOptions) start() {
masters := *f.masters masters := *f.masters
println("*f.dirListingLimit", *f.dirListingLimit)
fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{ fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{
Masters: strings.Split(masters, ","), Masters: strings.Split(masters, ","),
Collection: *fo.collection,
DefaultReplication: *fo.defaultReplicaPlacement,
RedirectOnRead: *fo.redirectOnRead,
DisableDirListing: *fo.disableDirListing,
MaxMB: *fo.maxMB,
SecretKey: *fo.secretKey,
DirListingLimit: *fo.dirListingLimit,
Collection: *f.collection,
DefaultReplication: *f.defaultReplicaPlacement,
RedirectOnRead: *f.redirectOnRead,
DisableDirListing: *f.disableDirListing,
MaxMB: *f.maxMB,
SecretKey: *f.secretKey,
DirListingLimit: *f.dirListingLimit,
}) })
if nfs_err != nil { if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err) glog.Fatalf("Filer startup error: %v", nfs_err)

5
weed/command/fix.go

@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/types"
) )
func init() { func init() {
@ -54,11 +55,11 @@ func runFix(cmd *Command, args []string) bool {
}, false, func(n *storage.Needle, offset int64) error { }, false, func(n *storage.Needle, offset int64) error {
glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped()) glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped())
if n.Size > 0 { if n.Size > 0 {
pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size)
pe := nm.Put(n.Id, types.Offset(offset/types.NeedlePaddingSize), n.Size)
glog.V(2).Infof("saved %d with error %v", n.Size, pe) glog.V(2).Infof("saved %d with error %v", n.Size, pe)
} else { } else {
glog.V(2).Infof("skipping deleted file ...") glog.V(2).Infof("skipping deleted file ...")
return nm.Delete(n.Id, uint32(offset/storage.NeedlePaddingSize))
return nm.Delete(n.Id, types.Offset(offset/types.NeedlePaddingSize))
} }
return nil return nil
}) })

11
weed/operation/sync_volume.go

@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
type SyncVolumeResponse struct { type SyncVolumeResponse struct {
@ -37,14 +38,14 @@ func GetVolumeSyncStatus(server string, vid string) (*SyncVolumeResponse, error)
return &ret, nil return &ret, nil
} }
func GetVolumeIdxEntries(server string, vid string, eachEntryFn func(key uint64, offset, size uint32)) error {
func GetVolumeIdxEntries(server string, vid string, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
values := make(url.Values) values := make(url.Values)
values.Add("volume", vid) values.Add("volume", vid)
line := make([]byte, 16)
line := make([]byte, NeedleEntrySize)
err := util.GetBufferStream("http://"+server+"/admin/sync/index", values, line, func(bytes []byte) { err := util.GetBufferStream("http://"+server+"/admin/sync/index", values, line, func(bytes []byte) {
key := util.BytesToUint64(bytes[:8])
offset := util.BytesToUint32(bytes[8:12])
size := util.BytesToUint32(bytes[12:16])
key := BytesToNeedleId(line[:NeedleIdSize])
offset := BytesToOffset(line[NeedleIdSize:NeedleIdSize+OffsetSize])
size := util.BytesToUint32(line[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize])
eachEntryFn(key, offset, size) eachEntryFn(key, offset, size)
}) })
if err != nil { if err != nil {

9
weed/server/volume_server_handlers_sync.go

@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/storage/types"
) )
func (vs *VolumeServer) getVolumeSyncStatusHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) getVolumeSyncStatusHandler(w http.ResponseWriter, r *http.Request) {
@ -50,13 +51,17 @@ func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *ht
} }
offset := uint32(util.ParseUint64(r.FormValue("offset"), 0)) offset := uint32(util.ParseUint64(r.FormValue("offset"), 0))
size := uint32(util.ParseUint64(r.FormValue("size"), 0)) size := uint32(util.ParseUint64(r.FormValue("size"), 0))
content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size)
content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*types.NeedlePaddingSize, size)
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
return return
} }
id := util.ParseUint64(r.FormValue("id"), 0)
id, err := types.ParseNeedleId(r.FormValue("id"))
if err != nil {
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
n := new(storage.Needle) n := new(storage.Needle)
n.ParseNeedleHeader(content) n.ParseNeedleHeader(content)
if id != n.Id { if id != n.Id {

35
weed/storage/file_id.go

@ -2,40 +2,27 @@ package storage
import ( import (
"encoding/hex" "encoding/hex"
"errors"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
type FileId struct { type FileId struct {
VolumeId VolumeId VolumeId VolumeId
Key uint64
Hashcode uint32
Key NeedleId
Cookie Cookie
} }
func NewFileIdFromNeedle(VolumeId VolumeId, n *Needle) *FileId { func NewFileIdFromNeedle(VolumeId VolumeId, n *Needle) *FileId {
return &FileId{VolumeId: VolumeId, Key: n.Id, Hashcode: n.Cookie}
}
func NewFileId(VolumeId VolumeId, Key uint64, Hashcode uint32) *FileId {
return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode}
return &FileId{VolumeId: VolumeId, Key: n.Id, Cookie: n.Cookie}
} }
func ParseFileId(fid string) (*FileId, error) {
a := strings.Split(fid, ",")
if len(a) != 2 {
glog.V(1).Infoln("Invalid fid ", fid, ", split length ", len(a))
return nil, errors.New("Invalid fid " + fid)
}
vid_string, key_hash_string := a[0], a[1]
volumeId, _ := NewVolumeId(vid_string)
key, hash, e := ParseKeyHash(key_hash_string)
return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash}, e
func NewFileId(VolumeId VolumeId, key uint64, cookie uint32) *FileId {
return &FileId{VolumeId: VolumeId, Key: Uint64ToNeedleId(key), Cookie: Uint32ToCookie(cookie)}
} }
func (n *FileId) String() string { func (n *FileId) String() string {
bytes := make([]byte, 12)
util.Uint64toBytes(bytes[0:8], n.Key)
util.Uint32toBytes(bytes[8:12], n.Hashcode)
bytes := make([]byte, NeedleIdSize+CookieSize)
NeedleIdToBytes(bytes[0:NeedleIdSize], n.Key)
CookieToBytes(bytes[NeedleIdSize:NeedleIdSize+CookieSize], n.Cookie)
nonzero_index := 0 nonzero_index := 0
for ; bytes[nonzero_index] == 0; nonzero_index++ { for ; bytes[nonzero_index] == 0; nonzero_index++ {
} }

50
weed/storage/needle.go

@ -4,7 +4,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math"
"mime" "mime"
"net/http" "net/http"
"path" "path"
@ -15,15 +14,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
const ( const (
NeedleHeaderSize = 16 //should never change this
NeedlePaddingSize = 8
NeedleChecksumSize = 4
MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8
TombstoneFileSize = math.MaxUint32
PairNamePrefix = "Seaweed-"
NeedleChecksumSize = 4
PairNamePrefix = "Seaweed-"
) )
/* /*
@ -31,18 +27,18 @@ const (
* Needle file size is limited to 4GB for now. * Needle file size is limited to 4GB for now.
*/ */
type Needle struct { type Needle struct {
Cookie uint32 `comment:"random number to mitigate brute force lookups"`
Id uint64 `comment:"needle id"`
Size uint32 `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"`
Cookie Cookie `comment:"random number to mitigate brute force lookups"`
Id NeedleId `comment:"needle id"`
Size uint32 `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"`
DataSize uint32 `comment:"Data size"` //version2 DataSize uint32 `comment:"Data size"` //version2
Data []byte `comment:"The actual file data"` Data []byte `comment:"The actual file data"`
Flags byte `comment:"boolean flags"` //version2
NameSize uint8 //version2
Flags byte `comment:"boolean flags"` //version2
NameSize uint8 //version2
Name []byte `comment:"maximum 256 characters"` //version2 Name []byte `comment:"maximum 256 characters"` //version2
MimeSize uint8 //version2
MimeSize uint8 //version2
Mime []byte `comment:"maximum 256 characters"` //version2 Mime []byte `comment:"maximum 256 characters"` //version2
PairsSize uint16 //version2
PairsSize uint16 //version2
Pairs []byte `comment:"additional name value pairs, json format, maximum 64kB"` Pairs []byte `comment:"additional name value pairs, json format, maximum 64kB"`
LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
Ttl *TTL Ttl *TTL
@ -213,7 +209,7 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
dotSep := strings.LastIndex(r.URL.Path, ".") dotSep := strings.LastIndex(r.URL.Path, ".")
fid := r.URL.Path[commaSep+1:] fid := r.URL.Path[commaSep+1:]
if dotSep > 0 { if dotSep > 0 {
fid = r.URL.Path[commaSep+1 : dotSep]
fid = r.URL.Path[commaSep+1: dotSep]
} }
e = n.ParsePath(fid) e = n.ParsePath(fid)
@ -222,7 +218,7 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
} }
func (n *Needle) ParsePath(fid string) (err error) { func (n *Needle) ParsePath(fid string) (err error) {
length := len(fid) length := len(fid)
if length <= 8 {
if length <= CookieSize*2 {
return fmt.Errorf("Invalid fid: %s", fid) return fmt.Errorf("Invalid fid: %s", fid)
} }
delta := "" delta := ""
@ -230,13 +226,13 @@ func (n *Needle) ParsePath(fid string) (err error) {
if deltaIndex > 0 { if deltaIndex > 0 {
fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:] fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:]
} }
n.Id, n.Cookie, err = ParseKeyHash(fid)
n.Id, n.Cookie, err = ParseNeedleIdCookie(fid)
if err != nil { if err != nil {
return err return err
} }
if delta != "" { if delta != "" {
if d, e := strconv.ParseUint(delta, 10, 64); e == nil { if d, e := strconv.ParseUint(delta, 10, 64); e == nil {
n.Id += d
n.Id += NeedleId(d)
} else { } else {
return e return e
} }
@ -244,21 +240,21 @@ func (n *Needle) ParsePath(fid string) (err error) {
return err return err
} }
func ParseKeyHash(key_hash_string string) (uint64, uint32, error) {
if len(key_hash_string) <= 8 {
func ParseNeedleIdCookie(key_hash_string string) (NeedleId, Cookie, error) {
if len(key_hash_string) <= CookieSize*2 {
return 0, 0, fmt.Errorf("KeyHash is too short.") return 0, 0, fmt.Errorf("KeyHash is too short.")
} }
if len(key_hash_string) > 24 {
if len(key_hash_string) > (NeedleIdSize+CookieSize)*2 {
return 0, 0, fmt.Errorf("KeyHash is too long.") return 0, 0, fmt.Errorf("KeyHash is too long.")
} }
split := len(key_hash_string) - 8
key, err := strconv.ParseUint(key_hash_string[:split], 16, 64)
split := len(key_hash_string) - CookieSize*2
needleId, err := ParseNeedleId(key_hash_string[:split])
if err != nil { if err != nil {
return 0, 0, fmt.Errorf("Parse key error: %v", err)
return 0, 0, fmt.Errorf("Parse needleId error: %v", err)
} }
hash, err := strconv.ParseUint(key_hash_string[split:], 16, 32)
cookie, err := ParseCookie(key_hash_string[split:])
if err != nil { if err != nil {
return 0, 0, fmt.Errorf("Parse hash error: %v", err)
return 0, 0, fmt.Errorf("Parse cookie error: %v", err)
} }
return key, uint32(hash), nil
return needleId, cookie, nil
} }

7
weed/storage/needle/btree_map.go

@ -2,6 +2,7 @@ package needle
import ( import (
"github.com/google/btree" "github.com/google/btree"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
//This map assumes mostly inserting increasing keys //This map assumes mostly inserting increasing keys
@ -15,7 +16,7 @@ func NewBtreeMap() *BtreeMap {
} }
} }
func (cm *BtreeMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) {
func (cm *BtreeMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
found := cm.tree.ReplaceOrInsert(NeedleValue{key, offset, size}) found := cm.tree.ReplaceOrInsert(NeedleValue{key, offset, size})
if found != nil { if found != nil {
old := found.(NeedleValue) old := found.(NeedleValue)
@ -24,7 +25,7 @@ func (cm *BtreeMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32
return return
} }
func (cm *BtreeMap) Delete(key Key) (oldSize uint32) {
func (cm *BtreeMap) Delete(key NeedleId) (oldSize uint32) {
found := cm.tree.Delete(NeedleValue{key, 0, 0}) found := cm.tree.Delete(NeedleValue{key, 0, 0})
if found != nil { if found != nil {
old := found.(NeedleValue) old := found.(NeedleValue)
@ -32,7 +33,7 @@ func (cm *BtreeMap) Delete(key Key) (oldSize uint32) {
} }
return return
} }
func (cm *BtreeMap) Get(key Key) (*NeedleValue, bool) {
func (cm *BtreeMap) Get(key NeedleId) (*NeedleValue, bool) {
found := cm.tree.Get(NeedleValue{key, 0, 0}) found := cm.tree.Get(NeedleValue{key, 0, 0})
if found != nil { if found != nil {
old := found.(NeedleValue) old := found.(NeedleValue)

27
weed/storage/needle/compact_map.go

@ -2,27 +2,28 @@ package needle
import ( import (
"sync" "sync"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
type CompactSection struct { type CompactSection struct {
sync.RWMutex sync.RWMutex
values []NeedleValue values []NeedleValue
overflow map[Key]NeedleValue
start Key
end Key
overflow map[NeedleId]NeedleValue
start NeedleId
end NeedleId
counter int counter int
} }
func NewCompactSection(start Key) *CompactSection {
func NewCompactSection(start NeedleId) *CompactSection {
return &CompactSection{ return &CompactSection{
values: make([]NeedleValue, batch), values: make([]NeedleValue, batch),
overflow: make(map[Key]NeedleValue),
overflow: make(map[NeedleId]NeedleValue),
start: start, start: start,
} }
} }
//return old entry size //return old entry size
func (cs *CompactSection) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) {
func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
cs.Lock() cs.Lock()
if key > cs.end { if key > cs.end {
cs.end = key cs.end = key
@ -52,7 +53,7 @@ func (cs *CompactSection) Set(key Key, offset, size uint32) (oldOffset, oldSize
} }
//return old entry size //return old entry size
func (cs *CompactSection) Delete(key Key) uint32 {
func (cs *CompactSection) Delete(key NeedleId) uint32 {
cs.Lock() cs.Lock()
ret := uint32(0) ret := uint32(0)
if i := cs.binarySearchValues(key); i >= 0 { if i := cs.binarySearchValues(key); i >= 0 {
@ -68,7 +69,7 @@ func (cs *CompactSection) Delete(key Key) uint32 {
cs.Unlock() cs.Unlock()
return ret return ret
} }
func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) {
func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) {
cs.RLock() cs.RLock()
if v, ok := cs.overflow[key]; ok { if v, ok := cs.overflow[key]; ok {
cs.RUnlock() cs.RUnlock()
@ -81,7 +82,7 @@ func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) {
cs.RUnlock() cs.RUnlock()
return nil, false return nil, false
} }
func (cs *CompactSection) binarySearchValues(key Key) int {
func (cs *CompactSection) binarySearchValues(key NeedleId) int {
l, h := 0, cs.counter-1 l, h := 0, cs.counter-1
if h >= 0 && cs.values[h].Key < key { if h >= 0 && cs.values[h].Key < key {
return -2 return -2
@ -112,7 +113,7 @@ func NewCompactMap() *CompactMap {
return &CompactMap{} return &CompactMap{}
} }
func (cm *CompactMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint32) {
func (cm *CompactMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
x := cm.binarySearchCompactSection(key) x := cm.binarySearchCompactSection(key)
if x < 0 { if x < 0 {
//println(x, "creating", len(cm.list), "section, starting", key) //println(x, "creating", len(cm.list), "section, starting", key)
@ -130,21 +131,21 @@ func (cm *CompactMap) Set(key Key, offset, size uint32) (oldOffset, oldSize uint
} }
return cm.list[x].Set(key, offset, size) return cm.list[x].Set(key, offset, size)
} }
func (cm *CompactMap) Delete(key Key) uint32 {
func (cm *CompactMap) Delete(key NeedleId) uint32 {
x := cm.binarySearchCompactSection(key) x := cm.binarySearchCompactSection(key)
if x < 0 { if x < 0 {
return uint32(0) return uint32(0)
} }
return cm.list[x].Delete(key) return cm.list[x].Delete(key)
} }
func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) {
func (cm *CompactMap) Get(key NeedleId) (*NeedleValue, bool) {
x := cm.binarySearchCompactSection(key) x := cm.binarySearchCompactSection(key)
if x < 0 { if x < 0 {
return nil, false return nil, false
} }
return cm.list[x].Get(key) return cm.list[x].Get(key)
} }
func (cm *CompactMap) binarySearchCompactSection(key Key) int {
func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int {
l, h := 0, len(cm.list)-1 l, h := 0, len(cm.list)-1
if h < 0 { if h < 0 {
return -5 return -5

9
weed/storage/needle/compact_map_perf_test.go

@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
func TestMemoryUsage(t *testing.T) { func TestMemoryUsage(t *testing.T) {
@ -29,11 +30,11 @@ func loadNewNeedleMap(file *os.File) {
} }
for count > 0 && e == nil { for count > 0 && e == nil {
for i := 0; i < count; i += 16 { for i := 0; i < count; i += 16 {
key := util.BytesToUint64(bytes[i : i+8])
offset := util.BytesToUint32(bytes[i+8 : i+12])
size := util.BytesToUint32(bytes[i+12 : i+16])
key := util.BytesToUint64(bytes[i: i+8])
offset := util.BytesToUint32(bytes[i+8: i+12])
size := util.BytesToUint32(bytes[i+12: i+16])
if offset > 0 { if offset > 0 {
m.Set(Key(key), offset, size)
m.Set(NeedleId(key), offset, size)
} else { } else {
//delete(m, key) //delete(m, key)
} }

19
weed/storage/needle/compact_map_test.go

@ -2,16 +2,17 @@ package needle
import ( import (
"testing" "testing"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
func TestIssue52(t *testing.T) { func TestIssue52(t *testing.T) {
m := NewCompactMap() m := NewCompactMap()
m.Set(Key(10002), 10002, 10002)
if element, ok := m.Get(Key(10002)); ok {
m.Set(NeedleId(10002), 10002, 10002)
if element, ok := m.Get(NeedleId(10002)); ok {
println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size) println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size)
} }
m.Set(Key(10001), 10001, 10001)
if element, ok := m.Get(Key(10002)); ok {
m.Set(NeedleId(10001), 10001, 10001)
if element, ok := m.Get(NeedleId(10002)); ok {
println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size) println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size)
} else { } else {
t.Fatal("key 10002 missing after setting 10001") t.Fatal("key 10002 missing after setting 10001")
@ -21,15 +22,15 @@ func TestIssue52(t *testing.T) {
func TestXYZ(t *testing.T) { func TestXYZ(t *testing.T) {
m := NewCompactMap() m := NewCompactMap()
for i := uint32(0); i < 100*batch; i += 2 { for i := uint32(0); i < 100*batch; i += 2 {
m.Set(Key(i), i, i)
m.Set(NeedleId(i), i, i)
} }
for i := uint32(0); i < 100*batch; i += 37 { for i := uint32(0); i < 100*batch; i += 37 {
m.Delete(Key(i))
m.Delete(NeedleId(i))
} }
for i := uint32(0); i < 10*batch; i += 3 { for i := uint32(0); i < 10*batch; i += 3 {
m.Set(Key(i), i+11, i+5)
m.Set(NeedleId(i), i+11, i+5)
} }
// for i := uint32(0); i < 100; i++ { // for i := uint32(0); i < 100; i++ {
@ -39,7 +40,7 @@ func TestXYZ(t *testing.T) {
// } // }
for i := uint32(0); i < 10*batch; i++ { for i := uint32(0); i < 10*batch; i++ {
v, ok := m.Get(Key(i))
v, ok := m.Get(NeedleId(i))
if i%3 == 0 { if i%3 == 0 {
if !ok { if !ok {
t.Fatal("key", i, "missing!") t.Fatal("key", i, "missing!")
@ -59,7 +60,7 @@ func TestXYZ(t *testing.T) {
} }
for i := uint32(10 * batch); i < 100*batch; i++ { for i := uint32(10 * batch); i < 100*batch; i++ {
v, ok := m.Get(Key(i))
v, ok := m.Get(NeedleId(i))
if i%37 == 0 { if i%37 == 0 {
if ok && v.Size > 0 { if ok && v.Size > 0 {
t.Fatal("key", i, "should have been deleted needle value", v) t.Fatal("key", i, "should have been deleted needle value", v)

13
weed/storage/needle/needle_value.go

@ -1,9 +1,8 @@
package needle package needle
import ( import (
"strconv"
"github.com/google/btree" "github.com/google/btree"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
const ( const (
@ -11,8 +10,8 @@ const (
) )
type NeedleValue struct { type NeedleValue struct {
Key Key
Offset uint32 `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
Key NeedleId
Offset Offset `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
Size uint32 `comment:"Size of the data portion"` Size uint32 `comment:"Size of the data portion"`
} }
@ -20,9 +19,3 @@ func (this NeedleValue) Less(than btree.Item) bool {
that := than.(NeedleValue) that := than.(NeedleValue)
return this.Key < that.Key return this.Key < that.Key
} }
type Key uint64
func (k Key) String() string {
return strconv.FormatUint(uint64(k), 10)
}

10
weed/storage/needle/needle_value_map.go

@ -1,8 +1,12 @@
package needle package needle
import (
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
type NeedleValueMap interface { type NeedleValueMap interface {
Set(key Key, offset, size uint32) (oldOffset, oldSize uint32)
Delete(key Key) uint32
Get(key Key) (*NeedleValue, bool)
Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32)
Delete(key NeedleId) uint32
Get(key NeedleId) (*NeedleValue, bool)
Visit(visit func(NeedleValue) error) error Visit(visit func(NeedleValue) error) error
} }

31
weed/storage/needle_map.go

@ -7,6 +7,7 @@ import (
"sync" "sync"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
@ -19,21 +20,17 @@ const (
NeedleMapBtree NeedleMapBtree
) )
const (
NeedleIndexSize = 16
)
type NeedleMapper interface { type NeedleMapper interface {
Put(key uint64, offset uint32, size uint32) error
Get(key uint64) (element *needle.NeedleValue, ok bool)
Delete(key uint64, offset uint32) error
Put(key NeedleId, offset Offset, size uint32) error
Get(key NeedleId) (element *needle.NeedleValue, ok bool)
Delete(key NeedleId, offset Offset) error
Close() Close()
Destroy() error Destroy() error
ContentSize() uint64 ContentSize() uint64
DeletedSize() uint64 DeletedSize() uint64
FileCount() int FileCount() int
DeletedCount() int DeletedCount() int
MaxFileKey() uint64
MaxFileKey() NeedleId
IndexFileSize() uint64 IndexFileSize() uint64
IndexFileContent() ([]byte, error) IndexFileContent() ([]byte, error)
IndexFileName() string IndexFileName() string
@ -58,17 +55,17 @@ func (nm *baseNeedleMapper) IndexFileName() string {
return nm.indexFile.Name() return nm.indexFile.Name()
} }
func idxFileEntry(bytes []byte) (key uint64, offset uint32, size uint32) {
key = util.BytesToUint64(bytes[:8])
offset = util.BytesToUint32(bytes[8:12])
size = util.BytesToUint32(bytes[12:16])
func IdxFileEntry(bytes []byte) (key NeedleId, offset Offset, size uint32) {
key = BytesToNeedleId(bytes[:NeedleIdSize])
offset = BytesToOffset(bytes[NeedleIdSize:NeedleIdSize+OffsetSize])
size = util.BytesToUint32(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize])
return return
} }
func (nm *baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error {
bytes := make([]byte, 16)
util.Uint64toBytes(bytes[0:8], key)
util.Uint32toBytes(bytes[8:12], offset)
util.Uint32toBytes(bytes[12:16], size)
func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size uint32) error {
bytes := make([]byte, NeedleIdSize+OffsetSize+SizeSize)
NeedleIdToBytes(bytes[0:NeedleIdSize], key)
OffsetToBytes(bytes[NeedleIdSize:NeedleIdSize+OffsetSize], offset)
util.Uint32toBytes(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], size)
nm.indexFileAccessLock.Lock() nm.indexFileAccessLock.Lock()
defer nm.indexFileAccessLock.Unlock() defer nm.indexFileAccessLock.Unlock()

44
weed/storage/needle_map_boltdb.go

@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
@ -64,7 +65,7 @@ func generateBoltDbFile(dbFileName string, indexFile *os.File) error {
return err return err
} }
defer db.Close() defer db.Close()
return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
return WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size uint32) error {
if offset > 0 && size != TombstoneFileSize { if offset > 0 && size != TombstoneFileSize {
boltDbWrite(db, key, offset, size) boltDbWrite(db, key, offset, size)
} else { } else {
@ -74,10 +75,11 @@ func generateBoltDbFile(dbFileName string, indexFile *os.File) error {
}) })
} }
func (m *BoltDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) {
var offset, size uint32
bytes := make([]byte, 8)
util.Uint64toBytes(bytes, key)
func (m *BoltDbNeedleMap) Get(key NeedleId) (element *needle.NeedleValue, ok bool) {
var offset Offset
var size uint32
bytes := make([]byte, NeedleIdSize)
NeedleIdToBytes(bytes, key)
err := m.db.View(func(tx *bolt.Tx) error { err := m.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(boltdbBucket) bucket := tx.Bucket(boltdbBucket)
if bucket == nil { if bucket == nil {
@ -86,13 +88,13 @@ func (m *BoltDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool)
data := bucket.Get(bytes) data := bucket.Get(bytes)
if len(data) != 8 {
if len(data) != OffsetSize+SizeSize {
glog.V(0).Infof("wrong data length: %d", len(data)) glog.V(0).Infof("wrong data length: %d", len(data))
return fmt.Errorf("wrong data length: %d", len(data)) return fmt.Errorf("wrong data length: %d", len(data))
} }
offset = util.BytesToUint32(data[0:4])
size = util.BytesToUint32(data[4:8])
offset = BytesToOffset(data[0:OffsetSize])
size = util.BytesToUint32(data[OffsetSize:OffsetSize+SizeSize])
return nil return nil
}) })
@ -100,10 +102,10 @@ func (m *BoltDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool)
if err != nil { if err != nil {
return nil, false return nil, false
} }
return &needle.NeedleValue{Key: needle.Key(key), Offset: offset, Size: size}, true
return &needle.NeedleValue{Key: NeedleId(key), Offset: offset, Size: size}, true
} }
func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
func (m *BoltDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
var oldSize uint32 var oldSize uint32
if oldNeedle, ok := m.Get(key); ok { if oldNeedle, ok := m.Get(key); ok {
oldSize = oldNeedle.Size oldSize = oldNeedle.Size
@ -117,27 +119,29 @@ func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
} }
func boltDbWrite(db *bolt.DB, func boltDbWrite(db *bolt.DB,
key uint64, offset uint32, size uint32) error {
bytes := make([]byte, 16)
util.Uint64toBytes(bytes[0:8], key)
util.Uint32toBytes(bytes[8:12], offset)
util.Uint32toBytes(bytes[12:16], size)
key NeedleId, offset Offset, size uint32) error {
bytes := make([]byte, NeedleIdSize+OffsetSize+SizeSize)
NeedleIdToBytes(bytes[0:NeedleIdSize], key)
OffsetToBytes(bytes[NeedleIdSize:NeedleIdSize+OffsetSize], offset)
util.Uint32toBytes(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], size)
return db.Update(func(tx *bolt.Tx) error { return db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(boltdbBucket) bucket, err := tx.CreateBucketIfNotExists(boltdbBucket)
if err != nil { if err != nil {
return err return err
} }
err = bucket.Put(bytes[0:8], bytes[8:16])
err = bucket.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize])
if err != nil { if err != nil {
return err return err
} }
return nil return nil
}) })
} }
func boltDbDelete(db *bolt.DB, key uint64) error {
bytes := make([]byte, 8)
util.Uint64toBytes(bytes, key)
func boltDbDelete(db *bolt.DB, key NeedleId) error {
bytes := make([]byte, NeedleIdSize)
NeedleIdToBytes(bytes, key)
return db.Update(func(tx *bolt.Tx) error { return db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(boltdbBucket) bucket, err := tx.CreateBucketIfNotExists(boltdbBucket)
if err != nil { if err != nil {
@ -152,7 +156,7 @@ func boltDbDelete(db *bolt.DB, key uint64) error {
}) })
} }
func (m *BoltDbNeedleMap) Delete(key uint64, offset uint32) error {
func (m *BoltDbNeedleMap) Delete(key NeedleId, offset Offset) error {
if oldNeedle, ok := m.Get(key); ok { if oldNeedle, ok := m.Get(key); ok {
m.logDelete(oldNeedle.Size) m.logDelete(oldNeedle.Size)
} }

41
weed/storage/needle_map_leveldb.go

@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
@ -62,7 +63,7 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
return err return err
} }
defer db.Close() defer db.Close()
return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
return WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size uint32) error {
if offset > 0 && size != TombstoneFileSize { if offset > 0 && size != TombstoneFileSize {
levelDbWrite(db, key, offset, size) levelDbWrite(db, key, offset, size)
} else { } else {
@ -72,19 +73,19 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
}) })
} }
func (m *LevelDbNeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) {
bytes := make([]byte, 8)
util.Uint64toBytes(bytes, key)
func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle.NeedleValue, ok bool) {
bytes := make([]byte, NeedleIdSize)
NeedleIdToBytes(bytes[0:NeedleIdSize], key)
data, err := m.db.Get(bytes, nil) data, err := m.db.Get(bytes, nil)
if err != nil || len(data) != 8 {
if err != nil || len(data) != OffsetSize+SizeSize {
return nil, false return nil, false
} }
offset := util.BytesToUint32(data[0:4])
size := util.BytesToUint32(data[4:8])
return &needle.NeedleValue{Key: needle.Key(key), Offset: offset, Size: size}, true
offset := BytesToOffset(data[0:OffsetSize])
size := util.BytesToUint32(data[OffsetSize:OffsetSize+SizeSize])
return &needle.NeedleValue{Key: NeedleId(key), Offset: offset, Size: size}, true
} }
func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
var oldSize uint32 var oldSize uint32
if oldNeedle, ok := m.Get(key); ok { if oldNeedle, ok := m.Get(key); ok {
oldSize = oldNeedle.Size oldSize = oldNeedle.Size
@ -98,23 +99,25 @@ func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error {
} }
func levelDbWrite(db *leveldb.DB, func levelDbWrite(db *leveldb.DB,
key uint64, offset uint32, size uint32) error {
bytes := make([]byte, 16)
util.Uint64toBytes(bytes[0:8], key)
util.Uint32toBytes(bytes[8:12], offset)
util.Uint32toBytes(bytes[12:16], size)
if err := db.Put(bytes[0:8], bytes[8:16], nil); err != nil {
key NeedleId, offset Offset, size uint32) error {
bytes := make([]byte, NeedleIdSize+OffsetSize+SizeSize)
NeedleIdToBytes(bytes[0:NeedleIdSize], key)
OffsetToBytes(bytes[NeedleIdSize:NeedleIdSize+OffsetSize], offset)
util.Uint32toBytes(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], size)
if err := db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil {
return fmt.Errorf("failed to write leveldb: %v", err) return fmt.Errorf("failed to write leveldb: %v", err)
} }
return nil return nil
} }
func levelDbDelete(db *leveldb.DB, key uint64) error {
bytes := make([]byte, 8)
util.Uint64toBytes(bytes, key)
func levelDbDelete(db *leveldb.DB, key NeedleId) error {
bytes := make([]byte, NeedleIdSize)
NeedleIdToBytes(bytes, key)
return db.Delete(bytes, nil) return db.Delete(bytes, nil)
} }
func (m *LevelDbNeedleMap) Delete(key uint64, offset uint32) error {
func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
if oldNeedle, ok := m.Get(key); ok { if oldNeedle, ok := m.Get(key); ok {
m.logDelete(oldNeedle.Size) m.logDelete(oldNeedle.Size)
} }

34
weed/storage/needle_map_memory.go

@ -6,6 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
type NeedleMap struct { type NeedleMap struct {
@ -45,21 +46,21 @@ func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error) {
} }
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
e := WalkIndexFile(file, func(key uint64, offset, size uint32) error {
e := WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error {
if key > nm.MaximumFileKey { if key > nm.MaximumFileKey {
nm.MaximumFileKey = key nm.MaximumFileKey = key
} }
if offset > 0 && size != TombstoneFileSize { if offset > 0 && size != TombstoneFileSize {
nm.FileCounter++ nm.FileCounter++
nm.FileByteCounter = nm.FileByteCounter + uint64(size) nm.FileByteCounter = nm.FileByteCounter + uint64(size)
oldOffset, oldSize := nm.m.Set(needle.Key(key), offset, size)
oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size)
// glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) // glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
if oldOffset > 0 && oldSize != TombstoneFileSize { if oldOffset > 0 && oldSize != TombstoneFileSize {
nm.DeletionCounter++ nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
} }
} else { } else {
oldSize := nm.m.Delete(needle.Key(key))
oldSize := nm.m.Delete(NeedleId(key))
// glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) // glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
nm.DeletionCounter++ nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
@ -72,21 +73,22 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
// walks through the index file, calls fn function with each key, offset, size // walks through the index file, calls fn function with each key, offset, size
// stops with the error returned by the fn function // stops with the error returned by the fn function
func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error {
func WalkIndexFile(r *os.File, fn func(key NeedleId, offset Offset, size uint32) error) error {
var readerOffset int64 var readerOffset int64
bytes := make([]byte, NeedleIndexSize*RowsToRead)
bytes := make([]byte, NeedleEntrySize*RowsToRead)
count, e := r.ReadAt(bytes, readerOffset) count, e := r.ReadAt(bytes, readerOffset)
glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
readerOffset += int64(count) readerOffset += int64(count)
var ( var (
key uint64
offset, size uint32
i int
key NeedleId
offset Offset
size uint32
i int
) )
for count > 0 && e == nil || e == io.EOF { for count > 0 && e == nil || e == io.EOF {
for i = 0; i+NeedleIndexSize <= count; i += NeedleIndexSize {
key, offset, size = idxFileEntry(bytes[i: i+NeedleIndexSize])
for i = 0; i+NeedleEntrySize <= count; i += NeedleEntrySize {
key, offset, size = IdxFileEntry(bytes[i: i+NeedleEntrySize])
if e = fn(key, offset, size); e != nil { if e = fn(key, offset, size); e != nil {
return e return e
} }
@ -101,17 +103,17 @@ func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) e
return e return e
} }
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error {
_, oldSize := nm.m.Set(needle.Key(key), offset, size)
func (nm *NeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
_, oldSize := nm.m.Set(NeedleId(key), offset, size)
nm.logPut(key, oldSize, size) nm.logPut(key, oldSize, size)
return nm.appendToIndexFile(key, offset, size) return nm.appendToIndexFile(key, offset, size)
} }
func (nm *NeedleMap) Get(key uint64) (element *needle.NeedleValue, ok bool) {
element, ok = nm.m.Get(needle.Key(key))
func (nm *NeedleMap) Get(key NeedleId) (element *needle.NeedleValue, ok bool) {
element, ok = nm.m.Get(NeedleId(key))
return return
} }
func (nm *NeedleMap) Delete(key uint64, offset uint32) error {
deletedBytes := nm.m.Delete(needle.Key(key))
func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error {
deletedBytes := nm.m.Delete(NeedleId(key))
nm.logDelete(deletedBytes) nm.logDelete(deletedBytes)
return nm.appendToIndexFile(key, offset, TombstoneFileSize) return nm.appendToIndexFile(key, offset, TombstoneFileSize)
} }

35
weed/storage/needle_map_metric.go

@ -5,15 +5,15 @@ import (
"os" "os"
"github.com/willf/bloom" "github.com/willf/bloom"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"encoding/binary"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
type mapMetric struct { type mapMetric struct {
DeletionCounter int `json:"DeletionCounter"`
FileCounter int `json:"FileCounter"`
DeletionByteCounter uint64 `json:"DeletionByteCounter"`
FileByteCounter uint64 `json:"FileByteCounter"`
MaximumFileKey uint64 `json:"MaxFileKey"`
DeletionCounter int `json:"DeletionCounter"`
FileCounter int `json:"FileCounter"`
DeletionByteCounter uint64 `json:"DeletionByteCounter"`
FileByteCounter uint64 `json:"FileByteCounter"`
MaximumFileKey NeedleId `json:"MaxFileKey"`
} }
func (mm *mapMetric) logDelete(deletedByteCount uint32) { func (mm *mapMetric) logDelete(deletedByteCount uint32) {
@ -21,7 +21,7 @@ func (mm *mapMetric) logDelete(deletedByteCount uint32) {
mm.DeletionCounter++ mm.DeletionCounter++
} }
func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) {
func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) {
if key > mm.MaximumFileKey { if key > mm.MaximumFileKey {
mm.MaximumFileKey = key mm.MaximumFileKey = key
} }
@ -45,23 +45,22 @@ func (mm mapMetric) FileCount() int {
func (mm mapMetric) DeletedCount() int { func (mm mapMetric) DeletedCount() int {
return mm.DeletionCounter return mm.DeletionCounter
} }
func (mm mapMetric) MaxFileKey() uint64 {
func (mm mapMetric) MaxFileKey() NeedleId {
return mm.MaximumFileKey return mm.MaximumFileKey
} }
func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) { func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
mm = &mapMetric{} mm = &mapMetric{}
var bf *bloom.BloomFilter var bf *bloom.BloomFilter
buf := make([]byte, 8)
buf := make([]byte, NeedleIdSize)
err = reverseWalkIndexFile(r, func(entryCount int64) { err = reverseWalkIndexFile(r, func(entryCount int64) {
bf = bloom.NewWithEstimates(uint(entryCount), 0.001) bf = bloom.NewWithEstimates(uint(entryCount), 0.001)
}, func(key uint64, offset, size uint32) error {
}, func(key NeedleId, offset Offset, size uint32) error {
if key > mm.MaximumFileKey { if key > mm.MaximumFileKey {
mm.MaximumFileKey = key mm.MaximumFileKey = key
} }
binary.BigEndian.PutUint64(buf, key)
NeedleIdToBytes(buf, key)
if size != TombstoneFileSize { if size != TombstoneFileSize {
mm.FileByteCounter += uint64(size) mm.FileByteCounter += uint64(size)
} }
@ -82,23 +81,23 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
return return
} }
func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key uint64, offset, size uint32) error) error {
func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key NeedleId, offset Offset, size uint32) error) error {
fi, err := r.Stat() fi, err := r.Stat()
if err != nil { if err != nil {
return fmt.Errorf("file %s stat error: %v", r.Name(), err) return fmt.Errorf("file %s stat error: %v", r.Name(), err)
} }
fileSize := fi.Size() fileSize := fi.Size()
if fileSize%NeedleIndexSize != 0 {
if fileSize%NeedleEntrySize != 0 {
return fmt.Errorf("unexpected file %s size: %d", r.Name(), fileSize) return fmt.Errorf("unexpected file %s size: %d", r.Name(), fileSize)
} }
initFn(fileSize / NeedleIndexSize)
initFn(fileSize / NeedleEntrySize)
bytes := make([]byte, NeedleIndexSize)
for readerOffset := fileSize - NeedleIndexSize; readerOffset >= 0; readerOffset -= NeedleIndexSize {
bytes := make([]byte, NeedleEntrySize)
for readerOffset := fileSize - NeedleEntrySize; readerOffset >= 0; readerOffset -= NeedleEntrySize {
count, e := r.ReadAt(bytes, readerOffset) count, e := r.ReadAt(bytes, readerOffset)
glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
key, offset, size := idxFileEntry(bytes)
key, offset, size := IdxFileEntry(bytes)
if e = fn(key, offset, size); e != nil { if e = fn(key, offset, size); e != nil {
return e return e
} }

71
weed/storage/needle_read_write.go

@ -6,6 +6,7 @@ import (
"io" "io"
"os" "os"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
@ -43,27 +44,27 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
} }
switch version { switch version {
case Version1: case Version1:
header := make([]byte, NeedleHeaderSize)
util.Uint32toBytes(header[0:4], n.Cookie)
util.Uint64toBytes(header[4:12], n.Id)
header := make([]byte, NeedleEntrySize)
CookieToBytes(header[0:CookieSize], n.Cookie)
NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
n.Size = uint32(len(n.Data)) n.Size = uint32(len(n.Data))
size = n.Size size = n.Size
util.Uint32toBytes(header[12:16], n.Size)
util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
if _, err = w.Write(header); err != nil { if _, err = w.Write(header); err != nil {
return return
} }
if _, err = w.Write(n.Data); err != nil { if _, err = w.Write(n.Data); err != nil {
return return
} }
actualSize = NeedleHeaderSize + int64(n.Size)
padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
actualSize = NeedleEntrySize + int64(n.Size)
padding := NeedlePaddingSize - ((NeedleEntrySize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
_, err = w.Write(header[0 : NeedleChecksumSize+padding])
_, err = w.Write(header[0: NeedleChecksumSize+padding])
return return
case Version2: case Version2:
header := make([]byte, NeedleHeaderSize)
util.Uint32toBytes(header[0:4], n.Cookie)
util.Uint64toBytes(header[4:12], n.Id)
header := make([]byte, NeedleEntrySize)
CookieToBytes(header[0:CookieSize], n.Cookie)
NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime)) n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime))
if n.DataSize > 0 { if n.DataSize > 0 {
n.Size = 4 + n.DataSize + 1 n.Size = 4 + n.DataSize + 1
@ -86,7 +87,7 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
n.Size = 0 n.Size = 0
} }
size = n.DataSize size = n.DataSize
util.Uint32toBytes(header[12:16], n.Size)
util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
if _, err = w.Write(header); err != nil { if _, err = w.Write(header); err != nil {
return return
} }
@ -122,7 +123,7 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
} }
if n.HasLastModifiedDate() { if n.HasLastModifiedDate() {
util.Uint64toBytes(header[0:8], n.LastModified) util.Uint64toBytes(header[0:8], n.LastModified)
if _, err = w.Write(header[8-LastModifiedBytesLength : 8]); err != nil {
if _, err = w.Write(header[8-LastModifiedBytesLength: 8]); err != nil {
return return
} }
} }
@ -142,9 +143,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
} }
} }
} }
padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
padding := NeedlePaddingSize - ((NeedleEntrySize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
_, err = w.Write(header[0 : NeedleChecksumSize+padding])
_, err = w.Write(header[0: NeedleChecksumSize+padding])
return n.DataSize, getActualSize(n.Size), err return n.DataSize, getActualSize(n.Size), err
} }
@ -152,7 +153,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
} }
func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, err error) { func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, err error) {
return getBytesForFileBlock(r, offset, int(getActualSize(size)))
dataSlice = make([]byte, int(getActualSize(size)))
_, err = r.ReadAt(dataSlice, offset)
return dataSlice, err
} }
func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) { func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {
@ -166,14 +169,14 @@ func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version
} }
switch version { switch version {
case Version1: case Version1:
n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
n.Data = bytes[NeedleEntrySize: NeedleEntrySize+size]
case Version2: case Version2:
n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)])
n.readNeedleDataVersion2(bytes[NeedleEntrySize: NeedleEntrySize+int(n.Size)])
} }
if size == 0 { if size == 0 {
return nil return nil
} }
checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
checksum := util.BytesToUint32(bytes[NeedleEntrySize+size: NeedleEntrySize+size+NeedleChecksumSize])
newChecksum := NewCRC(n.Data) newChecksum := NewCRC(n.Data)
if checksum != newChecksum.Value() { if checksum != newChecksum.Value() {
return errors.New("CRC error! Data On Disk Corrupted") return errors.New("CRC error! Data On Disk Corrupted")
@ -181,22 +184,24 @@ func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version
n.Checksum = newChecksum n.Checksum = newChecksum
return nil return nil
} }
func (n *Needle) ParseNeedleHeader(bytes []byte) { func (n *Needle) ParseNeedleHeader(bytes []byte) {
n.Cookie = util.BytesToUint32(bytes[0:4])
n.Id = util.BytesToUint64(bytes[4:12])
n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize])
n.Cookie = BytesToCookie(bytes[0:CookieSize])
n.Id = BytesToNeedleId(bytes[CookieSize:CookieSize+NeedleIdSize])
n.Size = util.BytesToUint32(bytes[CookieSize+NeedleIdSize:NeedleEntrySize])
} }
func (n *Needle) readNeedleDataVersion2(bytes []byte) { func (n *Needle) readNeedleDataVersion2(bytes []byte) {
index, lenBytes := 0, len(bytes) index, lenBytes := 0, len(bytes)
if index < lenBytes { if index < lenBytes {
n.DataSize = util.BytesToUint32(bytes[index : index+4])
n.DataSize = util.BytesToUint32(bytes[index: index+4])
index = index + 4 index = index + 4
if int(n.DataSize)+index > lenBytes { if int(n.DataSize)+index > lenBytes {
// this if clause is due to bug #87 and #93, fixed in v0.69 // this if clause is due to bug #87 and #93, fixed in v0.69
// remove this clause later // remove this clause later
return return
} }
n.Data = bytes[index : index+int(n.DataSize)]
n.Data = bytes[index: index+int(n.DataSize)]
index = index + int(n.DataSize) index = index + int(n.DataSize)
n.Flags = bytes[index] n.Flags = bytes[index]
index = index + 1 index = index + 1
@ -204,25 +209,25 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) {
if index < lenBytes && n.HasName() { if index < lenBytes && n.HasName() {
n.NameSize = uint8(bytes[index]) n.NameSize = uint8(bytes[index])
index = index + 1 index = index + 1
n.Name = bytes[index : index+int(n.NameSize)]
n.Name = bytes[index: index+int(n.NameSize)]
index = index + int(n.NameSize) index = index + int(n.NameSize)
} }
if index < lenBytes && n.HasMime() { if index < lenBytes && n.HasMime() {
n.MimeSize = uint8(bytes[index]) n.MimeSize = uint8(bytes[index])
index = index + 1 index = index + 1
n.Mime = bytes[index : index+int(n.MimeSize)]
n.Mime = bytes[index: index+int(n.MimeSize)]
index = index + int(n.MimeSize) index = index + int(n.MimeSize)
} }
if index < lenBytes && n.HasLastModifiedDate() { if index < lenBytes && n.HasLastModifiedDate() {
n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
n.LastModified = util.BytesToUint64(bytes[index: index+LastModifiedBytesLength])
index = index + LastModifiedBytesLength index = index + LastModifiedBytesLength
} }
if index < lenBytes && n.HasTtl() { if index < lenBytes && n.HasTtl() {
n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
n.Ttl = LoadTTLFromBytes(bytes[index: index+TtlBytesLength])
index = index + TtlBytesLength index = index + TtlBytesLength
} }
if index < lenBytes && n.HasPairs() { if index < lenBytes && n.HasPairs() {
n.PairsSize = util.BytesToUint16(bytes[index : index+2])
n.PairsSize = util.BytesToUint16(bytes[index: index+2])
index += 2 index += 2
end := index + int(n.PairsSize) end := index + int(n.PairsSize)
n.Pairs = bytes[index:end] n.Pairs = bytes[index:end]
@ -230,25 +235,25 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) {
} }
} }
func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength int64, err error) {
n = new(Needle) n = new(Needle)
if version == Version1 || version == Version2 { if version == Version1 || version == Version2 {
bytes := make([]byte, NeedleHeaderSize)
bytes := make([]byte, NeedleEntrySize)
var count int var count int
count, err = r.ReadAt(bytes, offset) count, err = r.ReadAt(bytes, offset)
if count <= 0 || err != nil { if count <= 0 || err != nil {
return nil, 0, err return nil, 0, err
} }
n.ParseNeedleHeader(bytes) n.ParseNeedleHeader(bytes)
padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
bodyLength = n.Size + NeedleChecksumSize + padding
padding := NeedlePaddingSize - ((n.Size + NeedleEntrySize + NeedleChecksumSize) % NeedlePaddingSize)
bodyLength = int64(n.Size) + NeedleChecksumSize + int64(padding)
} }
return return
} }
//n should be a needle already read the header //n should be a needle already read the header
//the input stream will read until next file entry //the input stream will read until next file entry
func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength uint32) (err error) {
func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength int64) (err error) {
if bodyLength <= 0 { if bodyLength <= 0 {
return nil return nil
} }

4
weed/storage/needle_test.go

@ -26,7 +26,7 @@ func TestParseKeyHash(t *testing.T) {
} }
for _, tc := range testcases { for _, tc := range testcases {
if id, cookie, err := ParseKeyHash(tc.KeyHash); err != nil && !tc.Err {
if id, cookie, err := ParseNeedleIdCookie(tc.KeyHash); err != nil && !tc.Err {
t.Fatalf("Parse %s error: %v", tc.KeyHash, err) t.Fatalf("Parse %s error: %v", tc.KeyHash, err)
} else if err == nil && tc.Err { } else if err == nil && tc.Err {
t.Fatalf("Parse %s expected error got nil", tc.KeyHash) t.Fatalf("Parse %s expected error got nil", tc.KeyHash)
@ -40,6 +40,6 @@ func BenchmarkParseKeyHash(b *testing.B) {
b.ReportAllocs() b.ReportAllocs()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
ParseKeyHash("4ed44ed44ed44ed4c8116e41")
ParseNeedleIdCookie("4ed44ed44ed44ed4c8116e41")
} }
} }

5
weed/storage/store.go

@ -5,6 +5,7 @@ import (
"strconv" "strconv"
"strings" "strings"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
) )
@ -160,7 +161,7 @@ func (s *Store) SetRack(rack string) {
func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var volumeMessages []*master_pb.VolumeInformationMessage var volumeMessages []*master_pb.VolumeInformationMessage
maxVolumeCount := 0 maxVolumeCount := 0
var maxFileKey uint64
var maxFileKey NeedleId
for _, location := range s.Locations { for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
location.Lock() location.Lock()
@ -199,7 +200,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
Port: uint32(s.Port), Port: uint32(s.Port),
PublicUrl: s.PublicUrl, PublicUrl: s.PublicUrl,
MaxVolumeCount: uint32(maxVolumeCount), MaxVolumeCount: uint32(maxVolumeCount),
MaxFileKey: maxFileKey,
MaxFileKey: NeedleIdToUint64(maxFileKey),
DataCenter: s.dataCenter, DataCenter: s.dataCenter,
Rack: s.rack, Rack: s.rack,
Volumes: volumeMessages, Volumes: volumeMessages,

79
weed/storage/types/needle_types.go

@ -0,0 +1,79 @@
package types
import (
"math"
"github.com/chrislusf/seaweedfs/weed/util"
"strconv"
"fmt"
)
// NeedleId is the numeric identifier of a needle. It is serialized with
// NeedleIdToBytes / BytesToNeedleId and parsed from hexadecimal text by
// ParseNeedleId.
type NeedleId uint64
// Offset is the position value stored for a needle in index entries. It is
// serialized with OffsetToBytes / BytesToOffset.
// NOTE(review): callers appear to store byte positions divided by
// NeedlePaddingSize in this type — confirm against the volume read/write code.
type Offset uint32
// Cookie is the 32-bit value stored next to the needle id. It is serialized
// with CookieToBytes / BytesToCookie and parsed from hexadecimal text by
// ParseCookie.
type Cookie uint32
// Byte sizes of the serialized needle fields, plus layout constants derived
// from them. Code that encodes or decodes these fields should slice buffers
// using these names rather than literal numbers.
const (
// NeedleIdSize is the serialized width of a NeedleId (a uint64).
NeedleIdSize = 8
// OffsetSize is the serialized width of an Offset (a uint32).
OffsetSize = 4
SizeSize = 4 // uint32 size
// NeedleEntrySize is the combined width of a needle id, an offset and a size:
// 8 + 4 + 4 = 16 bytes.
NeedleEntrySize = NeedleIdSize + OffsetSize + SizeSize
// NeedlePaddingSize is the padding granularity, in bytes.
// NOTE(review): presumably needles are aligned to this boundary and Offset
// values count units of it — confirm against the needle append/read code.
NeedlePaddingSize = 8
// MaxPossibleVolumeSize is 4Gi * 8 = 32 GiB.
// NOTE(review): looks like (number of distinct uint32 offsets) * (padding of 8
// bytes); the trailing 8 is a literal and is not tied to NeedlePaddingSize.
MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8
// TombstoneFileSize is the largest uint32; it is used as a sentinel size.
// NOTE(review): presumably marks deleted entries in the index — confirm with
// the needle map implementations.
TombstoneFileSize = math.MaxUint32
// CookieSize is the serialized width of a Cookie (a uint32).
CookieSize = 4
)
// NeedleIdToBytes serializes needleId into the caller-supplied buffer by
// handing its raw uint64 value to util.Uint64toBytes.
func NeedleIdToBytes(bytes []byte, needleId NeedleId) {
	raw := uint64(needleId)
	util.Uint64toBytes(bytes, raw)
}
// NeedleIdToUint64 returns the raw uint64 value behind a NeedleId.
// It is used to send the max needle id to the master.
func NeedleIdToUint64(needleId NeedleId) uint64 {
	raw := uint64(needleId)
	return raw
}
// Uint64ToNeedleId wraps a raw uint64 value as a NeedleId.
func Uint64ToNeedleId(needleId uint64) NeedleId {
	id := NeedleId(needleId)
	return id
}
// BytesToNeedleId decodes a NeedleId from bytes by reading a uint64 with
// util.BytesToUint64. The whole slice is passed through unchanged.
func BytesToNeedleId(bytes []byte) NeedleId {
	raw := util.BytesToUint64(bytes)
	return NeedleId(raw)
}
// CookieToBytes serializes cookie into the caller-supplied buffer by handing
// its raw uint32 value to util.Uint32toBytes.
func CookieToBytes(bytes []byte, cookie Cookie) {
	raw := uint32(cookie)
	util.Uint32toBytes(bytes, raw)
}
// Uint32ToCookie wraps a raw uint32 value as a Cookie.
func Uint32ToCookie(cookie uint32) Cookie {
	wrapped := Cookie(cookie)
	return wrapped
}
// BytesToCookie decodes a Cookie from the first four bytes of bytes using
// util.BytesToUint32. Slicing panics if the slice has fewer than four bytes
// of capacity.
// NOTE(review): the bound is the literal 4 rather than CookieSize; the two
// must be kept in sync if the cookie width ever changes.
func BytesToCookie(bytes []byte) Cookie {
	raw := util.BytesToUint32(bytes[:4])
	return Cookie(raw)
}
// OffsetToBytes serializes offset into the caller-supplied buffer by handing
// its raw uint32 value to util.Uint32toBytes.
func OffsetToBytes(bytes []byte, offset Offset) {
	raw := uint32(offset)
	util.Uint32toBytes(bytes, raw)
}
// BytesToOffset decodes an Offset from the first four bytes of bytes using
// util.BytesToUint32. Slicing panics if the slice has fewer than four bytes
// of capacity.
// NOTE(review): the bound is the literal 4 rather than OffsetSize; the two
// must be kept in sync if the offset width ever changes.
func BytesToOffset(bytes []byte) Offset {
	raw := util.BytesToUint32(bytes[:4])
	return Offset(raw)
}
// String renders the needle id as an unsigned base-10 number.
// NOTE(review): ParseNeedleId reads base-16 text, so the output of String
// does not round-trip through ParseNeedleId.
func (k NeedleId) String() string {
	const decimalBase = 10
	raw := uint64(k)
	return strconv.FormatUint(raw, decimalBase)
}
// ParseNeedleId decodes idString as an unsigned hexadecimal number of at most
// 64 bits and returns it as a NeedleId. On malformed or out-of-range input it
// returns a zero id and an error that includes the offending string.
func ParseNeedleId(idString string) (NeedleId, error) {
	parsed, parseErr := strconv.ParseUint(idString, 16, 64)
	if parseErr == nil {
		return NeedleId(parsed), nil
	}
	return 0, fmt.Errorf("needle id %s format error: %v", idString, parseErr)
}
// ParseCookie decodes cookieString as an unsigned hexadecimal number of at
// most 32 bits and returns it as a Cookie. On malformed or out-of-range input
// it returns a zero cookie and an error that includes the offending string.
func ParseCookie(cookieString string) (Cookie, error) {
	parsed, parseErr := strconv.ParseUint(cookieString, 16, 32)
	if parseErr == nil {
		return Cookie(parsed), nil
	}
	return 0, fmt.Errorf("needle cookie %s format error: %v", cookieString, parseErr)
}

15
weed/storage/volume_checking.go

@ -4,12 +4,13 @@ import (
"fmt" "fmt"
"os" "os"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
func getActualSize(size uint32) int64 { func getActualSize(size uint32) int64 {
padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
return NeedleHeaderSize + int64(size) + NeedleChecksumSize + int64(padding)
padding := NeedlePaddingSize - ((NeedleEntrySize + size + NeedleChecksumSize) % NeedlePaddingSize)
return NeedleEntrySize + int64(size) + NeedleChecksumSize + int64(padding)
} }
func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error { func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error {
@ -22,10 +23,10 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error {
return nil return nil
} }
var lastIdxEntry []byte var lastIdxEntry []byte
if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleIndexSize); e != nil {
if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleEntrySize); e != nil {
return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
} }
key, offset, size := idxFileEntry(lastIdxEntry)
key, offset, size := IdxFileEntry(lastIdxEntry)
if offset == 0 || size == TombstoneFileSize { if offset == 0 || size == TombstoneFileSize {
return nil return nil
} }
@ -38,7 +39,7 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error {
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) { func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
if indexSize, err = util.GetFileSize(indexFile); err == nil { if indexSize, err = util.GetFileSize(indexFile); err == nil {
if indexSize%NeedleIndexSize != 0 {
if indexSize%NeedleEntrySize != 0 {
err = fmt.Errorf("index file's size is %d bytes, maybe corrupted", indexSize) err = fmt.Errorf("index file's size is %d bytes, maybe corrupted", indexSize)
} }
} }
@ -50,12 +51,12 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
err = fmt.Errorf("offset %d for index file is invalid", offset) err = fmt.Errorf("offset %d for index file is invalid", offset)
return return
} }
bytes = make([]byte, NeedleIndexSize)
bytes = make([]byte, NeedleEntrySize)
_, err = indexFile.ReadAt(bytes, offset) _, err = indexFile.ReadAt(bytes, offset)
return return
} }
func verifyNeedleIntegrity(datFile *os.File, v Version, offset int64, key uint64, size uint32) error {
func verifyNeedleIntegrity(datFile *os.File, v Version, offset int64, key NeedleId, size uint32) error {
n := new(Needle) n := new(Needle)
err := n.ReadData(datFile, offset, size, v) err := n.ReadData(datFile, offset, size, v)
if err != nil { if err != nil {

13
weed/storage/volume_read_write.go

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
) )
// isFileUnchanged checks whether this needle to write is same as last one. // isFileUnchanged checks whether this needle to write is same as last one.
@ -109,7 +110,7 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) {
nv, ok := v.nm.Get(n.Id) nv, ok := v.nm.Get(n.Id)
if !ok || int64(nv.Offset)*NeedlePaddingSize < offset { if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
if err = v.nm.Put(n.Id, Offset(offset/NeedlePaddingSize), n.Size); err != nil {
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
} }
} }
@ -134,7 +135,7 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) {
if err != nil { if err != nil {
return size, err return size, err
} }
if err := v.nm.Delete(n.Id, uint32(offset/NeedlePaddingSize)); err != nil {
if err := v.nm.Delete(n.Id, Offset(offset/NeedlePaddingSize)); err != nil {
return size, err return size, err
} }
n.Data = nil n.Data = nil
@ -197,7 +198,7 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId,
} }
for n != nil { for n != nil {
if readNeedleBody { if readNeedleBody {
if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
if err = n.ReadNeedleBody(v.dataFile, version, offset+NeedleEntrySize, rest); err != nil {
glog.V(0).Infof("cannot read needle body: %v", err) glog.V(0).Infof("cannot read needle body: %v", err)
//err = fmt.Errorf("cannot read needle body: %v", err) //err = fmt.Errorf("cannot read needle body: %v", err)
//return //return
@ -207,9 +208,9 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId,
// fixed in v0.69 // fixed in v0.69
// remove this whole "if" clause later, long after 0.69 // remove this whole "if" clause later, long after 0.69
oldRest, oldSize := rest, n.Size oldRest, oldSize := rest, n.Size
padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
padding := NeedlePaddingSize - ((n.Size + NeedleEntrySize + NeedleChecksumSize) % NeedlePaddingSize)
n.Size = 0 n.Size = 0
rest = n.Size + NeedleChecksumSize + padding
rest = int64(n.Size + NeedleChecksumSize + padding)
if rest%NeedlePaddingSize != 0 { if rest%NeedlePaddingSize != 0 {
rest += (NeedlePaddingSize - rest%NeedlePaddingSize) rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
} }
@ -219,7 +220,7 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId,
if err = visitNeedle(n, offset); err != nil { if err = visitNeedle(n, offset); err != nil {
glog.V(0).Infof("visit needle error: %v", err) glog.V(0).Infof("visit needle error: %v", err)
} }
offset += int64(NeedleHeaderSize) + int64(rest)
offset += NeedleEntrySize + rest
glog.V(4).Infof("==> new entry offset %d", offset) glog.V(4).Infof("==> new entry offset %d", offset)
if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil { if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
if err == io.EOF { if err == io.EOF {

15
weed/storage/volume_sync.go

@ -12,6 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
@ -93,7 +94,7 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.Compact
if needleValue.Key == 0 { if needleValue.Key == 0 {
return nil return nil
} }
if _, ok := slaveMap.Get(uint64(needleValue.Key)); ok {
if _, ok := slaveMap.Get(needleValue.Key); ok {
return nil // skip intersection return nil // skip intersection
} }
delta = append(delta, needleValue) delta = append(delta, needleValue)
@ -147,12 +148,12 @@ func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m *needle.Compac
} }
total := 0 total := 0
err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) {
err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key NeedleId, offset Offset, size uint32) {
// println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size) // println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
if offset > 0 && size != TombstoneFileSize { if offset > 0 && size != TombstoneFileSize {
m.Set(needle.Key(key), offset, size)
m.Set(NeedleId(key), offset, size)
} else { } else {
m.Delete(needle.Key(key))
m.Delete(NeedleId(key))
} }
total++ total++
}) })
@ -179,9 +180,9 @@ func (v *Volume) IndexFileContent() ([]byte, error) {
} }
// removeNeedle removes one needle by needle key // removeNeedle removes one needle by needle key
func (v *Volume) removeNeedle(key needle.Key) {
func (v *Volume) removeNeedle(key NeedleId) {
n := new(Needle) n := new(Needle)
n.Id = uint64(key)
n.Id = key
v.deleteNeedle(n) v.deleteNeedle(n)
} }
@ -208,7 +209,7 @@ func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string,
return fmt.Errorf("Appending volume %d error: %v", v.Id, err) return fmt.Errorf("Appending volume %d error: %v", v.Id, err)
} }
// println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size) // println("add key", needleValue.Key, "offset", offset, "size", needleValue.Size)
v.nm.Put(uint64(needleValue.Key), uint32(offset/NeedlePaddingSize), needleValue.Size)
v.nm.Put(needleValue.Key, Offset(offset/NeedlePaddingSize), needleValue.Size)
return nil return nil
}) })
} }

23
weed/storage/volume_vacuum.go

@ -5,6 +5,7 @@ import (
"os" "os"
"time" "time"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
@ -122,17 +123,17 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
} }
type keyField struct { type keyField struct {
offset uint32
offset Offset
size uint32 size uint32
} }
incrementedHasUpdatedIndexEntry := make(map[uint64]keyField)
incrementedHasUpdatedIndexEntry := make(map[NeedleId]keyField)
for idx_offset := indexSize - NeedleIndexSize; uint64(idx_offset) >= v.lastCompactIndexOffset; idx_offset -= NeedleIndexSize {
for idx_offset := indexSize - NeedleEntrySize; uint64(idx_offset) >= v.lastCompactIndexOffset; idx_offset -= NeedleEntrySize {
var IdxEntry []byte var IdxEntry []byte
if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil { if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil {
return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err) return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err)
} }
key, offset, size := idxFileEntry(IdxEntry)
key, offset, size := IdxFileEntry(IdxEntry)
glog.V(4).Infof("key %d offset %d size %d", key, offset, size) glog.V(4).Infof("key %d offset %d size %d", key, offset, size)
if _, found := incrementedHasUpdatedIndexEntry[key]; !found { if _, found := incrementedHasUpdatedIndexEntry[key]; !found {
incrementedHasUpdatedIndexEntry[key] = keyField{ incrementedHasUpdatedIndexEntry[key] = keyField{
@ -170,11 +171,11 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return fmt.Errorf("oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d", oldDatFileName, oldDatCompactRevision, newDatFileName, newDatCompactRevision) return fmt.Errorf("oldDatFile %s 's compact revision is %d while newDatFile %s 's compact revision is %d", oldDatFileName, oldDatCompactRevision, newDatFileName, newDatCompactRevision)
} }
idx_entry_bytes := make([]byte, 16)
idx_entry_bytes := make([]byte, NeedleIdSize+OffsetSize+SizeSize)
for key, incre_idx_entry := range incrementedHasUpdatedIndexEntry { for key, incre_idx_entry := range incrementedHasUpdatedIndexEntry {
util.Uint64toBytes(idx_entry_bytes[0:8], key)
util.Uint32toBytes(idx_entry_bytes[8:12], incre_idx_entry.offset)
util.Uint32toBytes(idx_entry_bytes[12:16], incre_idx_entry.size)
NeedleIdToBytes(idx_entry_bytes[0:NeedleIdSize], key)
OffsetToBytes(idx_entry_bytes[NeedleIdSize:NeedleIdSize+OffsetSize], incre_idx_entry.offset)
util.Uint32toBytes(idx_entry_bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], incre_idx_entry.size)
var offset int64 var offset int64
if offset, err = dst.Seek(0, 2); err != nil { if offset, err = dst.Seek(0, 2); err != nil {
@ -255,7 +256,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
nv, ok := v.nm.Get(n.Id) nv, ok := v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
if err = nm.Put(n.Id, Offset(new_offset/NeedlePaddingSize), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err) return fmt.Errorf("cannot put needle: %s", err)
} }
if _, _, err := n.Append(dst, v.Version()); err != nil { if _, _, err := n.Append(dst, v.Version()); err != nil {
@ -296,7 +297,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
dst.Write(v.SuperBlock.Bytes()) dst.Write(v.SuperBlock.Bytes())
new_offset := int64(v.SuperBlock.BlockSize()) new_offset := int64(v.SuperBlock.BlockSize())
WalkIndexFile(oldIndexFile, func(key uint64, offset, size uint32) error {
WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error {
if offset == 0 || size == TombstoneFileSize { if offset == 0 || size == TombstoneFileSize {
return nil return nil
} }
@ -315,7 +316,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if nv.Offset == offset && nv.Size > 0 { if nv.Offset == offset && nv.Size > 0 {
if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
if err = nm.Put(n.Id, Offset(new_offset/NeedlePaddingSize), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err) return fmt.Errorf("cannot put needle: %s", err)
} }
if _, _, err = n.Append(dst, v.Version()); err != nil { if _, _, err = n.Append(dst, v.Version()); err != nil {

Loading…
Cancel
Save