diff --git a/weed/command/backup.go b/weed/command/backup.go index 65d2a745b..9849f566e 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -112,7 +112,7 @@ func runBackup(cmd *Command, args []string) bool { return true } } - v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, false) + v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true @@ -137,7 +137,7 @@ func runBackup(cmd *Command, args []string) bool { // remove the old data v.Destroy() // recreate an empty volume - v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, false) + v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true diff --git a/weed/command/compact.go b/weed/command/compact.go index f10b0b2bc..03f1a14db 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -38,7 +38,7 @@ func runCompact(cmd *Command, args []string) bool { vid := needle.VolumeId(*compactVolumeId) v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, - storage.NeedleMapInMemory, nil, nil, preallocate, false) + storage.NeedleMapInMemory, nil, nil, preallocate, 0) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } diff --git a/weed/pb/master.proto b/weed/pb/master.proto index 2bc05e2f0..f4230d029 100644 --- a/weed/pb/master.proto +++ b/weed/pb/master.proto @@ -139,7 +139,7 @@ message AssignRequest { string data_center = 5; string rack = 6; string data_node = 7; - bool in_memory = 8; + uint32 MemoryMapMaxSizeMB = 8; } message AssignResponse { string fid = 1; diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go index ed7ef8082..a5d665289 100644 --- a/weed/pb/master_pb/master.pb.go +++ b/weed/pb/master_pb/master.pb.go @@ -651,14 +651,14 @@ func (m *Location) GetPublicUrl() string { } type AssignRequest struct { - Count uint64 `protobuf:"varint,1,opt,name=count" json:"count,omitempty"` - Replication string `protobuf:"bytes,2,opt,name=replication" json:"replication,omitempty"` - Collection string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"` - Ttl string `protobuf:"bytes,4,opt,name=ttl" json:"ttl,omitempty"` - DataCenter string `protobuf:"bytes,5,opt,name=data_center,json=dataCenter" json:"data_center,omitempty"` - Rack string `protobuf:"bytes,6,opt,name=rack" json:"rack,omitempty"` - DataNode string `protobuf:"bytes,7,opt,name=data_node,json=dataNode" json:"data_node,omitempty"` - InMemory bool `protobuf:"bytes,4,opt,name=inmemory" json:"inmemory,omitempty"` + Count uint64 `protobuf:"varint,1,opt,name=count" json:"count,omitempty"` + Replication string `protobuf:"bytes,2,opt,name=replication" json:"replication,omitempty"` + Collection string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"` + Ttl string `protobuf:"bytes,4,opt,name=ttl" json:"ttl,omitempty"` + DataCenter string `protobuf:"bytes,5,opt,name=data_center,json=dataCenter" json:"data_center,omitempty"` + Rack string `protobuf:"bytes,6,opt,name=rack" json:"rack,omitempty"` + DataNode string `protobuf:"bytes,7,opt,name=data_node,json=dataNode" json:"data_node,omitempty"` + MemoryMapMaxSizeMB uint32 `protobuf:"varint,8,opt,name=memorymapmaxsizemb" json:"memorymapmaxsizemb,omitempty"` } func (m *AssignRequest) Reset() { *m = AssignRequest{} } @@ -715,11 +715,11 @@ func (m *AssignRequest) GetDataNode() string { return "" } -func (m *AssignRequest) GetInMemory() bool { +func (m *AssignRequest) GetMemoryMapMaxSizeMB() uint32 { if m != nil { - return m.InMemory + return m.MemoryMapMaxSizeMB } - return false + return 0 } type AssignResponse struct { diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto index bee054cd5..04114981d 100644 --- a/weed/pb/volume_server.proto +++ b/weed/pb/volume_server.proto @@ -127,7 +127,7 @@ message AllocateVolumeRequest { int64 preallocate = 3; string replication = 4; string ttl = 5; - bool inmemory = 6; + int32 memorymapmaxsizemb = 6; } message AllocateVolumeResponse { } diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index 897ad7fbb..addaf9a65 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -316,12 +316,12 @@ func (*DeleteCollectionResponse) ProtoMessage() {} func (*DeleteCollectionResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} } type AllocateVolumeRequest struct { - VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` - Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"` - Preallocate int64 `protobuf:"varint,3,opt,name=preallocate" json:"preallocate,omitempty"` - Replication string `protobuf:"bytes,4,opt,name=replication" json:"replication,omitempty"` - Ttl string `protobuf:"bytes,5,opt,name=ttl" json:"ttl,omitempty"` - InMemory bool `protobuf:"varint,6,opt,name=inmemory" json:"inmemory,omitempty"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"` + Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"` + Preallocate int64 `protobuf:"varint,3,opt,name=preallocate" json:"preallocate,omitempty"` + Replication string `protobuf:"bytes,4,opt,name=replication" json:"replication,omitempty"` + Ttl string `protobuf:"bytes,5,opt,name=ttl" json:"ttl,omitempty"` + MemoryMapMaxSizeMB uint32 `protobuf:"varint,6,opt,name=memorymapmaxsizemb" json:"memorymapmaxsizemb,omitempty"` } func (m *AllocateVolumeRequest) Reset() { *m = AllocateVolumeRequest{} } @@ -364,11 +364,11 @@ func (m *AllocateVolumeRequest) GetTtl() string { return "" } -func (m *AllocateVolumeRequest) GetInMemory() bool { +func (m *AllocateVolumeRequest) GetMemoryMapMaxSizeMB() uint32 { if m != nil { - return m.InMemory + return m.MemoryMapMaxSizeMB } - return false + return 0 } type AllocateVolumeResponse struct { @@ -1978,6 +1978,19 @@ func _VolumeServer_DeleteCollection_Handler(srv interface{}, ctx context.Context func _VolumeServer_AllocateVolume_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(AllocateVolumeRequest) + + if in.MemoryMapMaxSizeMB > 0 { + test := 6 + test += 5 + } + if in.MemoryMapMaxSizeMB == 77 { + test := 7 + test += 656 + } + if in.Ttl != "" { + test := 2345 + test += 567 + } if err := dec(in); err != nil { return nil, err } diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 286ca355c..b94895798 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -62,13 +62,14 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest } option := &topology.VolumeGrowOption{ - Collection: req.Collection, - ReplicaPlacement: replicaPlacement, - Ttl: ttl, - Prealloacte: ms.preallocateSize, - DataCenter: req.DataCenter, - Rack: req.Rack, - DataNode: req.DataNode, + Collection: req.Collection, + ReplicaPlacement: replicaPlacement, + Ttl: ttl, + Prealloacte: ms.preallocateSize, + DataCenter: req.DataCenter, + Rack: req.Rack, + DataNode: req.DataNode, + MemoryMapMaxSizeMB: req.MemoryMapMaxSizeMB, } if !ms.Topo.HasWritableVolume(option) { diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 4bcff0822..553bfd181 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -148,6 +148,11 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr if err != nil { return nil, err } + memoryMapMaxSizeMB, err := needle.ReadMemoryMapMaxSizeMB(r.FormValue("memorymapmaxsizemb")) + if err != nil { + return nil, err + } + preallocate := ms.preallocateSize if r.FormValue("preallocate") != "" { preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64) @@ -156,13 +161,14 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr } } volumeGrowOption := &topology.VolumeGrowOption{ - Collection: r.FormValue("collection"), - ReplicaPlacement: replicaPlacement, - Ttl: ttl, - Prealloacte: preallocate, - DataCenter: r.FormValue("dataCenter"), - Rack: r.FormValue("rack"), - DataNode: r.FormValue("dataNode"), + Collection: r.FormValue("collection"), + ReplicaPlacement: replicaPlacement, + Ttl: ttl, + Prealloacte: preallocate, + DataCenter: r.FormValue("dataCenter"), + Rack: r.FormValue("rack"), + DataNode: r.FormValue("dataNode"), + MemoryMapMaxSizeMB: memoryMapMaxSizeMB, } return volumeGrowOption, nil } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index f52f9fd2d..4118437b6 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -35,7 +35,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p req.Replication, req.Ttl, req.Preallocate, - req.InMemory, + req.MemoryMapMaxSizeMB, ) if err != nil { diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index a1104ed8e..a8f3ec11e 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -60,7 +60,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne _, found := l.volumes[vid] l.RUnlock() if !found { - if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, false); e == nil { + if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0); e == nil { l.Lock() l.volumes[vid] = v l.Unlock() diff --git a/weed/storage/needle/volume_memory_map_max_size.go b/weed/storage/needle/volume_memory_map_max_size.go new file mode 100644 index 000000000..ee6c6ede6 --- /dev/null +++ b/weed/storage/needle/volume_memory_map_max_size.go @@ -0,0 +1,14 @@ +package needle + +import "strconv" + +func ReadMemoryMapMaxSizeMB(MemoryMapMaxSizeMBString string) (uint32, error) { + if MemoryMapMaxSizeMBString == "" { + return 0, nil + } + var MemoryMapMaxSize64 uint64 + var err error + MemoryMapMaxSize64, err = strconv.ParseUint(MemoryMapMaxSizeMBString, 10, 32) + MemoryMapMaxSize := uint32(MemoryMapMaxSize64) + return MemoryMapMaxSize, err +} diff --git a/weed/storage/needle/volume_memory_map_max_size_test.go b/weed/storage/needle/volume_memory_map_max_size_test.go new file mode 100644 index 000000000..ee9923f2b --- /dev/null +++ b/weed/storage/needle/volume_memory_map_max_size_test.go @@ -0,0 +1,10 @@ +package needle + +import "testing" + +func TestMemoryMapMaxSizeReadWrite(t *testing.T) { + memoryMapSize, _ := ReadMemoryMapMaxSizeMB("5000") + if memoryMapSize != 5000 { + t.Errorf("empty memoryMapSize:%v", memoryMapSize) + } +} diff --git a/weed/storage/store.go b/weed/storage/store.go index 89b92f229..abb4d6264 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -59,7 +59,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di return } -func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, memoryMapped bool) error { +func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, memoryMapped uint32) error { rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -101,7 +101,7 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapped bool) error { +func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapped uint32) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index efe0328cd..7d1a2802a 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -25,7 +25,7 @@ type Volume struct { nm NeedleMapper needleMapKind NeedleMapType readOnly bool - MemoryMapped bool + MemoryMapped uint32 SuperBlock @@ -39,7 +39,7 @@ type Volume struct { isCompacting bool } -func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapped bool) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapped uint32) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapped: memoryMapped} v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} diff --git a/weed/storage/volume_create.go b/weed/storage/volume_create.go index a3d889de2..d1935f0cd 100644 --- a/weed/storage/volume_create.go +++ b/weed/storage/volume_create.go @@ -8,7 +8,7 @@ import ( "github.com/joeslay/seaweedfs/weed/glog" ) -func createVolumeFile(fileName string, preallocate int64, useMemoryMap bool) (*os.File, error) { +func createVolumeFile(fileName string, preallocate int64, useMemoryMap uint32) (*os.File, error) { file, e = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if preallocate > 0 { glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) diff --git a/weed/storage/volume_create_linux.go b/weed/storage/volume_create_linux.go index 47c00fc66..f21267f01 100644 --- a/weed/storage/volume_create_linux.go +++ b/weed/storage/volume_create_linux.go @@ -9,7 +9,7 @@ import ( "github.com/joeslay/seaweedfs/weed/glog" ) -func createVolumeFile(fileName string, preallocate int64, useMemoryMap bool) (file *os.File, e error) { +func createVolumeFile(fileName string, preallocate int64, useMemoryMap uint32) (file *os.File, e error) { file, e = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if preallocate != 0 { syscall.Fallocate(int(file.Fd()), 1, 0, preallocate) diff --git a/weed/storage/volume_create_windows.go b/weed/storage/volume_create_windows.go index 494b13728..92f4523ba 100644 --- a/weed/storage/volume_create_windows.go +++ b/weed/storage/volume_create_windows.go @@ -12,12 +12,12 @@ import ( "github.com/joeslay/seaweedfs/weed/os_overloads" ) -func createVolumeFile(fileName string, preallocate int64, useMemoryMap bool) (*os.File, error) { +func createVolumeFile(fileName string, preallocate int64, useMemoryMap uint32) (*os.File, error) { mMap, exists := memory_map.FileMemoryMap[fileName] if !exists { - file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, useMemoryMap) - if useMemoryMap { + file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, useMemoryMap > 0) + if useMemoryMap > 0 { memory_map.FileMemoryMap[fileName] = new(memory_map.MemoryMap) new_mMap := memory_map.FileMemoryMap[fileName] diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 48b3958d5..301c8bb0e 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -23,7 +23,7 @@ func (v *Volume) garbageLevel() float64 { func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error { - if !v.MemoryMapped { //it makes no sense to compact in memory + if v.MemoryMapped > 0 { //it makes no sense to compact in memory glog.V(3).Infof("Compacting volume %d ...", v.Id) //no need to lock for copy on write //v.accessLock.Lock() @@ -46,7 +46,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error func (v *Volume) Compact2() error { - if !v.MemoryMapped { //it makes no sense to compact in memory + if v.MemoryMapped > 0 { //it makes no sense to compact in memory glog.V(3).Infof("Compact2 volume %d ...", v.Id) v.isCompacting = true @@ -63,7 +63,7 @@ func (v *Volume) Compact2() error { } func (v *Volume) CommitCompact() error { - if !v.MemoryMapped { //it makes no sense to compact in memory + if v.MemoryMapped>0 { //it makes no sense to compact in memory glog.V(0).Infof("Committing volume %d vacuuming...", v.Id) v.isCompacting = true diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go index 91a67681a..9dfd46825 100644 --- a/weed/topology/allocate_volume.go +++ b/weed/topology/allocate_volume.go @@ -18,12 +18,12 @@ func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.Vol return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{ - VolumeId: uint32(vid), - Collection: option.Collection, - Replication: option.ReplicaPlacement.String(), - Ttl: option.Ttl.String(), - Preallocate: option.Prealloacte, - InMemory: option.InMemory, + VolumeId: uint32(vid), + Collection: option.Collection, + Replication: option.ReplicaPlacement.String(), + Ttl: option.Ttl.String(), + Preallocate: option.Prealloacte, + MemoryMapMaxSizeMB: option.MemoryMapMaxSizeMB, }) return deleteErr }) diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index b9b2b69f7..e4446ade0 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -21,14 +21,14 @@ This package is created to resolve these replica placement issues: */ type VolumeGrowOption struct { - Collection string - ReplicaPlacement *storage.ReplicaPlacement - Ttl *needle.TTL - Prealloacte int64 - DataCenter string - Rack string - DataNode string - InMemory bool + Collection string + ReplicaPlacement *storage.ReplicaPlacement + Ttl *needle.TTL + Prealloacte int64 + DataCenter string + Rack string + DataNode string + MemoryMapMaxSizeMB uint32 } type VolumeGrowth struct {