
Merge branch 'master' into add_remote_storage

pull/2241/head
Chris Lu, 3 years ago
commit f5a69a0e44
11 changed files:

1. k8s/seaweedfs/Chart.yaml (4 changed lines)
2. k8s/seaweedfs/values.yaml (2 changed lines)
3. unmaintained/see_meta/see_meta.go (2 changed lines)
4. weed/command/filer.go (8 changed lines)
5. weed/command/server.go (9 changed lines)
6. weed/shell/command_collection_list.go (54 changed lines)
7. weed/shell/command_ec_decode.go (15 changed lines)
8. weed/shell/command_volume_check_disk.go (2 changed lines)
9. weed/shell/command_volume_fsck.go (2 changed lines)
10. weed/shell/command_volume_tier_move.go (26 changed lines)
11. weed/util/constants.go (2 changed lines)

k8s/seaweedfs/Chart.yaml (4 changed lines)

@@ -1,5 +1,5 @@
 apiVersion: v1
 description: SeaweedFS
 name: seaweedfs
-appVersion: "2.59"
-version: "2.59"
+appVersion: "2.60"
+version: "2.60"

k8s/seaweedfs/values.yaml (2 changed lines)

@@ -4,7 +4,7 @@ global:
   registry: ""
   repository: ""
   imageName: chrislusf/seaweedfs
-  # imageTag: "2.59" - started using {.Chart.appVersion}
+  # imageTag: "2.60" - started using {.Chart.appVersion}
   imagePullPolicy: IfNotPresent
   imagePullSecrets: imagepullsecret
   restartPolicy: Always

unmaintained/see_meta/see_meta.go (2 changed lines)

@@ -60,7 +60,7 @@ func walkMetaFile(dst *os.File) error {
 		fmt.Fprintf(os.Stdout, "file %s %v\n", util.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name), fullEntry.Entry.Attributes.String())
 		for i, chunk := range fullEntry.Entry.Chunks {
-			fmt.Fprintf(os.Stdout, " chunk %d %v\n", i+1, chunk.String())
+			fmt.Fprintf(os.Stdout, " chunk: %d %v %d,%x%08x\n", i+1, chunk, chunk.Fid.VolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie)
 		}
 	}
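
The new print verbs render each chunk's file id in the "volumeId,fileKey+cookie" form, with the file key in plain hex and the cookie zero-padded to eight hex digits. A minimal standalone sketch of just that verb combination, with made-up values (real ones come from chunk.Fid):

package main

import "fmt"

func main() {
	// Hypothetical values for illustration only.
	var volumeId uint32 = 3
	var fileKey uint64 = 0x1234
	var cookie uint32 = 0x89abcdef

	// Same verbs as the new Fprintf: %d for the volume id, %x for the
	// file key, %08x to zero-pad the cookie to eight hex digits.
	fmt.Printf("%d,%x%08x\n", volumeId, fileKey, cookie)
	// prints: 3,123489abcdef
}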

weed/command/filer.go (8 changed lines)

@@ -3,6 +3,7 @@ package command
 import (
 	"fmt"
 	"net/http"
+	_ "net/http/pprof"
 	"os"
 	"strconv"
 	"strings"

@@ -50,6 +51,8 @@ type FilerOptions struct {
 	saveToFilerLimit        *int
 	defaultLevelDbDirectory *string
 	concurrentUploadLimitMB *int
+	debug                   *bool
+	debugPort               *int
 }

 func init() {

@@ -73,6 +76,8 @@ func init() {
 	f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
 	f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
 	f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
+	f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
+	f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")

 	// start s3 on filer
 	filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")

@@ -122,6 +127,9 @@ var cmdFiler = &Command{
 }

 func runFiler(cmd *Command, args []string) bool {
+	if *f.debug {
+		go http.ListenAndServe(fmt.Sprintf(":%d", *f.debugPort), nil)
+	}

 	util.LoadConfiguration("security", false)

weed/command/server.go (9 changed lines)

@@ -3,6 +3,7 @@ package command
 import (
 	"fmt"
 	"github.com/chrislusf/seaweedfs/weed/util/grace"
+	"net/http"
 	"os"
 	"strings"
 	"time"

@@ -16,6 +17,8 @@ import (
 type ServerOptions struct {
 	cpuprofile *string
 	memprofile *string
+	debug      *bool
+	debugPort  *int
 	v          VolumeServerOptions
 }

@@ -78,6 +81,8 @@ var (
 func init() {
 	serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file")
 	serverOptions.memprofile = cmdServer.Flag.String("memprofile", "", "memory profile output file")
+	serverOptions.debug = cmdServer.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:6060/debug/pprof/goroutine?debug=2")
+	serverOptions.debugPort = cmdServer.Flag.Int("debug.port", 6060, "http port for debugging")

 	masterOptions.port = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
 	masterOptions.metaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")

@@ -139,6 +144,10 @@ func init() {
 func runServer(cmd *Command, args []string) bool {
+
+	if *serverOptions.debug {
+		go http.ListenAndServe(fmt.Sprintf(":%d", *serverOptions.debugPort), nil)
+	}

 	util.LoadConfiguration("security", false)
 	util.LoadConfiguration("master", false)
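
Both the filer and server changes wire up Go's standard pprof pattern: the blank import of net/http/pprof registers the /debug/pprof/* handlers on http.DefaultServeMux as a side effect, and passing a nil handler to http.ListenAndServe serves that default mux on the side port. A minimal standalone sketch of the same pattern, using the new flags' default port:

package main

import (
	"fmt"
	"net/http"
	_ "net/http/pprof" // side effect: registers /debug/pprof/* on the default mux
)

func main() {
	debugPort := 6060 // matches the -debug.port default in the hunks above

	// nil handler means http.DefaultServeMux, where pprof registered itself.
	go http.ListenAndServe(fmt.Sprintf(":%d", debugPort), nil)

	// A real server would do its work here; block so the demo stays up.
	select {}
}

With this running, a goroutine dump is available at http://localhost:6060/debug/pprof/goroutine?debug=2, exactly as the flag help text suggests.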

weed/shell/command_collection_list.go (54 changed lines)

@@ -22,6 +22,14 @@ func (c *commandCollectionList) Help() string {
 	return `list all collections`
 }

+type CollectionInfo struct {
+	FileCount        uint64
+	DeleteCount      uint64
+	DeletedByteCount uint64
+	Size             uint64
+	VolumeCount      int
+}
+
 func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

 	collections, err := ListCollectionNames(commandEnv, true, true)

@@ -30,8 +38,21 @@ func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer
 		return err
 	}

+	topologyInfo, _, err := collectTopologyInfo(commandEnv)
+	if err != nil {
+		return err
+	}
+
+	collectionInfos := make(map[string]*CollectionInfo)
+
+	writeCollectionInfo(writer, topologyInfo, collectionInfos)
+
 	for _, c := range collections {
-		fmt.Fprintf(writer, "collection:\"%s\"\n", c)
+		cif, found := collectionInfos[c]
+		if !found {
+			continue
+		}
+		fmt.Fprintf(writer, "collection:\"%s\"\tvolumeCount:%d\tsize:%d\tfileCount:%d\tdeletedBytes:%d\tdeletion:%d\n", c, cif.VolumeCount, cif.Size, cif.FileCount, cif.DeletedByteCount, cif.DeleteCount)
 	}

 	fmt.Fprintf(writer, "Total %d collections.\n", len(collections))

@@ -56,3 +77,34 @@ func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEc
 	}
 	return
 }

+func addToCollection(collectionInfos map[string]*CollectionInfo, vif *master_pb.VolumeInformationMessage) {
+	c := vif.Collection
+	cif, found := collectionInfos[c]
+	if !found {
+		cif = &CollectionInfo{}
+		collectionInfos[c] = cif
+	}
+	cif.Size += vif.Size
+	cif.DeleteCount += vif.DeleteCount
+	cif.FileCount += vif.FileCount
+	cif.DeletedByteCount += vif.DeletedByteCount
+	cif.VolumeCount++
+}
+
+func writeCollectionInfo(writer io.Writer, t *master_pb.TopologyInfo, collectionInfos map[string]*CollectionInfo) {
+	for _, dc := range t.DataCenterInfos {
+		for _, r := range dc.RackInfos {
+			for _, dn := range r.DataNodeInfos {
+				for _, diskInfo := range dn.DiskInfos {
+					for _, vi := range diskInfo.VolumeInfos {
+						addToCollection(collectionInfos, vi)
+					}
+					//for _, ecShardInfo := range diskInfo.EcShardInfos {
+					//
+					//}
+				}
+			}
+		}
+	}
+}
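
The new listing works in two passes: writeCollectionInfo (despite its name, it only collects here) walks the topology tree of data centers, racks, data nodes, disks, and volumes, and addToCollection folds each volume's counters into a per-collection map; the Do method then prints one tab-separated line per collection. A self-contained sketch of the same fold, using a simplified stand-in struct instead of master_pb.VolumeInformationMessage:

package main

import "fmt"

// Simplified stand-in for master_pb.VolumeInformationMessage (illustration only).
type volumeInfo struct {
	Collection       string
	Size             uint64
	FileCount        uint64
	DeleteCount      uint64
	DeletedByteCount uint64
}

// Mirrors the CollectionInfo type added in this commit.
type collectionInfo struct {
	FileCount        uint64
	DeleteCount      uint64
	DeletedByteCount uint64
	Size             uint64
	VolumeCount      int
}

func main() {
	// Hypothetical volumes, in the order a topology walk might visit them.
	volumes := []volumeInfo{
		{Collection: "photos", Size: 2048, FileCount: 10},
		{Collection: "photos", Size: 1024, FileCount: 5, DeleteCount: 1, DeletedByteCount: 100},
		{Collection: "logs", Size: 512, FileCount: 3},
	}

	// Fold every volume's counters into its collection's running totals.
	infos := make(map[string]*collectionInfo)
	for _, v := range volumes {
		ci, found := infos[v.Collection]
		if !found {
			ci = &collectionInfo{}
			infos[v.Collection] = ci
		}
		ci.Size += v.Size
		ci.FileCount += v.FileCount
		ci.DeleteCount += v.DeleteCount
		ci.DeletedByteCount += v.DeletedByteCount
		ci.VolumeCount++
	}

	// Analogous to the tab-separated line the new Do method prints.
	for name, ci := range infos {
		fmt.Printf("collection:%q\tvolumeCount:%d\tsize:%d\tfileCount:%d\tdeletedBytes:%d\tdeletion:%d\n",
			name, ci.VolumeCount, ci.Size, ci.FileCount, ci.DeletedByteCount, ci.DeleteCount)
	}
}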

weed/shell/command_ec_decode.go (15 changed lines)

@@ -223,21 +223,6 @@ func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyIn
 }

-func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) {
-
-	eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
-		if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
-			for _, v := range diskInfo.EcShardInfos {
-				if v.Collection == selectedCollection && v.Id == uint32(vid) {
-					ecShardInfos = append(ecShardInfos, v)
-				}
-			}
-		}
-	})
-
-	return
-}
-
 func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
 	vidMap := make(map[uint32]bool)

weed/shell/command_volume_check_disk.go (2 changed lines)

@@ -90,7 +90,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
 		}
 		if err := c.syncTwoReplicas(aDB, bDB, a, verbose, writer, b, err, applyChanges, nonRepairThreshold); err != nil {
-			fmt.Fprintf(writer, "snyc volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
+			fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
 		}
 		replicas = replicas[1:]
 	}

weed/shell/command_volume_fsck.go (2 changed lines)

@@ -103,7 +103,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
 		}

 		// for each volume, check filer file ids
 		if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
-			return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
+			return fmt.Errorf("findFilerChunksMissingInVolumeServers: %v", err)
 		}
 	} else {
 		// collect all filer file ids

weed/shell/command_volume_tier_move.go (26 changed lines)

@@ -7,6 +7,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
 	"github.com/chrislusf/seaweedfs/weed/wdclient"
 	"io"
+	"path/filepath"
 	"time"

 	"github.com/chrislusf/seaweedfs/weed/storage/needle"

@@ -26,7 +27,7 @@ func (c *commandVolumeTierMove) Name() string {
 func (c *commandVolumeTierMove) Help() string {
 	return `change a volume from one disk type to another

-	volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collection=""] [-fullPercent=95] [-quietFor=1h]
+	volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h]

 	Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped.
 	So "volume.fix.replication" and "volume.balance" should be followed.

@@ -41,7 +42,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
 	}

 	tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
-	collection := tierCommand.String("collection", "", "the collection name")
+	collectionPattern := tierCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
 	fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
 	quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period")
 	source := tierCommand.String("fromDiskType", "", "the source disk type")

@@ -65,7 +66,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
 	}

 	// collect all volumes that should change
-	volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collection, *fullPercentage, *quietPeriod)
+	volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collectionPattern, *fullPercentage, *quietPeriod)
 	if err != nil {
 		return err
 	}

@@ -73,7 +74,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
 	_, allLocations := collectVolumeReplicaLocations(topologyInfo)
 	for _, vid := range volumeIds {
-		if err = doVolumeTierMove(commandEnv, writer, *collection, vid, toDiskType, allLocations, *applyChange); err != nil {
+		if err = doVolumeTierMove(commandEnv, writer, vid, toDiskType, allLocations, *applyChange); err != nil {
 			fmt.Printf("tier move volume %d: %v\n", vid, err)
 		}
 	}

@@ -90,7 +91,7 @@ func isOneOf(server string, locations []wdclient.Location) bool {
 	return false
 }

-func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
+func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
 	// find volume location
 	locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
 	if !found {

@@ -149,7 +150,7 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin
 	return nil
 }

-func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
+func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {

 	quietSeconds := int64(quietPeriod / time.Second)
 	nowUnixSeconds := time.Now().Unix()

@@ -160,7 +161,18 @@ func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_
 	eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
 		for _, diskInfo := range dn.DiskInfos {
 			for _, v := range diskInfo.VolumeInfos {
-				if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == sourceTier {
+				// check collection name pattern
+				if collectionPattern != "" {
+					matched, err := filepath.Match(collectionPattern, v.Collection)
+					if err != nil {
+						return
+					}
+					if !matched {
+						continue
+					}
+				}
+
+				if v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == sourceTier {
 					if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
 						vidMap[v.Id] = true
 					}
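
The -collection flag becomes -collectionPattern, matched per volume with filepath.Match, so '*' and '?' wildcards work against collection names; an empty pattern skips the check entirely and selects every collection, preserving the old default. A small standalone sketch of the matching behavior, with hypothetical pattern and collection names:

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// '*' matches any sequence of characters, '?' matches exactly one.
	pattern := "s3*" // hypothetical pattern for illustration

	for _, collection := range []string{"s3data", "s3backup", "photos"} {
		matched, err := filepath.Match(pattern, collection)
		if err != nil {
			// Match only errors on a malformed pattern, e.g. an unclosed '['.
			fmt.Println("bad pattern:", err)
			return
		}
		fmt.Printf("%q -> matched=%v\n", collection, matched)
	}
	// prints:
	// "s3data" -> matched=true
	// "s3backup" -> matched=true
	// "photos" -> matched=false
}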

weed/util/constants.go (2 changed lines)

@@ -5,7 +5,7 @@ import (
 )

 var (
-	VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 59)
+	VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 60)
 	COMMIT  = ""
 )
