
Merge branch 'support_ssd_volume'

pull/1794/head
Chris Lu, 4 years ago
commit 2bf5ea87d6
13  other/java/client/src/main/proto/filer.proto
3  weed/command/benchmark.go
4  weed/command/filer_copy.go
18  weed/command/filer_sync.go
2  weed/command/mount.go
8  weed/command/mount_std.go
1  weed/command/server.go
6  weed/command/upload.go
23  weed/command/volume.go
1  weed/filer/entry.go
2  weed/filer/entry_codec.go
2  weed/filer/filer_conf.go
3  weed/filesys/wfs.go
1  weed/filesys/wfs_write.go
5  weed/operation/assign_file_id.go
7  weed/operation/submit.go
13  weed/pb/filer.proto
1086  weed/pb/filer_pb/filer.pb.go
19  weed/pb/master.proto
1150  weed/pb/master_pb/master.pb.go
3  weed/pb/volume_server.proto
1430  weed/pb/volume_server_pb/volume_server.pb.go
1  weed/replication/sink/filersink/fetch_write.go
5  weed/replication/sink/filersink/filer_sink.go
1  weed/server/common.go
6  weed/server/filer_grpc_server.go
8  weed/server/filer_server_handlers_write.go
1  weed/server/filer_server_handlers_write_autochunk.go
1  weed/server/filer_server_handlers_write_cipher.go
8  weed/server/master_grpc_server.go
12  weed/server/master_grpc_server_volume.go
2  weed/server/master_server_handlers.go
12  weed/server/master_server_handlers_admin.go
2  weed/server/volume_grpc_admin.go
11  weed/server/volume_grpc_copy.go
2  weed/server/volume_grpc_erasure_coding.go
4  weed/server/volume_server.go
8  weed/server/volume_server_handlers_admin.go
4  weed/server/volume_server_handlers_ui.go
2  weed/server/volume_server_ui/templates.go
2  weed/server/webdav_server.go
2  weed/shell/command_fs_configure.go
89  weed/shell/command_volume_balance.go
2  weed/shell/command_volume_server_evacuate.go
11  weed/storage/disk_location.go
48  weed/storage/store.go
5  weed/storage/volume.go
4  weed/storage/volume_info.go
23  weed/storage/volume_type.go
1  weed/topology/allocate_volume.go
19  weed/topology/collection.go
3  weed/topology/data_center.go
24  weed/topology/data_node.go
66  weed/topology/node.go
6  weed/topology/rack.go
32  weed/topology/topology.go
8  weed/topology/topology_event_handling.go
6  weed/topology/topology_map.go
29  weed/topology/topology_test.go
31  weed/topology/volume_growth.go
4  weed/topology/volume_layout.go

13
other/java/client/src/main/proto/filer.proto

@ -156,6 +156,7 @@ message FuseAttributes {
repeated string group_name = 12; // for hdfs
string symlink_target = 13;
bytes md5 = 14;
string disk_type = 15;
}
message CreateEntryRequest {
@ -220,6 +221,7 @@ message AssignVolumeRequest {
string data_center = 5;
string path = 6;
string rack = 7;
string disk_type = 8;
}
message AssignVolumeResponse {
@ -270,11 +272,9 @@ message StatisticsRequest {
string replication = 1;
string collection = 2;
string ttl = 3;
string disk_type = 4;
}
message StatisticsResponse {
string replication = 1;
string collection = 2;
string ttl = 3;
uint64 total_size = 4;
uint64 used_size = 5;
uint64 file_count = 6;
@ -358,12 +358,7 @@ message FilerConf {
string collection = 2;
string replication = 3;
string ttl = 4;
enum DiskType {
NONE = 0;
HDD = 1;
SSD = 2;
}
DiskType disk_type = 5;
string disk_type = 5;
bool fsync = 6;
uint32 volume_growth_count = 7;
}

3
weed/command/benchmark.go

@ -35,6 +35,7 @@ type BenchmarkOptions struct {
sequentialRead *bool
collection *string
replication *string
diskType *string
cpuprofile *string
maxCpu *int
grpcDialOption grpc.DialOption
@ -62,6 +63,7 @@ func init() {
b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
b.diskType = cmdBenchmark.Flag.String("disk", "", "[hdd|ssd] choose between hard drive or solid state drive")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
@ -234,6 +236,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
Count: 1,
Collection: *b.collection,
Replication: *b.replication,
DiskType: *b.diskType,
}
if assignResult, err := operation.Assign(b.masterClient.GetMaster(), b.grpcDialOption, ar); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection

4
weed/command/filer_copy.go

@ -37,6 +37,7 @@ type CopyOptions struct {
replication *string
collection *string
ttl *string
diskType *string
maxMB *int
masterClient *wdclient.MasterClient
concurrenctFiles *int
@ -54,6 +55,7 @@ func init() {
copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
copy.diskType = cmdCopy.Flag.String("disk", "", "[hdd|ssd] choose between hard drive or solid state drive")
copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
@ -311,6 +313,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec,
DiskType: *worker.options.diskType,
Path: task.destinationUrlPath,
}
@ -405,6 +408,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec,
DiskType: *worker.options.diskType,
Path: task.destinationUrlPath + fileName,
}

18
weed/command/filer_sync.go

@ -31,6 +31,8 @@ type SyncOptions struct {
bCollection *string
aTtlSec *int
bTtlSec *int
aDiskType *string
bDiskType *string
aDebug *bool
bDebug *bool
aProxyByFiler *bool
@ -56,6 +58,8 @@ func init() {
syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd] choose between hard drive or solid state drive on filer A")
syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd] choose between hard drive or solid state drive on filer B")
syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
@ -90,9 +94,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
go func() {
for {
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler,
*syncOptions.filerB, *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler,
*syncOptions.bDebug)
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, *syncOptions.filerB,
*syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
time.Sleep(1747 * time.Millisecond)
@ -103,9 +106,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
if !*syncOptions.isActivePassive {
go func() {
for {
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler,
*syncOptions.filerA, *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler,
*syncOptions.aDebug)
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, *syncOptions.filerA,
*syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
time.Sleep(2147 * time.Millisecond)
@ -120,7 +122,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
}
func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath string, sourceReadChunkFromFiler bool, targetFiler, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler, debug bool) error {
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error {
// read source filer signature
sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
@ -146,7 +148,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
filerSource := &source.FilerSource{}
filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, sourceReadChunkFromFiler)
filerSink := &filersink.FilerSink{}
filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption, sinkWriteChunkByFiler)
filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
filerSink.SetSourceFiler(filerSource)
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {

2
weed/command/mount.go

@ -12,6 +12,7 @@ type MountOptions struct {
dirAutoCreate *bool
collection *string
replication *string
diskType *string
ttlSec *int
chunkSizeLimitMB *int
concurrentWriters *int
@ -41,6 +42,7 @@ func init() {
mountOptions.dirAutoCreate = cmdMount.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to")
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.diskType = cmdMount.Flag.String("disk", "", "[hdd|ssd] choose between hard drive or solid state drive")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 128, "limit concurrent goroutine writers if not 0")

8
weed/command/mount_std.go

@ -5,6 +5,7 @@ package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage"
"os"
"os/user"
"path"
@ -168,6 +169,12 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
mountRoot = mountRoot[0 : len(mountRoot)-1]
}
diskType, err := storage.ToDiskType(*option.diskType)
if err != nil {
fmt.Printf("failed to parse volume type: %v\n", err)
return false
}
seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{
MountDirectory: dir,
FilerAddress: filer,
@ -177,6 +184,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
Collection: *option.collection,
Replication: *option.replication,
TtlSec: int32(*option.ttlSec),
DiskType: diskType,
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
ConcurrentWriters: *option.concurrentWriters,
CacheDir: *option.cacheDir,

1
weed/command/server.go

@ -102,6 +102,7 @@ func init() {
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd] choose between hard drive or solid state drive")
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")

6
weed/command/upload.go

@ -27,6 +27,7 @@ type UploadOptions struct {
collection *string
dataCenter *string
ttl *string
diskType *string
maxMB *int
usePublicUrl *bool
}
@ -40,6 +41,7 @@ func init() {
upload.replication = cmdUpload.Flag.String("replication", "", "replication type")
upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name")
upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name")
upload.diskType = cmdUpload.Flag.String("disk", "", "[hdd|ssd] choose between hard drive or solid state drive")
upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
upload.maxMB = cmdUpload.Flag.Int("maxMB", 32, "split files larger than the limit")
upload.usePublicUrl = cmdUpload.Flag.Bool("usePublicUrl", false, "upload to public url from volume server")
@ -94,7 +96,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
results, e := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.maxMB, *upload.usePublicUrl)
results, e := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@ -111,7 +113,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
fmt.Println(e.Error())
}
results, _ := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.maxMB, *upload.usePublicUrl)
results, _ := operation.SubmitFiles(*upload.master, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
}
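
Note (illustration, not part of this commit): SubmitFiles gains a diskType argument between ttl and maxMB. A minimal caller sketch, assuming a master at localhost:9333, a throwaway file path, and that operation.NewFileParts is the helper weed upload already uses:

    package main

    import (
        "fmt"
        "log"

        "github.com/chrislusf/seaweedfs/weed/operation"
        "google.golang.org/grpc"
    )

    func main() {
        parts, err := operation.NewFileParts([]string{"/tmp/example.txt"}) // assumed helper
        if err != nil {
            log.Fatal(err)
        }
        // "ssd" fills the new diskType slot; "" keeps the old behavior.
        results, err := operation.SubmitFiles("localhost:9333", grpc.WithInsecure(), parts,
            "000", "test", "", "", "ssd", 32, false)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("%+v\n", results)
    }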

23
weed/command/volume.go

@ -49,6 +49,7 @@ type VolumeServerOptions struct {
rack *string
whiteList []string
indexType *string
diskType *string
fixJpgOrientation *bool
readRedirect *bool
cpuProfile *string
@ -76,6 +77,7 @@ func init() {
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd] choose between hard drive or solid state drive")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.")
v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
@ -167,6 +169,25 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
glog.Fatalf("%d directories by -dir, but only %d minFreeSpacePercent is set by -minFreeSpacePercent", len(v.folders), len(v.minFreeSpacePercents))
}
// set disk types
var diskTypes []storage.DiskType
diskTypeStrings := strings.Split(*v.diskType, ",")
for _, diskTypeString := range diskTypeStrings {
if diskType, err := storage.ToDiskType(diskTypeString); err == nil {
diskTypes = append(diskTypes, diskType)
} else {
glog.Fatalf("failed to parse volume type: %v", err)
}
}
if len(diskTypes) == 1 && len(v.folders) > 1 {
for i := 0; i < len(v.folders)-1; i++ {
diskTypes = append(diskTypes, diskTypes[0])
}
}
if len(v.folders) != len(diskTypes) {
glog.Fatalf("%d directories by -dir, but only %d disk types is set by -disk", len(v.folders), len(diskTypes))
}
// security related white list configuration
if volumeWhiteListOption != "" {
v.whiteList = strings.Split(volumeWhiteListOption, ",")
@ -212,7 +233,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*v.ip, *v.port, *v.publicUrl,
v.folders, v.folderMaxLimits, v.minFreeSpacePercents,
v.folders, v.folderMaxLimits, v.minFreeSpacePercents, diskTypes,
*v.idxFolder,
volumeNeedleMapKind,
strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,
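
Note (illustration, not part of this commit): the block above pairs each -dir folder with a disk type from -disk, repeating a single value across every folder. A self-contained sketch of that expansion rule, with made-up folder paths:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        folders := []string{"/data/vol1", "/data/vol2", "/data/ssd1"} // stand-in for -dir
        diskFlag := "hdd,hdd,ssd"                                     // stand-in for -disk

        diskTypes := strings.Split(diskFlag, ",")
        // A single value such as "ssd" is repeated so it covers every folder.
        if len(diskTypes) == 1 && len(folders) > 1 {
            for len(diskTypes) < len(folders) {
                diskTypes = append(diskTypes, diskTypes[0])
            }
        }
        if len(folders) != len(diskTypes) {
            fmt.Println("number of -dir folders must match number of -disk entries")
            return
        }
        for i, dir := range folders {
            fmt.Printf("%s -> %s\n", dir, diskTypes[i])
        }
    }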

1
weed/filer/entry.go

@ -18,6 +18,7 @@ type Attr struct {
Replication string // replication
Collection string // collection name
TtlSec int32 // ttl in seconds
DiskType string
UserName string
GroupNames []string
SymlinkTarget string

2
weed/filer/entry_codec.go

@ -56,6 +56,7 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
Collection: entry.Attr.Collection,
Replication: entry.Attr.Replication,
TtlSec: entry.Attr.TtlSec,
DiskType: entry.Attr.DiskType,
UserName: entry.Attr.UserName,
GroupName: entry.Attr.GroupNames,
SymlinkTarget: entry.Attr.SymlinkTarget,
@ -81,6 +82,7 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
t.Collection = attr.Collection
t.Replication = attr.Replication
t.TtlSec = attr.TtlSec
t.DiskType = attr.DiskType
t.UserName = attr.UserName
t.GroupNames = attr.GroupName
t.SymlinkTarget = attr.SymlinkTarget

2
weed/filer/filer_conf.go

@ -116,7 +116,7 @@ func mergePathConf(a, b *filer_pb.FilerConf_PathConf) {
a.Collection = util.Nvl(b.Collection, a.Collection)
a.Replication = util.Nvl(b.Replication, a.Replication)
a.Ttl = util.Nvl(b.Ttl, a.Ttl)
if b.DiskType != filer_pb.FilerConf_PathConf_NONE {
if b.DiskType != "" {
a.DiskType = b.DiskType
}
a.Fsync = b.Fsync || a.Fsync

3
weed/filesys/wfs.go

@ -3,6 +3,7 @@ package filesys
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
@ -34,6 +35,7 @@ type Option struct {
Collection string
Replication string
TtlSec int32
DiskType storage.DiskType
ChunkSizeLimit int64
ConcurrentWriters int
CacheDir string
@ -194,6 +196,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
Collection: wfs.option.Collection,
Replication: wfs.option.Replication,
Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
DiskType: string(wfs.option.DiskType),
}
glog.V(4).Infof("reading filer stats: %+v", request)

1
weed/filesys/wfs_write.go

@ -26,6 +26,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
Replication: wfs.option.Replication,
Collection: wfs.option.Collection,
TtlSec: wfs.option.TtlSec,
DiskType: string(wfs.option.DiskType),
DataCenter: wfs.option.DataCenter,
Path: string(fullPath),
}

5
weed/operation/assign_file_id.go

@ -18,6 +18,7 @@ type VolumeAssignRequest struct {
Replication string
Collection string
Ttl string
DiskType string
DataCenter string
Rack string
DataNode string
@ -54,6 +55,7 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum
Replication: request.Replication,
Collection: request.Collection,
Ttl: request.Ttl,
DiskType: request.DiskType,
DataCenter: request.DataCenter,
Rack: request.Rack,
DataNode: request.DataNode,
@ -105,6 +107,7 @@ func LookupJwt(master string, fileId string) security.EncodedJwt {
type StorageOption struct {
Replication string
DiskType string
Collection string
DataCenter string
Rack string
@ -123,6 +126,7 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a
Replication: so.Replication,
Collection: so.Collection,
Ttl: so.TtlString(),
DiskType: so.DiskType,
DataCenter: so.DataCenter,
Rack: so.Rack,
WritableVolumeCount: so.VolumeGrowthCount,
@ -133,6 +137,7 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a
Replication: so.Replication,
Collection: so.Collection,
Ttl: so.TtlString(),
DiskType: so.DiskType,
DataCenter: "",
Rack: "",
WritableVolumeCount: so.VolumeGrowthCount,
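
Note (sketch, not part of this commit): with DiskType on VolumeAssignRequest, a client can ask the master for an SSD-backed file id the same way benchmark.go does above; the master address here is an assumption:

    package main

    import (
        "fmt"
        "log"

        "github.com/chrislusf/seaweedfs/weed/operation"
        "google.golang.org/grpc"
    )

    func main() {
        // Ask the master for one file id on an SSD-backed volume.
        ar := &operation.VolumeAssignRequest{
            Count:    1,
            DiskType: "ssd",
        }
        result, err := operation.Assign("localhost:9333", grpc.WithInsecure(), ar)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println(result.Fid, result.Url)
    }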

7
weed/operation/submit.go

@ -25,6 +25,7 @@ type FilePart struct {
Collection string
DataCenter string
Ttl string
DiskType string
Server string //this comes from assign result
Fid string //this comes from assign result, but customizable
Fsync bool
@ -38,7 +39,7 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
@ -49,6 +50,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
Collection: collection,
DataCenter: dataCenter,
Ttl: ttl,
DiskType: diskType,
}
ret, err := Assign(master, grpcDialOption, ar)
if err != nil {
@ -70,6 +72,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
file.Collection = collection
file.DataCenter = dataCenter
file.Ttl = ttl
file.DiskType = diskType
results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
@ -143,6 +146,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
Replication: fi.Replication,
Collection: fi.Collection,
Ttl: fi.Ttl,
DiskType: fi.DiskType,
}
ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
@ -156,6 +160,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
Replication: fi.Replication,
Collection: fi.Collection,
Ttl: fi.Ttl,
DiskType: fi.DiskType,
}
ret, err = Assign(master, grpcDialOption, ar)
if err != nil {

13
weed/pb/filer.proto

@ -156,6 +156,7 @@ message FuseAttributes {
repeated string group_name = 12; // for hdfs
string symlink_target = 13;
bytes md5 = 14;
string disk_type = 15;
}
message CreateEntryRequest {
@ -220,6 +221,7 @@ message AssignVolumeRequest {
string data_center = 5;
string path = 6;
string rack = 7;
string disk_type = 8;
}
message AssignVolumeResponse {
@ -270,11 +272,9 @@ message StatisticsRequest {
string replication = 1;
string collection = 2;
string ttl = 3;
string disk_type = 4;
}
message StatisticsResponse {
string replication = 1;
string collection = 2;
string ttl = 3;
uint64 total_size = 4;
uint64 used_size = 5;
uint64 file_count = 6;
@ -358,12 +358,7 @@ message FilerConf {
string collection = 2;
string replication = 3;
string ttl = 4;
enum DiskType {
NONE = 0;
HDD = 1;
SSD = 2;
}
DiskType disk_type = 5;
string disk_type = 5;
bool fsync = 6;
uint32 volume_growth_count = 7;
}
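
Note (not part of this commit): FilerConf.PathConf drops the NONE/HDD/SSD enum in favor of a plain string, so an empty value now means "no preference". A tiny sketch of the generated Go type after this change:

    package main

    import (
        "fmt"

        "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
    )

    func main() {
        conf := &filer_pb.FilerConf_PathConf{
            Collection: "fast",
            DiskType:   "ssd", // previously a DiskType enum; "" now means no preference
        }
        fmt.Println(conf.String())
    }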

1086
weed/pb/filer_pb/filer.pb.go
File diff suppressed because it is too large

19
weed/pb/master.proto

@ -44,7 +44,6 @@ message Heartbeat {
string ip = 1;
uint32 port = 2;
string public_url = 3;
uint32 max_volume_count = 4;
uint64 max_file_key = 5;
string data_center = 6;
string rack = 7;
@ -62,6 +61,9 @@ message Heartbeat {
repeated VolumeEcShardInformationMessage deleted_ec_shards = 18;
bool has_no_ec_shards = 19;
uint32 max_volume_count = 4;
uint32 max_ssd_volume_count = 20;
}
message HeartbeatResponse {
@ -87,6 +89,7 @@ message VolumeInformationMessage {
int64 modified_at_second = 12;
string remote_storage_name = 13;
string remote_storage_key = 14;
string disk_type = 15;
}
message VolumeShortInformationMessage {
@ -95,6 +98,7 @@ message VolumeShortInformationMessage {
uint32 replica_placement = 8;
uint32 version = 9;
uint32 ttl = 10;
string disk_type = 15;
}
message VolumeEcShardInformationMessage {
@ -163,6 +167,7 @@ message AssignRequest {
string data_node = 7;
uint32 memory_map_max_size_mb = 8;
uint32 Writable_volume_count = 9;
string disk_type = 10;
}
message AssignResponse {
string fid = 1;
@ -177,11 +182,9 @@ message StatisticsRequest {
string replication = 1;
string collection = 2;
string ttl = 3;
string disk_type = 4;
}
message StatisticsResponse {
string replication = 1;
string collection = 2;
string ttl = 3;
uint64 total_size = 4;
uint64 used_size = 5;
uint64 file_count = 6;
@ -219,6 +222,8 @@ message DataNodeInfo {
repeated VolumeInformationMessage volume_infos = 6;
repeated VolumeEcShardInformationMessage ec_shard_infos = 7;
uint64 remote_volume_count = 8;
uint64 max_ssd_volume_count = 9;
uint64 ssd_volume_count = 10;
}
message RackInfo {
string id = 1;
@ -228,6 +233,8 @@ message RackInfo {
uint64 active_volume_count = 5;
repeated DataNodeInfo data_node_infos = 6;
uint64 remote_volume_count = 7;
uint64 max_ssd_volume_count = 8;
uint64 ssd_volume_count = 9;
}
message DataCenterInfo {
string id = 1;
@ -237,6 +244,8 @@ message DataCenterInfo {
uint64 active_volume_count = 5;
repeated RackInfo rack_infos = 6;
uint64 remote_volume_count = 7;
uint64 max_ssd_volume_count = 8;
uint64 ssd_volume_count = 9;
}
message TopologyInfo {
string id = 1;
@ -246,6 +255,8 @@ message TopologyInfo {
uint64 active_volume_count = 5;
repeated DataCenterInfo data_center_infos = 6;
uint64 remote_volume_count = 7;
uint64 max_ssd_volume_count = 8;
uint64 ssd_volume_count = 9;
}
message VolumeListRequest {
}

1150
weed/pb/master_pb/master.pb.go
File diff suppressed because it is too large

3
weed/pb/volume_server.proto

@ -157,6 +157,7 @@ message AllocateVolumeRequest {
string replication = 4;
string ttl = 5;
uint32 memory_map_max_size_mb = 6;
string disk_type = 7;
}
message AllocateVolumeResponse {
}
@ -361,6 +362,7 @@ message ReadVolumeFileStatusResponse {
uint64 file_count = 6;
uint32 compaction_revision = 7;
string collection = 8;
string disk_type = 9;
}
message DiskStatus {
@ -370,6 +372,7 @@ message DiskStatus {
uint64 free = 4;
float percent_free = 5;
float percent_used = 6;
string disk_type = 7;
}
message MemStatus {

1430
weed/pb/volume_server_pb/volume_server.pb.go
File diff suppressed because it is too large

1
weed/replication/sink/filersink/fetch_write.go

@ -78,6 +78,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
Collection: fs.collection,
TtlSec: fs.ttlSec,
DataCenter: fs.dataCenter,
DiskType: fs.diskType,
Path: path,
}

5
weed/replication/sink/filersink/filer_sink.go

@ -25,6 +25,7 @@ type FilerSink struct {
replication string
collection string
ttlSec int32
diskType string
dataCenter string
grpcDialOption grpc.DialOption
address string
@ -51,6 +52,7 @@ func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string)
configuration.GetString(prefix+"replication"),
configuration.GetString(prefix+"collection"),
configuration.GetInt(prefix+"ttlSec"),
configuration.GetString(prefix+"disk"),
security.LoadClientTLS(util.GetViper(), "grpc.client"),
false)
}
@ -60,7 +62,7 @@ func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) {
}
func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
replication string, collection string, ttlSec int, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
fs.address = address
if fs.address == "" {
fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
@ -70,6 +72,7 @@ func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
fs.replication = replication
fs.collection = collection
fs.ttlSec = int32(ttlSec)
fs.diskType = diskType
fs.grpcDialOption = grpcDialOption
fs.writeChunkByFiler = writeChunkByFiler
return nil
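
Note (illustration, not part of this commit): DoInitialize now takes the disk type between ttlSec and the gRPC option, mirroring how filer.sync wires -b.disk above; the addresses and directory below are assumptions:

    package main

    import (
        "log"

        "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
        "google.golang.org/grpc"
    )

    func main() {
        fs := &filersink.FilerSink{}
        // The extra "ssd" argument is the new diskType; "" keeps the previous behavior.
        err := fs.DoInitialize("filer-b:8888", "filer-b:18888", "/backup",
            "000", "backup", 0, "ssd", grpc.WithInsecure(), false)
        if err != nil {
            log.Fatal(err)
        }
    }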

1
weed/server/common.go

@ -131,6 +131,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
Replication: r.FormValue("replication"),
Collection: r.FormValue("collection"),
Ttl: r.FormValue("ttl"),
DiskType: r.FormValue("disk"),
}
assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar)
if ae != nil {

6
weed/server/filer_grpc_server.go

@ -263,6 +263,7 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
newEntry.Attributes.Collection,
newEntry.Attributes.Replication,
newEntry.Attributes.TtlSec,
newEntry.Attributes.DiskType,
"",
"",
)
@ -306,7 +307,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
}
entry.Chunks = append(entry.Chunks, req.Chunks...)
so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, "", "")
so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.DiskType, "", "")
entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.Chunks)
if err != nil {
// not good, but should be ok
@ -332,7 +333,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DataCenter, req.Rack)
so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack)
assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
@ -402,6 +403,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
Replication: req.Replication,
Collection: req.Collection,
Ttl: req.Ttl,
DiskType: req.DiskType,
})
if grpcErr != nil {
return grpcErr

8
weed/server/filer_server_handlers_write.go

@ -61,6 +61,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
query.Get("collection"),
query.Get("replication"),
query.Get("ttl"),
query.Get("disk"),
query.Get("dataCenter"),
query.Get("rack"),
)
@ -104,7 +105,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, dataCenter, rack string) *operation.StorageOption {
func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType string, dataCenter, rack string) *operation.StorageOption {
collection := util.Nvl(qCollection, fs.option.Collection)
replication := util.Nvl(qReplication, fs.option.DefaultReplication)
@ -134,17 +135,18 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
DataCenter: util.Nvl(dataCenter, fs.option.DataCenter),
Rack: util.Nvl(rack, fs.option.Rack),
TtlSeconds: ttlSeconds,
DiskType: util.Nvl(diskType, rule.DiskType),
Fsync: fsync || rule.Fsync,
VolumeGrowthCount: rule.VolumeGrowthCount,
}
}
func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, dataCenter, rack string) *operation.StorageOption {
func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) *operation.StorageOption {
ttl, err := needle.ReadTTL(qTtl)
if err != nil {
glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
}
return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, dataCenter, rack)
return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack)
}
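
Note (sketch, not part of this commit): PostHandler now forwards the disk query parameter into detectStorageOption, so a plain HTTP upload can request the SSD tier; the filer address and path below are assumptions:

    package main

    import (
        "log"
        "net/http"
        "strings"
    )

    func main() {
        // Upload through the filer, asking for SSD-backed chunks via the new ?disk= parameter.
        resp, err := http.Post(
            "http://localhost:8888/buckets/fast/hello.txt?disk=ssd",
            "text/plain",
            strings.NewReader("hello world"),
        )
        if err != nil {
            log.Fatal(err)
        }
        defer resp.Body.Close()
        log.Println(resp.Status)
    }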

1
weed/server/filer_server_handlers_write_autochunk.go

@ -186,6 +186,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
Replication: so.Replication,
Collection: so.Collection,
TtlSec: so.TtlSeconds,
DiskType: so.DiskType,
Mime: contentType,
Md5: md5bytes,
FileSize: uint64(chunkOffset),

1
weed/server/filer_server_handlers_write_cipher.go

@ -68,6 +68,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
Replication: so.Replication,
Collection: so.Collection,
TtlSec: so.TtlSeconds,
DiskType: so.DiskType,
Mime: pu.MimeType,
Md5: util.Base64Md5ToBytes(pu.ContentMd5),
},

8
weed/server/master_grpc_server.go

@ -67,9 +67,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn = rack.GetOrCreateDataNode(heartbeat.Ip,
int(heartbeat.Port), heartbeat.PublicUrl,
int64(heartbeat.MaxVolumeCount))
dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, int64(heartbeat.MaxVolumeCount), int64(heartbeat.MaxSsdVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
@ -83,6 +81,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
delta := int64(heartbeat.MaxVolumeCount) - dn.GetMaxVolumeCount()
dn.UpAdjustMaxVolumeCountDelta(delta)
}
if heartbeat.MaxSsdVolumeCount != 0 && dn.GetMaxSsdVolumeCount() != int64(heartbeat.MaxSsdVolumeCount) {
delta := int64(heartbeat.MaxSsdVolumeCount) - dn.GetMaxSsdVolumeCount()
dn.UpAdjustMaxSsdVolumeCountDelta(delta)
}
glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
message := &master_pb.VolumeLocation{

12
weed/server/master_grpc_server_volume.go

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
@ -60,11 +61,16 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if err != nil {
return nil, err
}
diskType, err := storage.ToDiskType(req.DiskType)
if err != nil {
return nil, err
}
option := &topology.VolumeGrowOption{
Collection: req.Collection,
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
DiskType: diskType,
Prealloacte: ms.preallocateSize,
DataCenter: req.DataCenter,
Rack: req.Rack,
@ -73,7 +79,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.FreeSpace() <= 0 {
if ms.Topo.AvailableSpaceFor(option) <= 0 {
return nil, fmt.Errorf("No free volumes left!")
}
ms.vgLock.Lock()
@ -117,10 +123,10 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
return nil, err
}
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl)
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, storage.DiskType(req.DiskType))
stats := volumeLayout.Stats()
totalSize := ms.Topo.GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
totalSize := (ms.Topo.GetMaxVolumeCount() + ms.Topo.GetMaxSsdVolumeCount()) * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
resp := &master_pb.StatisticsResponse{
TotalSize: uint64(totalSize),
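
Note (assumption): AvailableSpaceFor replaces the single FreeSpace check; its implementation lives in weed/topology, which is not shown on this page. A hypothetical, self-contained sketch of the idea, with free slots counted per tier:

    package main

    import "fmt"

    // DiskType mirrors the disk type strings used in this commit ("ssd" for SSDs, "" / "hdd" otherwise).
    type DiskType string

    const SsdType DiskType = "ssd"

    // availableSpaceFor is a hypothetical sketch of the idea behind
    // Topology.AvailableSpaceFor: free volume slots are counted per tier.
    func availableSpaceFor(maxVolumes, volumes, maxSsdVolumes, ssdVolumes int64, diskType DiskType) int64 {
        if diskType == SsdType {
            return maxSsdVolumes - ssdVolumes
        }
        return maxVolumes - volumes
    }

    func main() {
        fmt.Println(availableSpaceFor(100, 80, 10, 3, SsdType)) // 7 free ssd slots
    }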

2
weed/server/master_server_handlers.go

@ -112,7 +112,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
}
if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.FreeSpace() <= 0 {
if ms.Topo.AvailableSpaceFor(option) <= 0 {
writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left!"})
return
}

12
weed/server/master_server_handlers_admin.go

@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage"
"math/rand"
"net/http"
"strconv"
@ -75,8 +76,8 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
}
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) {
err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount())
if ms.Topo.AvailableSpaceFor(option) < int64(count*option.ReplicaPlacement.GetCopyCount()) {
err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), count*option.ReplicaPlacement.GetCopyCount())
} else {
count, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo)
}
@ -136,7 +137,7 @@ func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *
}
func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool {
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
return vl.GetActiveVolumeCount(option) > 0
}
@ -157,6 +158,10 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if err != nil {
return nil, err
}
diskType, err := storage.ToDiskType(r.FormValue("disk"))
if err != nil {
return nil, err
}
preallocate := ms.preallocateSize
if r.FormValue("preallocate") != "" {
@ -169,6 +174,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
Collection: r.FormValue("collection"),
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
DiskType: diskType,
Prealloacte: preallocate,
DataCenter: r.FormValue("dataCenter"),
Rack: r.FormValue("rack"),
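
Note (sketch, not part of this commit): getVolumeGrowOption now reads a disk form value, so HTTP callers of the master can request the SSD tier, assuming the assign endpoint is wired through this option builder; the master address is an assumption:

    package main

    import (
        "io"
        "log"
        "net/http"
    )

    func main() {
        // Ask the master for an SSD-backed file id; the "disk" parameter feeds getVolumeGrowOption.
        resp, err := http.Get("http://localhost:9333/dir/assign?disk=ssd")
        if err != nil {
            log.Fatal(err)
        }
        defer resp.Body.Close()
        body, _ := io.ReadAll(resp.Body)
        log.Println(string(body))
    }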

2
weed/server/volume_grpc_admin.go

@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage"
"path/filepath"
"github.com/chrislusf/seaweedfs/weed/glog"
@ -41,6 +42,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p
req.Ttl,
req.Preallocate,
req.MemoryMapMaxSizeMb,
storage.DiskType(req.DiskType),
)
if err != nil {

11
weed/server/volume_grpc_copy.go

@ -36,11 +36,6 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId)
}
location := vs.store.FindFreeLocation()
if location == nil {
return nil, fmt.Errorf("no space left")
}
// the master will not start compaction for read-only volumes, so it is safe to just copy files directly
// copy .dat and .idx files
// read .idx .dat file size and timestamp
@ -59,6 +54,11 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
return fmt.Errorf("read volume file status failed, %v", err)
}
location := vs.store.FindFreeLocation(storage.DiskType(volFileInfoResp.DiskType))
if location == nil {
return fmt.Errorf("no space left")
}
dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
indexBaseFileName = storage.VolumeFileName(location.IdxDirectory, volFileInfoResp.Collection, int(req.VolumeId))
@ -206,6 +206,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se
resp.FileCount = v.FileCount()
resp.CompactionRevision = uint32(v.CompactionRevision)
resp.Collection = v.Collection
resp.DiskType = string(v.DiskType())
return resp, nil
}

2
weed/server/volume_grpc_erasure_coding.go

@ -105,7 +105,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
location := vs.store.FindFreeLocation()
location := vs.store.FindFreeLocation(storage.HardDriveType)
if location == nil {
return nil, fmt.Errorf("no space left")
}

4
weed/server/volume_server.go

@ -37,7 +37,7 @@ type VolumeServer struct {
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string,
folders []string, maxCounts []int, minFreeSpacePercents []float32,
folders []string, maxCounts []int, minFreeSpacePercents []float32, diskTypes []storage.DiskType,
idxFolder string,
needleMapKind storage.NeedleMapKind,
masterNodes []string, pulseSeconds int,
@ -76,7 +76,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
vs.checkWithMaster()
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind)
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind, diskTypes)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux)

8
weed/server/volume_server_handlers_admin.go

@ -16,7 +16,9 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
var ds []*volume_server_pb.DiskStatus
for _, loc := range vs.store.Locations {
if dir, e := filepath.Abs(loc.Directory); e == nil {
ds = append(ds, stats.NewDiskStatus(dir))
newDiskStatus := stats.NewDiskStatus(dir)
newDiskStatus.DiskType = loc.GetDiskType()
ds = append(ds, newDiskStatus)
}
}
m["DiskStatuses"] = ds
@ -31,7 +33,9 @@ func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request)
var ds []*volume_server_pb.DiskStatus
for _, loc := range vs.store.Locations {
if dir, e := filepath.Abs(loc.Directory); e == nil {
ds = append(ds, stats.NewDiskStatus(dir))
newDiskStatus := stats.NewDiskStatus(dir)
newDiskStatus.DiskType = loc.GetDiskType()
ds = append(ds, newDiskStatus)
}
}
m["DiskStatuses"] = ds

4
weed/server/volume_server_handlers_ui.go

@ -19,7 +19,9 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
var ds []*volume_server_pb.DiskStatus
for _, loc := range vs.store.Locations {
if dir, e := filepath.Abs(loc.Directory); e == nil {
ds = append(ds, stats.NewDiskStatus(dir))
newDiskStatus := stats.NewDiskStatus(dir)
newDiskStatus.DiskType = loc.GetDiskType()
ds = append(ds, newDiskStatus)
}
}
volumeInfos := vs.store.VolumeInfos()

2
weed/server/volume_server_ui/templates.go

@ -69,6 +69,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<thead>
<tr>
<th>Path</th>
<th>Type</th>
<th>Total</th>
<th>Free</th>
<th>Usage</th>
@ -78,6 +79,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
{{ range .DiskStatuses }}
<tr>
<td>{{ .Dir }}</td>
<td>{{ .DiskType }}</td>
<td>{{ bytesToHumanReadable .All }}</td>
<td>{{ bytesToHumanReadable .Free }}</td>
<td>{{ percentFrom .All .Used}}%</td>

2
weed/server/webdav_server.go

@ -33,6 +33,7 @@ type WebDavOption struct {
BucketsPath string
GrpcDialOption grpc.DialOption
Collection string
DiskType string
Uid uint32
Gid uint32
Cipher bool
@ -382,6 +383,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
Count: 1,
Replication: "",
Collection: f.fs.option.Collection,
DiskType: f.fs.option.DiskType,
Path: name,
}

2
weed/shell/command_fs_configure.go

@ -52,6 +52,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
collection := fsConfigureCommand.String("collection", "", "assign writes to this collection")
replication := fsConfigureCommand.String("replication", "", "assign writes with this replication")
ttl := fsConfigureCommand.String("ttl", "", "assign writes with this ttl")
diskType := fsConfigureCommand.String("disk", "", "[hdd|ssd] choose between hard drive or solid state drive")
fsync := fsConfigureCommand.Bool("fsync", false, "fsync for the writes")
volumeGrowthCount := fsConfigureCommand.Int("volumeGrowthCount", 0, "the number of physical volumes to add if no writable volumes")
isDelete := fsConfigureCommand.Bool("delete", false, "delete the configuration by locationPrefix")
@ -81,6 +82,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
Replication: *replication,
Ttl: *ttl,
Fsync: *fsync,
DiskType: *diskType,
VolumeGrowthCount: uint32(*volumeGrowthCount),
}

89
weed/shell/command_volume_balance.go

@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"io"
"os"
@ -111,7 +112,8 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
// balance writable volumes
// balance writable hdd volumes
// fmt.Fprintf(os.Stdout, "\nbalance collection %s writable hdd volumes\n", collection)
for _, n := range nodes {
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
if collection != "ALL_COLLECTIONS" {
@ -119,14 +121,15 @@ func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*V
return false
}
}
return !v.ReadOnly && v.Size < volumeSizeLimit
return v.DiskType == string(storage.HardDriveType) && (!v.ReadOnly && v.Size < volumeSizeLimit)
})
}
if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortWritableVolumes, applyBalancing); err != nil {
if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount, sortWritableVolumes, applyBalancing); err != nil {
return err
}
// balance readable volumes
// balance readable hdd volumes
// fmt.Fprintf(os.Stdout, "\nbalance collection %s readable hdd volumes\n", collection)
for _, n := range nodes {
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
if collection != "ALL_COLLECTIONS" {
@ -134,10 +137,42 @@ func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*V
return false
}
}
return v.ReadOnly || v.Size >= volumeSizeLimit
return v.DiskType == string(storage.HardDriveType) && (v.ReadOnly || v.Size >= volumeSizeLimit)
})
}
if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortReadOnlyVolumes, applyBalancing); err != nil {
if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount, sortReadOnlyVolumes, applyBalancing); err != nil {
return err
}
// balance writable ssd volumes
// fmt.Fprintf(os.Stdout, "\nbalance collection %s writable ssd volumes\n", collection)
for _, n := range nodes {
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
if collection != "ALL_COLLECTIONS" {
if v.Collection != collection {
return false
}
}
return v.DiskType == string(storage.SsdType) && (!v.ReadOnly && v.Size < volumeSizeLimit)
})
}
if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxSsdVolumeCount, sortWritableVolumes, applyBalancing); err != nil {
return err
}
// balance readable ssd volumes
// fmt.Fprintf(os.Stdout, "\nbalance collection %s readable ssd volumes\n", collection)
for _, n := range nodes {
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
if collection != "ALL_COLLECTIONS" {
if v.Collection != collection {
return false
}
}
return v.DiskType == string(storage.SsdType) && (v.ReadOnly || v.Size >= volumeSizeLimit)
})
}
if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxSsdVolumeCount, sortReadOnlyVolumes, applyBalancing); err != nil {
return err
}
@ -169,12 +204,21 @@ type Node struct {
rack string
}
func (n *Node) localVolumeRatio() float64 {
return divide(len(n.selectedVolumes), int(n.info.MaxVolumeCount))
type CapacityFunc func(*master_pb.DataNodeInfo) int
func capacityByMaxSsdVolumeCount(info *master_pb.DataNodeInfo) int {
return int(info.MaxSsdVolumeCount)
}
func capacityByMaxVolumeCount(info *master_pb.DataNodeInfo) int {
return int(info.MaxVolumeCount)
}
func (n *Node) localVolumeNextRatio() float64 {
return divide(len(n.selectedVolumes)+1, int(n.info.MaxVolumeCount))
func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 {
return divide(len(n.selectedVolumes), capacityFunc(n.info))
}
func (n *Node) localVolumeNextRatio(capacityFunc CapacityFunc) float64 {
return divide(len(n.selectedVolumes)+1, capacityFunc(n.info))
}
func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) {
@ -198,33 +242,40 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) {
})
}
func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, capacityFunc CapacityFunc, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
selectedVolumeCount, volumeMaxCount := 0, 0
var nodesWithCapacity []*Node
for _, dn := range nodes {
selectedVolumeCount += len(dn.selectedVolumes)
volumeMaxCount += int(dn.info.MaxVolumeCount)
capacity := capacityFunc(dn.info)
if capacity > 0 {
nodesWithCapacity = append(nodesWithCapacity, dn)
}
volumeMaxCount += capacity
}
idealVolumeRatio := divide(selectedVolumeCount, volumeMaxCount)
hasMoved := true
// fmt.Fprintf(os.Stdout, " total %d volumes, max %d volumes, idealVolumeRatio %f\n", selectedVolumeCount, volumeMaxCount, idealVolumeRatio)
for hasMoved {
hasMoved = false
sort.Slice(nodes, func(i, j int) bool {
return nodes[i].localVolumeRatio() < nodes[j].localVolumeRatio()
sort.Slice(nodesWithCapacity, func(i, j int) bool {
return nodesWithCapacity[i].localVolumeRatio(capacityFunc) < nodesWithCapacity[j].localVolumeRatio(capacityFunc)
})
fullNode := nodes[len(nodes)-1]
fullNode := nodesWithCapacity[len(nodesWithCapacity)-1]
var candidateVolumes []*master_pb.VolumeInformationMessage
for _, v := range fullNode.selectedVolumes {
candidateVolumes = append(candidateVolumes, v)
}
sortCandidatesFn(candidateVolumes)
for i := 0; i < len(nodes)-1; i++ {
emptyNode := nodes[i]
if !(fullNode.localVolumeRatio() > idealVolumeRatio && emptyNode.localVolumeNextRatio() <= idealVolumeRatio) {
for i := 0; i < len(nodesWithCapacity)-1; i++ {
emptyNode := nodesWithCapacity[i]
if !(fullNode.localVolumeRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeNextRatio(capacityFunc) <= idealVolumeRatio) {
// no more volume servers with empty slots
break
}
@ -279,7 +330,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
if v.Collection == "" {
collectionPrefix = ""
}
fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
if applyChange {
return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second)
}
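
Note (illustration, not part of this commit): the CapacityFunc indirection lets one balancing pass divide by MaxVolumeCount and another by MaxSsdVolumeCount. A stripped-down sketch of the ratio math with a stand-in node type:

    package main

    import "fmt"

    // node is a stand-in for shell.Node; only the fields needed for the ratio are kept.
    type node struct {
        selected      int // currently selected volumes on this node
        maxVolumes    int // counterpart of DataNodeInfo.MaxVolumeCount
        maxSsdVolumes int // counterpart of DataNodeInfo.MaxSsdVolumeCount
    }

    type capacityFunc func(n node) int

    func byMaxVolumeCount(n node) int    { return n.maxVolumes }
    func byMaxSsdVolumeCount(n node) int { return n.maxSsdVolumes }

    func localVolumeRatio(n node, capacity capacityFunc) float64 {
        c := capacity(n)
        if c == 0 {
            return 0 // nodes without capacity for this tier are skipped by the balancer
        }
        return float64(n.selected) / float64(c)
    }

    func main() {
        hdd := node{selected: 12, maxVolumes: 30}
        ssd := node{selected: 3, maxSsdVolumes: 8}
        fmt.Println(localVolumeRatio(hdd, byMaxVolumeCount))    // 0.4
        fmt.Println(localVolumeRatio(ssd, byMaxSsdVolumeCount)) // 0.375
    }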

2
weed/shell/command_volume_server_evacuate.go

@ -175,7 +175,7 @@ func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEc
func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
sort.Slice(otherNodes, func(i, j int) bool {
return otherNodes[i].localVolumeRatio() < otherNodes[j].localVolumeRatio()
return otherNodes[i].localVolumeRatio(capacityByMaxVolumeCount)+otherNodes[i].localVolumeRatio(capacityByMaxSsdVolumeCount) < otherNodes[j].localVolumeRatio(capacityByMaxVolumeCount)+otherNodes[j].localVolumeRatio(capacityByMaxSsdVolumeCount)
})
for i := 0; i < len(otherNodes); i++ {

11
weed/storage/disk_location.go

@ -19,6 +19,7 @@ import (
type DiskLocation struct {
Directory string
IdxDirectory string
DiskType DiskType
MaxVolumeCount int
OriginalMaxVolumeCount int
MinFreeSpacePercent float32
@ -32,7 +33,7 @@ type DiskLocation struct {
isDiskSpaceLow bool
}
func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string) *DiskLocation {
func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string, diskType DiskType) *DiskLocation {
dir = util.ResolvePath(dir)
if idxDir == "" {
idxDir = dir
@ -42,6 +43,7 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32
location := &DiskLocation{
Directory: dir,
IdxDirectory: idxDir,
DiskType: diskType,
MaxVolumeCount: maxVolumeCount,
OriginalMaxVolumeCount: maxVolumeCount,
MinFreeSpacePercent: minFreeSpacePercent,
@ -371,3 +373,10 @@ func (l *DiskLocation) CheckDiskSpace() {
}
}
func (l *DiskLocation) GetDiskType() string {
if l.DiskType == SsdType {
return "SSD"
}
return "HDD"
}
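
Note (assumption): weed/storage/volume_type.go (23 new lines, diff not shown on this page) presumably defines the DiskType string type and the ToDiskType parser used throughout this commit. A plausible, self-contained sketch under that assumption:

    package main

    import (
        "fmt"
        "strings"
    )

    // DiskType is a plausible sketch of the new type in weed/storage/volume_type.go.
    type DiskType string

    const (
        HardDriveType DiskType = ""    // assumption: default tier (hard drive)
        SsdType       DiskType = "ssd"
    )

    // toDiskType mirrors how storage.ToDiskType is used in this commit:
    // it accepts "", "hdd" or "ssd" and rejects anything else.
    func toDiskType(s string) (DiskType, error) {
        switch strings.ToLower(s) {
        case "", "hdd":
            return HardDriveType, nil
        case "ssd":
            return SsdType, nil
        default:
            return HardDriveType, fmt.Errorf("parse DiskType %s: expecting hdd or ssd", s)
        }
    }

    func main() {
        for _, in := range []string{"", "hdd", "ssd", "nvme"} {
            dt, err := toDiskType(in)
            fmt.Printf("%q -> %q, err=%v\n", in, dt, err)
        }
    }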

48
weed/storage/store.go

@ -52,11 +52,11 @@ func (s *Store) String() (str string) {
return
}
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapKind) (s *Store) {
func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType) (s *Store) {
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapKind: needleMapKind}
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder)
location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder, diskTypes[i])
location.loadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location)
stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i]))
@ -69,7 +69,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di
return
}
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error {
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error {
rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
@ -78,7 +78,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap
if e != nil {
return e
}
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb)
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType)
return e
}
func (s *Store) DeleteCollection(collection string) (e error) {
@ -100,9 +100,12 @@ func (s *Store) findVolume(vid needle.VolumeId) *Volume {
}
return nil
}
func (s *Store) FindFreeLocation() (ret *DiskLocation) {
func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) {
max := 0
for _, location := range s.Locations {
if diskType != location.DiskType {
continue
}
currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
currentFreeCount *= erasure_coding.DataShardsCount
currentFreeCount -= location.EcVolumesLen()
@ -114,11 +117,11 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) {
}
return ret
}
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.FindFreeLocation(); location != nil {
if location := s.FindFreeLocation(diskType); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil {
@ -130,6 +133,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
ReplicaPlacement: uint32(replicaPlacement.Byte()),
Version: uint32(volume.Version()),
Ttl: ttl.ToUint32(),
DiskType: string(diskType),
}
return nil
} else {
@ -203,12 +207,18 @@ func (s *Store) GetRack() string {
func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var volumeMessages []*master_pb.VolumeInformationMessage
maxVolumeCount := 0
maxSsdVolumeCount := 0
var maxFileKey NeedleId
collectionVolumeSize := make(map[string]uint64)
collectionVolumeReadOnlyCount := make(map[string]map[string]uint8)
for _, location := range s.Locations {
var deleteVids []needle.VolumeId
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
switch location.DiskType {
case SsdType:
maxSsdVolumeCount = maxSsdVolumeCount + location.MaxVolumeCount
case HardDriveType:
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
}
location.volumesLock.RLock()
for _, v := range location.volumes {
curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage()
@ -280,15 +290,16 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
}
return &master_pb.Heartbeat{
Ip: s.Ip,
Port: uint32(s.Port),
PublicUrl: s.PublicUrl,
MaxVolumeCount: uint32(maxVolumeCount),
MaxFileKey: NeedleIdToUint64(maxFileKey),
DataCenter: s.dataCenter,
Rack: s.rack,
Volumes: volumeMessages,
HasNoVolumes: len(volumeMessages) == 0,
Ip: s.Ip,
Port: uint32(s.Port),
PublicUrl: s.PublicUrl,
MaxVolumeCount: uint32(maxVolumeCount),
MaxSsdVolumeCount: uint32(maxSsdVolumeCount),
MaxFileKey: NeedleIdToUint64(maxFileKey),
DataCenter: s.dataCenter,
Rack: s.rack,
Volumes: volumeMessages,
HasNoVolumes: len(volumeMessages) == 0,
}
}
@ -371,6 +382,7 @@ func (s *Store) MountVolume(i needle.VolumeId) error {
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
DiskType: string(v.location.DiskType),
}
return nil
}
@ -390,6 +402,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error {
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
DiskType: string(v.location.DiskType),
}
for _, location := range s.Locations {
@ -414,6 +427,7 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error {
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
DiskType: string(v.location.DiskType),
}
for _, location := range s.Locations {
if err := location.DeleteVolume(i); err == nil {
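
NewStore now takes one DiskType per storage directory, parallel to dirnames, maxVolumeCounts, and minFreeSpacePercents, and FindFreeLocation only considers locations whose DiskType matches the request. A minimal sketch of preparing that diskTypes slice from per-directory type strings; the flag wiring and the helper name are assumptions, only ToDiskType and the new NewStore parameter come from this change:

package example

import (
	"log"

	"github.com/chrislusf/seaweedfs/weed/storage"
)

// buildDiskTypes parses one disk-type string per storage directory, e.g. "hdd,ssd".
func buildDiskTypes(perDir []string) []storage.DiskType {
	diskTypes := make([]storage.DiskType, 0, len(perDir))
	for _, s := range perDir {
		dt, err := storage.ToDiskType(s) // "" or "hdd" -> HardDriveType, "ssd" -> SsdType
		if err != nil {
			log.Fatalf("invalid disk type %q: %v", s, err)
		}
		diskTypes = append(diskTypes, dt)
	}
	return diskTypes // passed to storage.NewStore alongside dirnames, maxVolumeCounts, ...
}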

5
weed/storage/volume.go

@ -171,6 +171,10 @@ func (v *Volume) IndexFileSize() uint64 {
return v.nm.IndexFileSize()
}
func (v *Volume) DiskType() DiskType {
return v.location.DiskType
}
// Close cleanly shuts down this volume
func (v *Volume) Close() {
v.dataFileAccessLock.Lock()
@ -262,6 +266,7 @@ func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.Volume
Ttl: v.Ttl.ToUint32(),
CompactRevision: uint32(v.SuperBlock.CompactionRevision),
ModifiedAtSecond: modTime.Unix(),
DiskType: string(v.location.DiskType),
}
volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey()

4
weed/storage/volume_info.go

@ -14,6 +14,7 @@ type VolumeInfo struct {
Size uint64
ReplicaPlacement *super_block.ReplicaPlacement
Ttl *needle.TTL
DiskType string
Collection string
Version needle.Version
FileCount int
@ -40,6 +41,7 @@ func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err er
ModifiedAtSecond: m.ModifiedAtSecond,
RemoteStorageName: m.RemoteStorageName,
RemoteStorageKey: m.RemoteStorageKey,
DiskType: m.DiskType,
}
rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil {
@ -62,6 +64,7 @@ func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi Volu
}
vi.ReplicaPlacement = rp
vi.Ttl = needle.LoadTTLFromUint32(m.Ttl)
vi.DiskType = m.DiskType
return vi, nil
}
@ -90,6 +93,7 @@ func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMe
ModifiedAtSecond: vi.ModifiedAtSecond,
RemoteStorageName: vi.RemoteStorageName,
RemoteStorageKey: vi.RemoteStorageKey,
DiskType: vi.DiskType,
}
}

23
weed/storage/volume_type.go

@ -0,0 +1,23 @@
package storage
import "fmt"
type DiskType string
const (
HardDriveType DiskType = ""
SsdType = "ssd"
)
func ToDiskType(vt string) (diskType DiskType, err error) {
diskType = HardDriveType
switch vt {
case "", "hdd":
diskType = HardDriveType
case "ssd":
diskType = SsdType
default:
err = fmt.Errorf("parse DiskType %s: expecting hdd or ssd\n", vt)
}
return
}
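
ToDiskType accepts "", "hdd", or "ssd" and rejects anything else, defaulting to HardDriveType. A short usage sketch:

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage"
)

func main() {
	for _, s := range []string{"", "hdd", "ssd", "nvme"} {
		dt, err := storage.ToDiskType(s)
		if err != nil {
			fmt.Printf("%q: %v\n", s, err) // "nvme" is rejected
			continue
		}
		fmt.Printf("%q -> %q\n", s, dt) // "" and "hdd" -> "", "ssd" -> "ssd"
	}
}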

1
weed/topology/allocate_volume.go

@ -24,6 +24,7 @@ func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.Vol
Ttl: option.Ttl.String(),
Preallocate: option.Prealloacte,
MemoryMapMaxSizeMb: option.MemoryMapMaxSizeMb,
DiskType: string(option.DiskType),
})
return deleteErr
})

19
weed/topology/collection.go

@ -2,6 +2,7 @@ package topology
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
@ -29,17 +30,31 @@ func (c *Collection) String() string {
return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
}
func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType storage.DiskType) *VolumeLayout {
keyString := rp.String()
if ttl != nil {
keyString += ttl.String()
}
if diskType != storage.HardDriveType {
keyString += string(diskType)
}
vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
return NewVolumeLayout(rp, ttl, c.volumeSizeLimit, c.replicationAsMin)
return NewVolumeLayout(rp, ttl, diskType, c.volumeSizeLimit, c.replicationAsMin)
})
return vl.(*VolumeLayout)
}
func (c *Collection) DeleteVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType storage.DiskType) {
keyString := rp.String()
if ttl != nil {
keyString += ttl.String()
}
if diskType != storage.HardDriveType {
keyString += string(diskType)
}
c.storageType2VolumeLayout.Delete(keyString)
}
func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode {
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {

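The layout key is still replication + TTL, with the disk type appended only when it is not the empty HardDriveType, so existing hdd layouts keep their old keys while ssd volumes get layouts of their own. A standalone sketch of that keying; the function name and string parameters are illustrative, not the package API:

package example

// layoutKey mirrors the key built in GetOrCreateVolumeLayout and DeleteVolumeLayout.
func layoutKey(rp, ttl, diskType string) string {
	key := rp
	if ttl != "" {
		key += ttl
	}
	if diskType != "" { // HardDriveType is the empty string, so hdd keys are unchanged
		key += diskType
	}
	return key
}

// layoutKey("010", "3d", "")    == "0103d"    (hdd layout, same key as before)
// layoutKey("010", "3d", "ssd") == "0103dssd" (separate ssd layout)
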
3
weed/topology/data_center.go

@ -31,6 +31,7 @@ func (dc *DataCenter) ToMap() interface{} {
m := make(map[string]interface{})
m["Id"] = dc.Id()
m["Max"] = dc.GetMaxVolumeCount()
m["MaxSsd"] = dc.GetMaxSsdVolumeCount()
m["Free"] = dc.FreeSpace()
var racks []interface{}
for _, c := range dc.Children() {
@ -46,6 +47,8 @@ func (dc *DataCenter) ToDataCenterInfo() *master_pb.DataCenterInfo {
Id: string(dc.Id()),
VolumeCount: uint64(dc.GetVolumeCount()),
MaxVolumeCount: uint64(dc.GetMaxVolumeCount()),
MaxSsdVolumeCount: uint64(dc.GetMaxSsdVolumeCount()),
SsdVolumeCount: uint64(dc.GetSsdVolumeCount()),
FreeVolumeCount: uint64(dc.FreeSpace()),
ActiveVolumeCount: uint64(dc.GetActiveVolumeCount()),
RemoteVolumeCount: uint64(dc.GetRemoteVolumeCount()),

24
weed/topology/data_node.go

@ -50,7 +50,11 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO
func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
if oldV, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1)
if v.DiskType == storage.SsdType {
dn.UpAdjustSsdVolumeCountDelta(1)
} else {
dn.UpAdjustVolumeCountDelta(1)
}
if v.IsRemote() {
dn.UpAdjustRemoteVolumeCountDelta(1)
}
@ -89,7 +93,11 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
glog.V(0).Infoln("Deleting volume id:", vid)
delete(dn.volumes, vid)
deletedVolumes = append(deletedVolumes, v)
dn.UpAdjustVolumeCountDelta(-1)
if v.DiskType == storage.SsdType {
dn.UpAdjustSsdVolumeCountDelta(-1)
} else {
dn.UpAdjustVolumeCountDelta(-1)
}
if v.IsRemote() {
dn.UpAdjustRemoteVolumeCountDelta(-1)
}
@ -116,7 +124,11 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu
for _, v := range deletedVolumes {
delete(dn.volumes, v.Id)
dn.UpAdjustVolumeCountDelta(-1)
if v.DiskType == storage.SsdType {
dn.UpAdjustSsdVolumeCountDelta(-1)
} else {
dn.UpAdjustVolumeCountDelta(-1)
}
if v.IsRemote() {
dn.UpAdjustRemoteVolumeCountDelta(-1)
}
@ -181,10 +193,10 @@ func (dn *DataNode) Url() string {
func (dn *DataNode) ToMap() interface{} {
ret := make(map[string]interface{})
ret["Url"] = dn.Url()
ret["Volumes"] = dn.GetVolumeCount()
ret["Volumes"] = dn.GetVolumeCount() + dn.GetSsdVolumeCount()
ret["VolumeIds"] = dn.GetVolumeIds()
ret["EcShards"] = dn.GetEcShardCount()
ret["Max"] = dn.GetMaxVolumeCount()
ret["Max"] = dn.GetMaxVolumeCount() + dn.GetMaxSsdVolumeCount()
ret["Free"] = dn.FreeSpace()
ret["PublicUrl"] = dn.PublicUrl
return ret
@ -195,6 +207,8 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
Id: string(dn.Id()),
VolumeCount: uint64(dn.GetVolumeCount()),
MaxVolumeCount: uint64(dn.GetMaxVolumeCount()),
MaxSsdVolumeCount: uint64(dn.GetMaxSsdVolumeCount()),
SsdVolumeCount: uint64(dn.GetSsdVolumeCount()),
FreeVolumeCount: uint64(dn.FreeSpace()),
ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()),
RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()),

66
weed/topology/node.go

@ -2,6 +2,7 @@ package topology
import (
"errors"
"github.com/chrislusf/seaweedfs/weed/storage"
"math/rand"
"strings"
"sync"
@ -17,19 +18,24 @@ type Node interface {
Id() NodeId
String() string
FreeSpace() int64
ReserveOneVolume(r int64) (*DataNode, error)
AvailableSpaceFor(option *VolumeGrowOption) int64
ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta int64)
UpAdjustVolumeCountDelta(volumeCountDelta int64)
UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta int64)
UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64)
UpAdjustEcShardCountDelta(ecShardCountDelta int64)
UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
UpAdjustMaxVolumeId(vid needle.VolumeId)
GetVolumeCount() int64
GetSsdVolumeCount() int64
GetEcShardCount() int64
GetActiveVolumeCount() int64
GetRemoteVolumeCount() int64
GetMaxVolumeCount() int64
GetMaxSsdVolumeCount() int64
GetMaxVolumeId() needle.VolumeId
SetParent(Node)
LinkChildNode(node Node)
@ -47,9 +53,11 @@ type Node interface {
type NodeImpl struct {
volumeCount int64
remoteVolumeCount int64
ssdVolumeCount int64
activeVolumeCount int64
ecShardCount int64
maxVolumeCount int64
maxSsdVolumeCount int64
id NodeId
parent Node
sync.RWMutex // lock children
@ -62,7 +70,7 @@ type NodeImpl struct {
}
// the first node must satisfy filterFirstNodeFn(); the remaining nodes must each have at least one free slot
func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
var totalWeights int64
var errs []string
n.RLock()
@ -70,12 +78,12 @@ func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(d
candidatesWeights := make([]int64, 0, len(n.children))
// pick nodes that have enough free volume slots as candidates, and use each node's free slot count as its weight.
for _, node := range n.children {
if node.FreeSpace() <= 0 {
if node.AvailableSpaceFor(option) <= 0 {
continue
}
totalWeights += node.FreeSpace()
totalWeights += node.AvailableSpaceFor(option)
candidates = append(candidates, node)
candidatesWeights = append(candidatesWeights, node.FreeSpace())
candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option))
}
n.RUnlock()
if len(candidates) < numberOfNodes {
@ -142,8 +150,18 @@ func (n *NodeImpl) String() string {
func (n *NodeImpl) Id() NodeId {
return n.id
}
func (n *NodeImpl) FreeSpace() int64 {
func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount
if option.DiskType == storage.SsdType {
freeVolumeSlotCount = n.maxSsdVolumeCount - n.ssdVolumeCount
}
if n.ecShardCount > 0 {
freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
}
return freeVolumeSlotCount
}
func (n *NodeImpl) FreeSpace() int64 {
freeVolumeSlotCount := n.maxVolumeCount + n.maxSsdVolumeCount + n.remoteVolumeCount - n.volumeCount - n.ssdVolumeCount
if n.ecShardCount > 0 {
freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
}
@ -166,11 +184,11 @@ func (n *NodeImpl) Parent() Node {
func (n *NodeImpl) GetValue() interface{} {
return n.value
}
func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) {
func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
n.RLock()
defer n.RUnlock()
for _, node := range n.children {
freeSpace := node.FreeSpace()
freeSpace := node.AvailableSpaceFor(option)
// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
if freeSpace <= 0 {
continue
@ -178,11 +196,11 @@ func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error)
if r >= freeSpace {
r -= freeSpace
} else {
if node.IsDataNode() && node.FreeSpace() > 0 {
if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 {
// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
return node.(*DataNode), nil
}
assignedNode, err = node.ReserveOneVolume(r)
assignedNode, err = node.ReserveOneVolume(r, option)
if err == nil {
return
}
@ -200,6 +218,15 @@ func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //ca
n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
}
}
func (n *NodeImpl) UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta int64) { //can be negative
if maxSsdVolumeCountDelta == 0 {
return
}
atomic.AddInt64(&n.maxSsdVolumeCount, maxSsdVolumeCountDelta)
if n.parent != nil {
n.parent.UpAdjustMaxSsdVolumeCountDelta(maxSsdVolumeCountDelta)
}
}
func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative
if volumeCountDelta == 0 {
return
@ -218,6 +245,15 @@ func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64)
n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta)
}
}
func (n *NodeImpl) UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta int64) { //can be negative
if ssdVolumeCountDelta == 0 {
return
}
atomic.AddInt64(&n.ssdVolumeCount, ssdVolumeCountDelta)
if n.parent != nil {
n.parent.UpAdjustSsdVolumeCountDelta(ssdVolumeCountDelta)
}
}
func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative
if ecShardCountDelta == 0 {
return
@ -250,6 +286,9 @@ func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
func (n *NodeImpl) GetVolumeCount() int64 {
return n.volumeCount
}
func (n *NodeImpl) GetSsdVolumeCount() int64 {
return n.ssdVolumeCount
}
func (n *NodeImpl) GetEcShardCount() int64 {
return n.ecShardCount
}
@ -262,6 +301,9 @@ func (n *NodeImpl) GetActiveVolumeCount() int64 {
func (n *NodeImpl) GetMaxVolumeCount() int64 {
return n.maxVolumeCount
}
func (n *NodeImpl) GetMaxSsdVolumeCount() int64 {
return n.maxSsdVolumeCount
}
func (n *NodeImpl) LinkChildNode(node Node) {
n.Lock()
@ -269,8 +311,10 @@ func (n *NodeImpl) LinkChildNode(node Node) {
if n.children[node.Id()] == nil {
n.children[node.Id()] = node
n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
n.UpAdjustMaxSsdVolumeCountDelta(node.GetMaxSsdVolumeCount())
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
n.UpAdjustSsdVolumeCountDelta(node.GetSsdVolumeCount())
n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount())
n.UpAdjustEcShardCountDelta(node.GetEcShardCount())
n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
@ -287,10 +331,12 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
node.SetParent(nil)
delete(n.children, node.Id())
n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
n.UpAdjustSsdVolumeCountDelta(-node.GetSsdVolumeCount())
n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount())
n.UpAdjustEcShardCountDelta(-node.GetEcShardCount())
n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
n.UpAdjustMaxSsdVolumeCountDelta(-node.GetMaxSsdVolumeCount())
glog.V(0).Infoln(n, "removes", node.Id())
}
}
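
AvailableSpaceFor splits the free-slot arithmetic by disk type: an hdd request counts maxVolumeCount + remoteVolumeCount - volumeCount, an ssd request counts maxSsdVolumeCount - ssdVolumeCount, and both subtract slots already consumed by EC shards, while FreeSpace keeps reporting the combined total. A standalone sketch of the per-type calculation; the names here are illustrative:

package example

const dataShardsCount = 10 // erasure_coding.DataShardsCount

// availableSlots mirrors NodeImpl.AvailableSpaceFor for a single node.
func availableSlots(wantSsd bool, maxVol, remoteVol, vol, maxSsd, ssdVol, ecShards int64) int64 {
	free := maxVol + remoteVol - vol
	if wantSsd {
		free = maxSsd - ssdVol
	}
	if ecShards > 0 {
		free = free - ecShards/dataShardsCount - 1
	}
	return free
}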

6
weed/topology/rack.go

@ -28,7 +28,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
}
return nil
}
func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64) *DataNode {
func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64, maxSsdVolumeCount int64) *DataNode {
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
@ -41,6 +41,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
dn.Port = port
dn.PublicUrl = publicUrl
dn.maxVolumeCount = maxVolumeCount
dn.maxSsdVolumeCount = maxSsdVolumeCount
dn.LastSeen = time.Now().Unix()
r.LinkChildNode(dn)
return dn
@ -50,6 +51,7 @@ func (r *Rack) ToMap() interface{} {
m := make(map[string]interface{})
m["Id"] = r.Id()
m["Max"] = r.GetMaxVolumeCount()
m["MaxSsd"] = r.GetMaxSsdVolumeCount()
m["Free"] = r.FreeSpace()
var dns []interface{}
for _, c := range r.Children() {
@ -65,6 +67,8 @@ func (r *Rack) ToRackInfo() *master_pb.RackInfo {
Id: string(r.Id()),
VolumeCount: uint64(r.GetVolumeCount()),
MaxVolumeCount: uint64(r.GetMaxVolumeCount()),
MaxSsdVolumeCount: uint64(r.GetMaxSsdVolumeCount()),
SsdVolumeCount: uint64(r.GetSsdVolumeCount()),
FreeVolumeCount: uint64(r.FreeSpace()),
ActiveVolumeCount: uint64(r.GetActiveVolumeCount()),
RemoteVolumeCount: uint64(r.GetRemoteVolumeCount()),

32
weed/topology/topology.go

@ -121,12 +121,12 @@ func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
}
func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
return vl.GetActiveVolumeCount(option) > 0
}
func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType).PickForWrite(count, option)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
}
@ -137,10 +137,10 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType storage.DiskType) *VolumeLayout {
return t.collectionMap.Get(collectionName, func() interface{} {
return NewCollection(collectionName, t.volumeSizeLimit, t.replicationAsMin)
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl, diskType)
}
func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) (ret []string) {
@ -176,17 +176,30 @@ func (t *Topology) DeleteCollection(collectionName string) {
t.collectionMap.Delete(collectionName)
}
func (t *Topology) DeleteLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType storage.DiskType) {
collection, found := t.FindCollection(collectionName)
if !found {
return
}
collection.DeleteVolumeLayout(rp, ttl, diskType)
if len(collection.storageType2VolumeLayout.Items()) == 0 {
t.DeleteCollection(collectionName)
}
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
diskType, _ := storage.ToDiskType(v.DiskType)
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
vl.RegisterVolume(&v, dn)
vl.EnsureCorrectWritables(&v)
}
func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
glog.Infof("removing volume info:%+v", v)
volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
glog.Infof("removing volume info: %+v", v)
diskType, _ := storage.ToDiskType(v.DiskType)
volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
volumeLayout.UnRegisterVolume(&v, dn)
if volumeLayout.isEmpty() {
t.DeleteCollection(v.Collection)
t.DeleteLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
}
}
@ -222,7 +235,8 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati
t.UnRegisterVolumeLayout(v, dn)
}
for _, v := range changedVolumes {
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
diskType, _ := storage.ToDiskType(v.DiskType)
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
vl.EnsureCorrectWritables(&v)
}
return

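Because layouts are now keyed by disk type as well, ssd and hdd volumes of the same collection are tracked independently, and UnRegisterVolumeLayout removes only the emptied layout; the collection itself is dropped only once it has no layouts left. A minimal sketch, assuming an existing *topology.Topology and the exported names shown above:

package example

import (
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
	"github.com/chrislusf/seaweedfs/weed/topology"
)

// separateLayouts shows hdd and ssd volumes of one collection resolving to
// two distinct layouts after this change.
func separateLayouts(topo *topology.Topology) (hdd, ssd *topology.VolumeLayout) {
	rp, _ := super_block.NewReplicaPlacementFromString("000")
	hdd = topo.GetVolumeLayout("pictures", rp, needle.EMPTY_TTL, storage.HardDriveType)
	ssd = topo.GetVolumeLayout("pictures", rp, needle.EMPTY_TTL, storage.SsdType)
	return hdd, ssd
}
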
8
weed/topology/topology_event_handling.go

@ -37,7 +37,8 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
}()
}
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl)
diskType, _ := storage.ToDiskType(volumeInfo.DiskType)
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, diskType)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false
}
@ -55,13 +56,16 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.GetVolumes() {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn.Id())
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
diskType, _ := storage.ToDiskType(v.DiskType)
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
vl.SetVolumeUnavailable(dn, v.Id)
}
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
dn.UpAdjustSsdVolumeCountDelta(-dn.GetSsdVolumeCount())
dn.UpAdjustRemoteVolumeCountDelta(-dn.GetRemoteVolumeCount())
dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
dn.UpAdjustMaxSsdVolumeCountDelta(-dn.GetMaxSsdVolumeCount())
if dn.Parent() != nil {
dn.Parent().UnlinkChildNode(dn.Id())
}

6
weed/topology/topology_map.go

@ -4,7 +4,7 @@ import "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
func (t *Topology) ToMap() interface{} {
m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount()
m["Max"] = t.GetMaxVolumeCount() + t.GetMaxSsdVolumeCount()
m["Free"] = t.FreeSpace()
var dcs []interface{}
for _, c := range t.Children() {
@ -29,7 +29,7 @@ func (t *Topology) ToMap() interface{} {
func (t *Topology) ToVolumeMap() interface{} {
m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount()
m["Max"] = t.GetMaxVolumeCount() + t.GetMaxSsdVolumeCount()
m["Free"] = t.FreeSpace()
dcs := make(map[NodeId]interface{})
for _, c := range t.Children() {
@ -83,9 +83,11 @@ func (t *Topology) ToTopologyInfo() *master_pb.TopologyInfo {
Id: string(t.Id()),
VolumeCount: uint64(t.GetVolumeCount()),
MaxVolumeCount: uint64(t.GetMaxVolumeCount()),
MaxSsdVolumeCount: uint64(t.GetMaxSsdVolumeCount()),
FreeVolumeCount: uint64(t.FreeSpace()),
ActiveVolumeCount: uint64(t.GetActiveVolumeCount()),
RemoteVolumeCount: uint64(t.GetRemoteVolumeCount()),
SsdVolumeCount: uint64(t.GetSsdVolumeCount()),
}
for _, c := range t.Children() {
dc := c.(*DataCenter)

29
weed/topology/topology_test.go

@ -27,7 +27,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25)
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25, 12)
{
volumeCount := 7
@ -48,10 +48,28 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
volumeMessages = append(volumeMessages, volumeMessage)
}
for k := 1; k <= volumeCount; k++ {
volumeMessage := &master_pb.VolumeInformationMessage{
Id: uint32(volumeCount + k),
Size: uint64(25432),
Collection: "",
FileCount: uint64(2343),
DeleteCount: uint64(345),
DeletedByteCount: 34524,
ReadOnly: false,
ReplicaPlacement: uint32(0),
Version: uint32(needle.CurrentVersion),
Ttl: 0,
DiskType: "ssd",
}
volumeMessages = append(volumeMessages, volumeMessage)
}
topo.SyncDataNodeRegistration(volumeMessages, dn)
assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount*2)
assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
assert(t, "ssdVolumeCount", int(topo.ssdVolumeCount), volumeCount)
}
{
@ -96,7 +114,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
nil,
dn)
rp, _ := super_block.NewReplicaPlacementFromString("000")
layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, storage.HardDriveType)
assert(t, "writables after repeated add", len(layout.writables), volumeCount)
assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
@ -115,7 +133,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
nil,
dn)
for vid, _ := range layout.vid2location {
for vid := range layout.vid2location {
println("after add volume id", vid)
}
for _, vid := range layout.writables {
@ -144,12 +162,13 @@ func TestAddRemoveVolume(t *testing.T) {
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25)
dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25, 12)
v := storage.VolumeInfo{
Id: needle.VolumeId(1),
Size: 100,
Collection: "xcollection",
DiskType: "ssd",
FileCount: 123,
DeleteCount: 23,
DeletedByteCount: 45,

31
weed/topology/volume_growth.go

@ -27,6 +27,7 @@ type VolumeGrowOption struct {
Collection string
ReplicaPlacement *super_block.ReplicaPlacement
Ttl *needle.TTL
DiskType storage.DiskType
Prealloacte int64
DataCenter string
Rack string
@ -113,21 +114,21 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo
func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
//find main datacenter and other data centers
rp := option.ReplicaPlacement
mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, func(node Node) error {
mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, option, func(node Node) error {
if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
}
if len(node.Children()) < rp.DiffRackCount+1 {
return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
}
if node.FreeSpace() < int64(rp.DiffRackCount+rp.SameRackCount+1) {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
if node.AvailableSpaceFor(option) < int64(rp.DiffRackCount+rp.SameRackCount+1) {
return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), rp.DiffRackCount+rp.SameRackCount+1)
}
possibleRacksCount := 0
for _, rack := range node.Children() {
possibleDataNodesCount := 0
for _, n := range rack.Children() {
if n.FreeSpace() >= 1 {
if n.AvailableSpaceFor(option) >= 1 {
possibleDataNodesCount++
}
}
@ -145,12 +146,12 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
}
//find main rack and other racks
mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).PickNodesByWeight(rp.DiffRackCount+1, func(node Node) error {
mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).PickNodesByWeight(rp.DiffRackCount+1, option, func(node Node) error {
if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
}
if node.FreeSpace() < int64(rp.SameRackCount+1) {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
if node.AvailableSpaceFor(option) < int64(rp.SameRackCount+1) {
return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), rp.SameRackCount+1)
}
if len(node.Children()) < rp.SameRackCount+1 {
// a bit faster way to test free racks
@ -158,7 +159,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
}
possibleDataNodesCount := 0
for _, n := range node.Children() {
if n.FreeSpace() >= 1 {
if n.AvailableSpaceFor(option) >= 1 {
possibleDataNodesCount++
}
}
@ -172,12 +173,12 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
}
//find main rack and other racks
mainServer, otherServers, serverErr := mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, func(node Node) error {
mainServer, otherServers, serverErr := mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, option, func(node Node) error {
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
}
if node.FreeSpace() < 1 {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
if node.AvailableSpaceFor(option) < 1 {
return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), 1)
}
return nil
})
@ -190,16 +191,16 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
servers = append(servers, server.(*DataNode))
}
for _, rack := range otherRacks {
r := rand.Int63n(rack.FreeSpace())
if server, e := rack.ReserveOneVolume(r); e == nil {
r := rand.Int63n(rack.AvailableSpaceFor(option))
if server, e := rack.ReserveOneVolume(r, option); e == nil {
servers = append(servers, server)
} else {
return servers, e
}
}
for _, datacenter := range otherDataCenters {
r := rand.Int63n(datacenter.FreeSpace())
if server, e := datacenter.ReserveOneVolume(r); e == nil {
r := rand.Int63n(datacenter.AvailableSpaceFor(option))
if server, e := datacenter.ReserveOneVolume(r, option); e == nil {
servers = append(servers, server)
} else {
return servers, e

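Volume growth carries the requested disk type in VolumeGrowOption, and every placement check at the data center, rack, and data node level now uses AvailableSpaceFor(option), so an ssd request only counts ssd capacity. A minimal sketch of building such an option; only fields shown in this change are set, and how the option is handed to the grower is omitted:

package example

import (
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
	"github.com/chrislusf/seaweedfs/weed/topology"
)

// ssdGrowOption builds a grow option for ssd-backed volumes in the given collection.
func ssdGrowOption(collection, replication string) (*topology.VolumeGrowOption, error) {
	rp, err := super_block.NewReplicaPlacementFromString(replication)
	if err != nil {
		return nil, err
	}
	diskType, err := storage.ToDiskType("ssd")
	if err != nil {
		return nil, err
	}
	return &topology.VolumeGrowOption{
		Collection:       collection,
		ReplicaPlacement: rp,
		Ttl:              needle.EMPTY_TTL,
		DiskType:         diskType,
	}, nil
}
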
4
weed/topology/volume_layout.go

@ -103,6 +103,7 @@ func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState {
type VolumeLayout struct {
rp *super_block.ReplicaPlacement
ttl *needle.TTL
diskType storage.DiskType
vid2location map[needle.VolumeId]*VolumeLocationList
writables []needle.VolumeId // transient array of writable volume id
readonlyVolumes *volumesBinaryState // readonly volumes
@ -118,10 +119,11 @@ type VolumeLayoutStats struct {
FileCount uint64
}
func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout {
func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType storage.DiskType, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout {
return &VolumeLayout{
rp: rp,
ttl: ttl,
diskType: diskType,
vid2location: make(map[needle.VolumeId]*VolumeLocationList),
writables: *new([]needle.VolumeId),
readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()),
