Browse Source

Merge pull request #508 from chrislusf/skip_memory_pool

skip bytes cache
pull/533/head
Chris Lu 8 years ago
committed by GitHub
parent
commit
761c0eb1ed
  1. 1
      weed/command/server.go
  2. 3
      weed/command/volume.go
  3. 4
      weed/server/volume_server.go
  4. 1
      weed/server/volume_server_handlers_read.go
  5. 3
      weed/server/volume_server_handlers_sync.go
  6. 4
      weed/server/volume_server_handlers_write.go
  7. 2
      weed/storage/needle.go
  8. 75
      weed/storage/needle_byte_cache.go
  9. 5
      weed/storage/needle_read_write.go
  10. 2
      weed/storage/volume_read_write.go
  11. 3
      weed/storage/volume_vacuum.go
  12. 127
      weed/util/bytes_pool.go
  13. 41
      weed/util/bytes_pool_test.go

1
weed/command/server.go

@ -266,7 +266,6 @@ func runServer(cmd *Command, args []string) bool {
volumeNeedleMapKind, volumeNeedleMapKind,
*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack,
serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect, serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect,
*volumeEnableBytesCache,
) )
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort)) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort))

3
weed/command/volume.go

@ -36,7 +36,6 @@ type VolumeServerOptions struct {
indexType *string indexType *string
fixJpgOrientation *bool fixJpgOrientation *bool
readRedirect *bool readRedirect *bool
enableBytesCache *bool
} }
func init() { func init() {
@ -55,7 +54,6 @@ func init() {
v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.") v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.")
v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.") v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
v.enableBytesCache = cmdVolume.Flag.Bool("cache.enable", false, "direct cache instead of OS cache, cost more memory.")
} }
var cmdVolume = &Command{ var cmdVolume = &Command{
@ -136,7 +134,6 @@ func runVolume(cmd *Command, args []string) bool {
*v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack,
v.whiteList, v.whiteList,
*v.fixJpgOrientation, *v.readRedirect, *v.fixJpgOrientation, *v.readRedirect,
*v.enableBytesCache,
) )
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)

4
weed/server/volume_server.go

@ -32,8 +32,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
dataCenter string, rack string, dataCenter string, rack string,
whiteList []string, whiteList []string,
fixJpgOrientation bool, fixJpgOrientation bool,
readRedirect bool,
enableBytesCache bool) *VolumeServer {
readRedirect bool) *VolumeServer {
vs := &VolumeServer{ vs := &VolumeServer{
pulseSeconds: pulseSeconds, pulseSeconds: pulseSeconds,
dataCenter: dataCenter, dataCenter: dataCenter,
@ -44,7 +43,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
} }
vs.SetMasterNode(masterNode) vs.SetMasterNode(masterNode)
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
storage.EnableBytesCache = enableBytesCache
vs.guard = security.NewGuard(whiteList, "") vs.guard = security.NewGuard(whiteList, "")

1
weed/server/volume_server_handlers_read.go

@ -72,7 +72,6 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return return
} }
defer n.ReleaseMemory()
if n.Cookie != cookie { if n.Cookie != cookie {
glog.V(0).Infoln("request", r.URL.Path, "with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent()) glog.V(0).Infoln("request", r.URL.Path, "with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent())
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)

3
weed/server/volume_server_handlers_sync.go

@ -50,8 +50,7 @@ func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *ht
} }
offset := uint32(util.ParseUint64(r.FormValue("offset"), 0)) offset := uint32(util.ParseUint64(r.FormValue("offset"), 0))
size := uint32(util.ParseUint64(r.FormValue("size"), 0)) size := uint32(util.ParseUint64(r.FormValue("size"), 0))
content, block, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size)
defer storage.ReleaseBytes(block.Bytes)
content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size)
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
return return

4
weed/server/volume_server_handlers_write.go

@ -64,7 +64,6 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
writeJsonQuiet(w, r, http.StatusNotFound, m) writeJsonQuiet(w, r, http.StatusNotFound, m)
return return
} }
defer n.ReleaseMemory()
if n.Cookie != cookie { if n.Cookie != cookie {
glog.V(0).Infoln("delete", r.URL.Path, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) glog.V(0).Infoln("delete", r.URL.Path, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
@ -133,7 +132,6 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
Status: http.StatusNotAcceptable, Status: http.StatusNotAcceptable,
Error: "ChunkManifest: not allowed in batch delete mode.", Error: "ChunkManifest: not allowed in batch delete mode.",
}) })
n.ReleaseMemory()
continue continue
} }
@ -144,7 +142,6 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
Error: "File Random Cookie does not match.", Error: "File Random Cookie does not match.",
}) })
glog.V(0).Infoln("deleting", fid, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) glog.V(0).Infoln("deleting", fid, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
n.ReleaseMemory()
return return
} }
if size, err := vs.store.Delete(volumeId, n); err != nil { if size, err := vs.store.Delete(volumeId, n); err != nil {
@ -160,7 +157,6 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
Size: int(size)}, Size: int(size)},
) )
} }
n.ReleaseMemory()
} }
writeJsonQuiet(w, r, http.StatusAccepted, ret) writeJsonQuiet(w, r, http.StatusAccepted, ret)

2
weed/storage/needle.go

@ -49,8 +49,6 @@ type Needle struct {
Checksum CRC `comment:"CRC32 to check integrity"` Checksum CRC `comment:"CRC32 to check integrity"`
Padding []byte `comment:"Aligned to 8 bytes"` Padding []byte `comment:"Aligned to 8 bytes"`
rawBlock *Block // underlying supporing []byte, fetched and released into a pool
} }
func (n *Needle) String() (str string) { func (n *Needle) String() (str string) {

75
weed/storage/needle_byte_cache.go

@ -1,80 +1,11 @@
package storage package storage
import ( import (
"fmt"
"os" "os"
"sync/atomic"
"github.com/hashicorp/golang-lru"
"github.com/chrislusf/seaweedfs/weed/util"
) )
var (
EnableBytesCache = true
bytesCache *lru.Cache
bytesPool *util.BytesPool
)
/*
There are one level of caching, and one level of pooling.
In pooling, all []byte are fetched and returned to the pool bytesPool.
In caching, the string~[]byte mapping is cached
*/
func init() {
bytesPool = util.NewBytesPool()
bytesCache, _ = lru.NewWithEvict(512, func(key interface{}, value interface{}) {
value.(*Block).decreaseReference()
})
}
type Block struct {
Bytes []byte
refCount int32
}
func (block *Block) decreaseReference() {
if atomic.AddInt32(&block.refCount, -1) == 0 {
bytesPool.Put(block.Bytes)
}
}
func (block *Block) increaseReference() {
atomic.AddInt32(&block.refCount, 1)
}
// get bytes from the LRU cache of []byte first, then from the bytes pool
// when []byte in LRU cache is evicted, it will be put back to the bytes pool
func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, block *Block, err error) {
// check cache, return if found
cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize)
if EnableBytesCache {
if obj, found := bytesCache.Get(cacheKey); found {
block = obj.(*Block)
block.increaseReference()
dataSlice = block.Bytes[0:readSize]
return dataSlice, block, nil
}
}
// get the []byte from pool
b := bytesPool.Get(readSize)
// refCount = 2, one by the bytesCache, one by the actual needle object
block = &Block{Bytes: b, refCount: 2}
dataSlice = block.Bytes[0:readSize]
func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, err error) {
dataSlice = make([]byte, readSize)
_, err = r.ReadAt(dataSlice, offset) _, err = r.ReadAt(dataSlice, offset)
if EnableBytesCache {
bytesCache.Add(cacheKey, block)
}
return dataSlice, block, err
}
func (n *Needle) ReleaseMemory() {
if n.rawBlock != nil {
n.rawBlock.decreaseReference()
}
}
func ReleaseBytes(b []byte) {
bytesPool.Put(b)
return dataSlice, err
} }

5
weed/storage/needle_read_write.go

@ -151,16 +151,15 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version) return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
} }
func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) {
func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, err error) {
return getBytesForFileBlock(r, offset, int(getActualSize(size))) return getBytesForFileBlock(r, offset, int(getActualSize(size)))
} }
func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) { func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {
bytes, block, err := ReadNeedleBlob(r, offset, size)
bytes, err := ReadNeedleBlob(r, offset, size)
if err != nil { if err != nil {
return err return err
} }
n.rawBlock = block
n.ParseNeedleHeader(bytes) n.ParseNeedleHeader(bytes)
if n.Size != size { if n.Size != size {
return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size) return fmt.Errorf("File Entry Not Found. Needle %d Memory %d", n.Size, size)

2
weed/storage/volume_read_write.go

@ -25,7 +25,6 @@ func (v *Volume) isFileUnchanged(n *Needle) bool {
glog.V(0).Infof("Failed to check updated file %v", err) glog.V(0).Infof("Failed to check updated file %v", err)
return false return false
} }
defer oldNeedle.ReleaseMemory()
if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) { if oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
n.DataSize = oldNeedle.DataSize n.DataSize = oldNeedle.DataSize
return true return true
@ -172,7 +171,6 @@ func (v *Volume) readNeedle(n *Needle) (int, error) {
if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) { if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
return bytesRead, nil return bytesRead, nil
} }
n.ReleaseMemory()
return -1, errors.New("Not Found") return -1, errors.New("Not Found")
} }

3
weed/storage/volume_vacuum.go

@ -178,7 +178,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 { if incre_idx_entry.offset != 0 && incre_idx_entry.size != 0 {
//even the needle cache in memory is hit, the need_bytes is correct //even the needle cache in memory is hit, the need_bytes is correct
var needle_bytes []byte var needle_bytes []byte
needle_bytes, _, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
needle_bytes, err = ReadNeedleBlob(oldDatFile, int64(incre_idx_entry.offset)*NeedlePaddingSize, incre_idx_entry.size)
if err != nil { if err != nil {
return return
} }
@ -291,7 +291,6 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
n := new(Needle) n := new(Needle)
n.ReadData(v.dataFile, int64(offset)*NeedlePaddingSize, size, v.Version()) n.ReadData(v.dataFile, int64(offset)*NeedlePaddingSize, size, v.Version())
defer n.ReleaseMemory()
if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
return nil return nil

127
weed/util/bytes_pool.go

@ -1,127 +0,0 @@
package util
import (
"bytes"
"fmt"
"sync"
"sync/atomic"
"time"
)
var (
ChunkSizes = []int{
1 << 4, // index 0, 16 bytes, inclusive
1 << 6, // index 1, 64 bytes
1 << 8, // index 2, 256 bytes
1 << 10, // index 3, 1K bytes
1 << 12, // index 4, 4K bytes
1 << 14, // index 5, 16K bytes
1 << 16, // index 6, 64K bytes
1 << 18, // index 7, 256K bytes
1 << 20, // index 8, 1M bytes
1 << 22, // index 9, 4M bytes
1 << 24, // index 10, 16M bytes
1 << 26, // index 11, 64M bytes
1 << 28, // index 12, 128M bytes
}
_DEBUG = false
)
type BytesPool struct {
chunkPools []*byteChunkPool
}
func NewBytesPool() *BytesPool {
var bp BytesPool
for _, size := range ChunkSizes {
bp.chunkPools = append(bp.chunkPools, newByteChunkPool(size))
}
ret := &bp
if _DEBUG {
t := time.NewTicker(10 * time.Second)
go func() {
for {
println("buffer:", ret.String())
<-t.C
}
}()
}
return ret
}
func (m *BytesPool) String() string {
var buf bytes.Buffer
for index, size := range ChunkSizes {
if m.chunkPools[index].count > 0 {
buf.WriteString(fmt.Sprintf("size:%d count:%d\n", size, m.chunkPools[index].count))
}
}
return buf.String()
}
func findChunkPoolIndex(size int) int {
if size <= 0 {
return -1
}
size = (size - 1) >> 4
ret := 0
for size > 0 {
size = size >> 2
ret = ret + 1
}
if ret >= len(ChunkSizes) {
return -1
}
return ret
}
func (m *BytesPool) Get(size int) []byte {
index := findChunkPoolIndex(size)
// println("get index:", index)
if index < 0 {
return make([]byte, size)
}
return m.chunkPools[index].Get()
}
func (m *BytesPool) Put(b []byte) {
index := findChunkPoolIndex(len(b))
// println("put index:", index)
if index < 0 {
return
}
m.chunkPools[index].Put(b)
}
// a pool of fix-sized []byte chunks. The pool size is managed by Go GC
type byteChunkPool struct {
sync.Pool
chunkSizeLimit int
count int64
}
var count int
func newByteChunkPool(chunkSizeLimit int) *byteChunkPool {
var m byteChunkPool
m.chunkSizeLimit = chunkSizeLimit
m.Pool.New = func() interface{} {
count++
// println("creating []byte size", m.chunkSizeLimit, "new", count, "count", m.count)
return make([]byte, m.chunkSizeLimit)
}
return &m
}
func (m *byteChunkPool) Get() []byte {
// println("before get size:", m.chunkSizeLimit, "count:", m.count)
atomic.AddInt64(&m.count, 1)
return m.Pool.Get().([]byte)
}
func (m *byteChunkPool) Put(b []byte) {
atomic.AddInt64(&m.count, -1)
// println("after put get size:", m.chunkSizeLimit, "count:", m.count)
m.Pool.Put(b)
}

41
weed/util/bytes_pool_test.go

@ -1,41 +0,0 @@
package util
import (
"testing"
)
func TestTTLReadWrite(t *testing.T) {
var tests = []struct {
n int // input
expected int // expected result
}{
{0, -1},
{1, 0},
{1 << 4, 0},
{1 << 6, 1},
{1 << 8, 2},
{1 << 10, 3},
{1 << 12, 4},
{1 << 14, 5},
{1 << 16, 6},
{1 << 18, 7},
{1<<4 + 1, 1},
{1<<6 + 1, 2},
{1<<8 + 1, 3},
{1<<10 + 1, 4},
{1<<12 + 1, 5},
{1<<14 + 1, 6},
{1<<16 + 1, 7},
{1<<18 + 1, 8},
{1<<28 - 1, 12},
{1 << 28, 12},
{1<<28 + 2134, -1},
{1080, 4},
}
for _, tt := range tests {
actual := findChunkPoolIndex(tt.n)
if actual != tt.expected {
t.Errorf("findChunkPoolIndex(%d): expected %d, actual %d", tt.n, tt.expected, actual)
}
}
}
Loading…
Cancel
Save