Browse Source

shell: support configure volume ttl

Signed-off-by: Lei Liu <liul.stone@gmail.com>
pull/1389/head
Lei Liu 5 years ago
parent
commit
f5c889151e
No known key found for this signature in database GPG Key ID: CFEC474454780D7B
  1. 1
      unmaintained/see_dat/see_dat_gzip.go
  2. 2
      unmaintained/see_log_entry/see_log_entry.go
  3. 24
      weed/command/master.go
  4. 2
      weed/filer2/meta_aggregator.go
  5. 2
      weed/filesys/meta_cache/meta_cache.go
  6. 2
      weed/filesys/unimplemented.go
  7. 3
      weed/operation/needle_parse_test.go
  8. 4
      weed/pb/filer_pb/filer.pb.go
  9. 4
      weed/pb/iam_pb/iam.pb.go
  10. 4
      weed/pb/master_pb/master.pb.go
  11. 4
      weed/pb/messaging_pb/messaging.pb.go
  12. 2
      weed/pb/volume_server.proto
  13. 1177
      weed/pb/volume_server_pb/volume_server.pb.go
  14. 12
      weed/server/master_server.go
  15. 2
      weed/server/volume_grpc_admin.go
  16. 38
      weed/shell/command_volume_configure.go
  17. 2
      weed/storage/erasure_coding/ec_decoder.go
  18. 4
      weed/storage/erasure_coding/ec_volume_test.go
  19. 3
      weed/storage/store.go
  20. 5
      weed/util/bounded_tree/bounded_tree_test.go
  21. 1
      weed/util/compression.go

1
unmaintained/see_dat/see_dat_gzip.go

@ -9,6 +9,7 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"

2
unmaintained/see_log_entry/see_log_entry.go

@ -15,7 +15,7 @@ import (
) )
var ( var (
logdataFile = flag.String("logdata", "", "log data file saved under "+ filer2.SystemLogDir)
logdataFile = flag.String("logdata", "", "log data file saved under "+filer2.SystemLogDir)
) )
func main() { func main() {

24
weed/command/master.go

@ -26,13 +26,13 @@ var (
) )
type MasterOptions struct { type MasterOptions struct {
port *int
ip *string
ipBind *string
metaFolder *string
peers *string
volumeSizeLimitMB *uint
volumePreallocate *bool
port *int
ip *string
ipBind *string
metaFolder *string
peers *string
volumeSizeLimitMB *uint
volumePreallocate *bool
// pulseSeconds *int // pulseSeconds *int
defaultReplication *string defaultReplication *string
garbageThreshold *float64 garbageThreshold *float64
@ -173,11 +173,11 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st
func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption { func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption {
return &weed_server.MasterOption{ return &weed_server.MasterOption{
Host: *m.ip,
Port: *m.port,
MetaFolder: *m.metaFolder,
VolumeSizeLimitMB: *m.volumeSizeLimitMB,
VolumePreallocate: *m.volumePreallocate,
Host: *m.ip,
Port: *m.port,
MetaFolder: *m.metaFolder,
VolumeSizeLimitMB: *m.volumeSizeLimitMB,
VolumePreallocate: *m.volumePreallocate,
// PulseSeconds: *m.pulseSeconds, // PulseSeconds: *m.pulseSeconds,
DefaultReplicaPlacement: *m.defaultReplication, DefaultReplicaPlacement: *m.defaultReplication,
GarbageThreshold: *m.garbageThreshold, GarbageThreshold: *m.garbageThreshold,

2
weed/filer2/meta_aggregator.go

@ -98,7 +98,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
for { for {
err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
ClientName: "filer:"+self,
ClientName: "filer:" + self,
PathPrefix: "/", PathPrefix: "/",
SinceNs: lastTsNs, SinceNs: lastTsNs,
}) })

2
weed/filesys/meta_cache/meta_cache.go

@ -60,7 +60,7 @@ func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPat
return err return err
} }
} }
}else{
} else {
// println("unknown old directory:", oldDir) // println("unknown old directory:", oldDir)
} }

2
weed/filesys/unimplemented.go

@ -10,11 +10,13 @@ import (
// https://github.com/bazil/fuse/issues/130 // https://github.com/bazil/fuse/issues/130
var _ = fs.NodeAccesser(&Dir{}) var _ = fs.NodeAccesser(&Dir{})
func (dir *Dir) Access(ctx context.Context, req *fuse.AccessRequest) error { func (dir *Dir) Access(ctx context.Context, req *fuse.AccessRequest) error {
return fuse.ENOSYS return fuse.ENOSYS
} }
var _ = fs.NodeAccesser(&File{}) var _ = fs.NodeAccesser(&File{})
func (file *File) Access(ctx context.Context, req *fuse.AccessRequest) error { func (file *File) Access(ctx context.Context, req *fuse.AccessRequest) error {
return fuse.ENOSYS return fuse.ENOSYS
} }

3
weed/operation/needle_parse_test.go

@ -101,7 +101,6 @@ func TestCreateNeedleFromRequest(t *testing.T) {
} }
var textContent = `Redistribution and use in source and binary forms, with or without var textContent = `Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are modification, are permitted provided that the following conditions are
met: met:
@ -127,4 +126,4 @@ DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
`
`

4
weed/pb/filer_pb/filer.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.24.0
// protoc v3.12.3
// protoc-gen-go v1.25.0-devel
// protoc v3.12.0
// source: filer.proto // source: filer.proto
package filer_pb package filer_pb

4
weed/pb/iam_pb/iam.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.24.0
// protoc v3.12.3
// protoc-gen-go v1.25.0-devel
// protoc v3.12.0
// source: iam.proto // source: iam.proto
package iam_pb package iam_pb

4
weed/pb/master_pb/master.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.24.0
// protoc v3.12.3
// protoc-gen-go v1.25.0-devel
// protoc v3.12.0
// source: master.proto // source: master.proto
package master_pb package master_pb

4
weed/pb/messaging_pb/messaging.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.24.0
// protoc v3.12.3
// protoc-gen-go v1.25.0-devel
// protoc v3.12.0
// source: messaging.proto // source: messaging.proto
package messaging_pb package messaging_pb

2
weed/pb/volume_server.proto

@ -201,6 +201,7 @@ message VolumeMarkReadonlyResponse {
message VolumeConfigureRequest { message VolumeConfigureRequest {
uint32 volume_id = 1; uint32 volume_id = 1;
string replication = 2; string replication = 2;
string ttl = 3;
} }
message VolumeConfigureResponse { message VolumeConfigureResponse {
string error = 1; string error = 1;
@ -375,6 +376,7 @@ message VolumeInfo {
repeated RemoteFile files = 1; repeated RemoteFile files = 1;
uint32 version = 2; uint32 version = 2;
string replication = 3; string replication = 3;
string ttl = 4;
} }
message VolumeTierMoveDatToRemoteRequest { message VolumeTierMoveDatToRemoteRequest {

1177
weed/pb/volume_server_pb/volume_server.pb.go
File diff suppressed because it is too large
View File

12
weed/server/master_server.go

@ -32,11 +32,11 @@ const (
) )
type MasterOption struct { type MasterOption struct {
Host string
Port int
MetaFolder string
VolumeSizeLimitMB uint
VolumePreallocate bool
Host string
Port int
MetaFolder string
VolumeSizeLimitMB uint
VolumePreallocate bool
// PulseSeconds int // PulseSeconds int
DefaultReplicaPlacement string DefaultReplicaPlacement string
GarbageThreshold float64 GarbageThreshold float64
@ -66,7 +66,7 @@ type MasterServer struct {
MasterClient *wdclient.MasterClient MasterClient *wdclient.MasterClient
adminLocks *AdminLocks
adminLocks *AdminLocks
} }
func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer { func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {

2
weed/server/volume_grpc_admin.go

@ -118,7 +118,7 @@ func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_
} }
// modify the volume info file // modify the volume info file
if err := vs.store.ConfigureVolume(needle.VolumeId(req.VolumeId), req.Replication); err != nil {
if err := vs.store.ConfigureVolume(needle.VolumeId(req.VolumeId), req.Replication, req.Ttl); err != nil {
glog.Errorf("volume configure %v: %v", req, err) glog.Errorf("volume configure %v: %v", req, err)
resp.Error = fmt.Sprintf("volume configure %v: %v", req, err) resp.Error = fmt.Sprintf("volume configure %v: %v", req, err)
return resp, nil return resp, nil

38
weed/shell/command_volume_configure_replication.go → weed/shell/command_volume_configure.go

@ -15,34 +15,36 @@ import (
) )
func init() { func init() {
Commands = append(Commands, &commandVolumeConfigureReplication{})
Commands = append(Commands, &commandVolumeConfigure{})
} }
type commandVolumeConfigureReplication struct {
type commandVolumeConfigure struct {
} }
func (c *commandVolumeConfigureReplication) Name() string {
return "volume.configure.replication"
func (c *commandVolumeConfigure) Name() string {
return "volume.configure"
} }
func (c *commandVolumeConfigureReplication) Help() string {
return `change volume replication value
func (c *commandVolumeConfigure) Help() string {
return `change volume replication or ttl value
This command changes a volume replication value. It should be followed by volume.fix.replication.
This command changes a volume replication or ttl value. It should be followed by volume.fix.replication.
` `
} }
func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
func (c *commandVolumeConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if err = commandEnv.confirmIsLocked(); err != nil { if err = commandEnv.confirmIsLocked(); err != nil {
return return
} }
configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id")
replicationString := configureReplicationCommand.String("replication", "", "the intended replication value")
if err = configureReplicationCommand.Parse(args); err != nil {
configureVolumeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeIdInt := configureVolumeCommand.Int("volumeId", 0, "the volume id")
replicationString := configureVolumeCommand.String("replication", "", "the intended replication value")
ttlString := configureVolumeCommand.String("ttl", "", "the intended ttl value")
if err = configureVolumeCommand.Parse(args); err != nil {
return nil return nil
} }
@ -50,12 +52,21 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
return fmt.Errorf("empty replication value") return fmt.Errorf("empty replication value")
} }
if *ttlString == "" {
return fmt.Errorf("empty ttl value")
}
replicaPlacement, err := super_block.NewReplicaPlacementFromString(*replicationString) replicaPlacement, err := super_block.NewReplicaPlacementFromString(*replicationString)
if err != nil { if err != nil {
return fmt.Errorf("replication format: %v", err) return fmt.Errorf("replication format: %v", err)
} }
replicaPlacementInt32 := uint32(replicaPlacement.Byte()) replicaPlacementInt32 := uint32(replicaPlacement.Byte())
ttl, err := needle.ReadTTL(*ttlString)
if err != nil {
return fmt.Errorf("wrong ttl format: %v", err)
}
var resp *master_pb.VolumeListResponse var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
@ -72,7 +83,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
loc := newLocation(dc, string(rack), dn) loc := newLocation(dc, string(rack), dn)
for _, v := range dn.VolumeInfos { for _, v := range dn.VolumeInfos {
if v.Id == uint32(vid) && v.ReplicaPlacement != replicaPlacementInt32 {
if v.Id == uint32(vid) && (v.ReplicaPlacement != replicaPlacementInt32 || v.Ttl != ttl.ToUint32()) {
allLocations = append(allLocations, loc) allLocations = append(allLocations, loc)
continue continue
} }
@ -88,6 +99,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{ resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
VolumeId: uint32(vid), VolumeId: uint32(vid),
Replication: replicaPlacement.String(), Replication: replicaPlacement.String(),
Ttl: ttl.String(),
}) })
if configureErr != nil { if configureErr != nil {
return configureErr return configureErr

2
weed/storage/erasure_coding/ec_decoder.go

@ -119,7 +119,7 @@ func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId
} }
func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error { func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error {
if !util.FileExists(baseFileName+".ecj") {
if !util.FileExists(baseFileName + ".ecj") {
return nil return nil
} }
ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644) ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644)

4
weed/storage/erasure_coding/ec_volume_test.go

@ -24,8 +24,8 @@ func TestPositioning(t *testing.T) {
tests := []struct { tests := []struct {
needleId string needleId string
offset int64
size int
offset int64
size int
}{ }{
{needleId: "0f0edb92", offset: 31300679656, size: 1167}, {needleId: "0f0edb92", offset: 31300679656, size: 1167},
{needleId: "0ef7d7f8", offset: 11513014944, size: 66044}, {needleId: "0ef7d7f8", offset: 11513014944, size: 66044},

3
weed/storage/store.go

@ -372,7 +372,7 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error {
return fmt.Errorf("volume %d not found on disk", i) return fmt.Errorf("volume %d not found on disk", i)
} }
func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error {
func (s *Store) ConfigureVolume(i needle.VolumeId, replication, ttl string) error {
for _, location := range s.Locations { for _, location := range s.Locations {
fileInfo, found := location.LocateVolume(i) fileInfo, found := location.LocateVolume(i)
@ -387,6 +387,7 @@ func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error {
return fmt.Errorf("volume %d fail to load vif", i) return fmt.Errorf("volume %d fail to load vif", i)
} }
volumeInfo.Replication = replication volumeInfo.Replication = replication
volumeInfo.Ttl = ttl
err = pb.SaveVolumeInfo(vifFile, volumeInfo) err = pb.SaveVolumeInfo(vifFile, volumeInfo)
if err != nil { if err != nil {
return fmt.Errorf("volume %d fail to save vif", i) return fmt.Errorf("volume %d fail to save vif", i)

5
weed/util/bounded_tree/bounded_tree_test.go

@ -9,9 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
var ( var (
visitFn = func(path util.FullPath) (childDirectories []string, err error) { visitFn = func(path util.FullPath) (childDirectories []string, err error) {
fmt.Printf(" visit %v ...\n", path) fmt.Printf(" visit %v ...\n", path)
switch path { switch path {
@ -37,14 +35,11 @@ var (
return nil, nil return nil, nil
} }
printMap = func(m map[string]*Node) { printMap = func(m map[string]*Node) {
for k := range m { for k := range m {
println(" >", k) println(" >", k)
} }
} }
) )
func TestBoundedTree(t *testing.T) { func TestBoundedTree(t *testing.T) {

1
weed/util/compression.go

@ -54,6 +54,7 @@ func ungzipData(input []byte) ([]byte, error) {
} }
var decoder, _ = zstd.NewReader(nil) var decoder, _ = zstd.NewReader(nil)
func unzstdData(input []byte) ([]byte, error) { func unzstdData(input []byte) ([]byte, error) {
return decoder.DecodeAll(input, nil) return decoder.DecodeAll(input, nil)
} }

Loading…
Cancel
Save