From b9aee2defbc2f5aafbc3ea049fbe2ab5f3320999 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 20 Sep 2014 12:38:59 -0700 Subject: [PATCH] add TTL support The volume TTL and file TTL are not necessarily the same. as long as file TTL is smaller than volume TTL, it'll be fine. volume TTL is used when assigning file id, e.g. http://.../dir/assign?ttl=3h file TTL is used when uploading --- go/filer/directory_in_map.go | 4 +- go/operation/assign_file_id.go | 5 +- go/operation/submit.go | 11 +- go/operation/system_message.pb.go | 12 +- go/proto/system_message.proto | 1 + go/storage/needle.go | 9 +- go/storage/needle_read_write.go | 21 +++ go/storage/replica_placement.go | 8 - go/storage/store.go | 67 +++++--- go/storage/volume.go | 74 ++++++++- go/storage/volume_info.go | 2 + go/storage/volume_super_block.go | 10 +- go/storage/volume_super_block_test.go | 7 +- go/storage/volume_ttl.go | 147 ++++++++++++++++++ go/storage/volume_vacuum.go | 14 +- go/topology/allocate_volume.go | 7 +- go/topology/collection.go | 23 +-- go/topology/data_node.go | 6 +- go/topology/topology.go | 21 ++- go/topology/topology_event_handling.go | 6 +- go/topology/topology_map.go | 2 +- go/topology/topology_vacuum.go | 2 +- go/topology/volume_growth.go | 12 +- go/topology/volume_layout.go | 13 +- go/weed/benchmark.go | 2 +- go/weed/compact.go | 2 +- go/weed/upload.go | 6 +- go/weed/weed_server/common.go | 4 +- go/weed/weed_server/filer_server_handlers.go | 12 +- .../master_server_handlers_admin.go | 9 +- .../volume_server_handlers_admin.go | 2 +- 31 files changed, 416 insertions(+), 105 deletions(-) create mode 100644 go/storage/volume_ttl.go diff --git a/go/filer/directory_in_map.go b/go/filer/directory_in_map.go index 46a626f77..35b4e53c1 100644 --- a/go/filer/directory_in_map.go +++ b/go/filer/directory_in_map.go @@ -187,10 +187,10 @@ func (dm *DirectoryManagerInMap) makeDirectory(dirPath string) (dir *DirectoryEn for i := 1; i < len(parts); i++ { sub, ok := dir.SubDirectories[parts[i]] if !ok { - var err error + var err error sub, err = dm.NewDirectoryEntryInMap(dir, parts[i]) if err != nil { - return nil, false + return nil, false } dir.SubDirectories[parts[i]] = sub created = true diff --git a/go/operation/assign_file_id.go b/go/operation/assign_file_id.go index 018e1d763..34d371f37 100644 --- a/go/operation/assign_file_id.go +++ b/go/operation/assign_file_id.go @@ -17,7 +17,7 @@ type AssignResult struct { Error string `json:"error,omitempty"` } -func Assign(server string, count int, replication string, collection string) (*AssignResult, error) { +func Assign(server string, count int, replication string, collection string, ttl string) (*AssignResult, error) { values := make(url.Values) values.Add("count", strconv.Itoa(count)) if replication != "" { @@ -26,6 +26,9 @@ func Assign(server string, count int, replication string, collection string) (*A if collection != "" { values.Add("collection", collection) } + if ttl != "" { + values.Add("ttl", ttl) + } jsonBlob, err := util.Post("http://"+server+"/dir/assign", values) glog.V(2).Info("assign result :", string(jsonBlob)) if err != nil { diff --git a/go/operation/submit.go b/go/operation/submit.go index 9191f7d9a..ec45cc320 100644 --- a/go/operation/submit.go +++ b/go/operation/submit.go @@ -20,6 +20,7 @@ type FilePart struct { ModTime int64 //in seconds Replication string Collection string + Ttl string Server string //this comes from assign result Fid string //this comes from assign result, but customizable } @@ -32,12 +33,12 @@ type SubmitResult struct { Error string `json:"error,omitempty"` } -func SubmitFiles(master string, files []FilePart, replication string, collection string, maxMB int) ([]SubmitResult, error) { +func SubmitFiles(master string, files []FilePart, replication string, collection string, ttl string, maxMB int) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) for index, file := range files { results[index].FileName = file.FileName } - ret, err := Assign(master, len(files), replication, collection) + ret, err := Assign(master, len(files), replication, collection, ttl) if err != nil { for index, _ := range files { results[index].Error = err.Error() @@ -112,7 +113,7 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error) chunks := fi.FileSize/chunkSize + 1 fids := make([]string, 0) for i := int64(0); i < chunks; i++ { - id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection) + id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection, fi.Ttl) if e != nil { return 0, e } @@ -130,8 +131,8 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error) return } -func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string) (fid string, size uint32, e error) { - ret, err := Assign(master, 1, replication, collection) +func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string, ttl string) (fid string, size uint32, e error) { + ret, err := Assign(master, 1, replication, collection, ttl) if err != nil { return "", 0, err } diff --git a/go/operation/system_message.pb.go b/go/operation/system_message.pb.go index 45ae8a648..9f00dd74d 100644 --- a/go/operation/system_message.pb.go +++ b/go/operation/system_message.pb.go @@ -15,12 +15,10 @@ It has these top-level messages: package operation import proto "code.google.com/p/goprotobuf/proto" -import json "encoding/json" import math "math" -// Reference proto, json, and math imports to suppress error if they are not otherwise used. +// Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -var _ = &json.SyntaxError{} var _ = math.Inf type VolumeInformationMessage struct { @@ -33,6 +31,7 @@ type VolumeInformationMessage struct { ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"` ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"` Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"` + Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -105,6 +104,13 @@ func (m *VolumeInformationMessage) GetVersion() uint32 { return Default_VolumeInformationMessage_Version } +func (m *VolumeInformationMessage) GetTtl() uint32 { + if m != nil && m.Ttl != nil { + return *m.Ttl + } + return 0 +} + type JoinMessage struct { IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"` Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"` diff --git a/go/proto/system_message.proto b/go/proto/system_message.proto index 15574ad56..ecd4973f7 100644 --- a/go/proto/system_message.proto +++ b/go/proto/system_message.proto @@ -10,6 +10,7 @@ message VolumeInformationMessage { optional bool read_only = 7; required uint32 replica_placement = 8; optional uint32 version = 9 [default=2]; + optional uint32 ttl = 10; } message JoinMessage { diff --git a/go/storage/needle.go b/go/storage/needle.go index 77aa70169..3bf627141 100644 --- a/go/storage/needle.go +++ b/go/storage/needle.go @@ -38,12 +38,13 @@ type Needle struct { MimeSize uint8 //version2 Mime []byte `comment:"maximum 256 characters"` //version2 LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk + Ttl *TTL Checksum CRC `comment:"CRC32 to check integrity"` Padding []byte `comment:"Aligned to 8 bytes"` } -func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, e error) { +func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, ttl *TTL, e error) { form, fe := r.MultipartReader() if fe != nil { glog.V(0).Infoln("MultipartReader [ERROR]", fe) @@ -92,12 +93,13 @@ func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string fileName = fileName[:len(fileName)-3] } modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64) + ttl, _ = ReadTTL(r.FormValue("ttl")) return } func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { fname, mimeType, isGzipped := "", "", false n = new(Needle) - fname, n.Data, mimeType, isGzipped, n.LastModified, e = ParseUpload(r) + fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, e = ParseUpload(r) if e != nil { return } @@ -116,6 +118,9 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { n.LastModified = uint64(time.Now().Unix()) } n.SetHasLastModifiedDate() + if n.Ttl != nil { + n.SetHasTtl() + } if fixJpgOrientation { loweredName := strings.ToLower(fname) diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index 835d7c270..848121ff2 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -14,7 +14,9 @@ const ( FlagHasName = 0x02 FlagHasMime = 0x04 FlagHasLastModifiedDate = 0x08 + FlagHasTtl = 0x10 LastModifiedBytesLength = 5 + TtlBytesLength = 2 ) func (n *Needle) DiskSize() int64 { @@ -70,6 +72,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { if n.HasLastModifiedDate() { n.Size = n.Size + LastModifiedBytesLength } + if n.HasTtl() { + n.Size = n.Size + TtlBytesLength + } } size = n.DataSize util.Uint32toBytes(header[12:16], n.Size) @@ -112,6 +117,12 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { return } } + if n.HasTtl() { + n.Ttl.ToBytes(header[0:TtlBytesLength]) + if _, err = w.Write(header[0:TtlBytesLength]); err != nil { + return + } + } } padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) @@ -194,6 +205,10 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) { n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength]) index = index + LastModifiedBytesLength } + if index < lenBytes && n.HasTtl() { + n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength]) + index = index + TtlBytesLength + } } func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) { @@ -263,3 +278,9 @@ func (n *Needle) HasLastModifiedDate() bool { func (n *Needle) SetHasLastModifiedDate() { n.Flags = n.Flags | FlagHasLastModifiedDate } +func (n *Needle) HasTtl() bool { + return n.Flags&FlagHasTtl > 0 +} +func (n *Needle) SetHasTtl() { + n.Flags = n.Flags | FlagHasTtl +} diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go index 696888cd8..c1aca52eb 100644 --- a/go/storage/replica_placement.go +++ b/go/storage/replica_placement.go @@ -5,10 +5,6 @@ import ( "fmt" ) -const ( - ReplicaPlacementCount = 9 -) - type ReplicaPlacement struct { SameRackCount int DiffRackCount int @@ -55,7 +51,3 @@ func (rp *ReplicaPlacement) String() string { func (rp *ReplicaPlacement) GetCopyCount() int { return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1 } - -func (rp *ReplicaPlacement) GetReplicationLevelIndex() int { - return rp.DiffDataCenterCount*3 + rp.DiffRackCount*3 + rp.SameRackCount -} diff --git a/go/storage/store.go b/go/storage/store.go index a6a4f399e..ef38ade98 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -14,6 +14,10 @@ import ( "strings" ) +const ( + MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes +) + type DiskLocation struct { Directory string MaxVolumeCount int @@ -83,11 +87,15 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts } return } -func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string) error { +func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error { rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e } + ttl, e := ReadTTL(ttlString) + if e != nil { + return e + } for _, range_string := range strings.Split(volumeListString, ",") { if strings.Index(range_string, "-") < 0 { id_string := range_string @@ -95,7 +103,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) } - e = s.addVolume(VolumeId(id), collection, rt) + e = s.addVolume(VolumeId(id), collection, rt, ttl) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -107,7 +115,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) } for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), collection, rt); err != nil { + if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil { e = err } } @@ -129,6 +137,14 @@ func (s *Store) DeleteCollection(collection string) (e error) { } return } +func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) { + e = v.Destroy() + if e != nil { + return + } + delete(volumes, v.Id) + return +} func (s *Store) findVolume(vid VolumeId) *Volume { for _, location := range s.Locations { if v, found := location.volumes[vid]; found { @@ -148,13 +164,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement) error { +func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.findFreeLocation(); location != nil { - glog.V(0).Infoln("In dir", location.Directory, "adds volume =", vid, ", collection =", collection, ", replicaPlacement =", replicaPlacement) - if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement); err == nil { + glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", + location.Directory, vid, collection, replicaPlacement, ttl) + if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil { location.volumes[vid] = volume return nil } else { @@ -190,9 +207,9 @@ func (l *DiskLocation) loadExistingVolumes() { } if vid, err := NewVolumeId(base); err == nil { if l.volumes[vid] == nil { - if v, e := NewVolume(l.Directory, collection, vid, nil); e == nil { + if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil { l.volumes[vid] = v - glog.V(0).Infoln("data file", l.Directory+"/"+name, "replicaPlacement =", v.ReplicaPlacement, "version =", v.Version(), "size =", v.Size()) + glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) } } } @@ -240,21 +257,31 @@ func (s *Store) Join() (masterNode string, e error) { for _, location := range s.Locations { maxVolumeCount = maxVolumeCount + location.MaxVolumeCount for k, v := range location.volumes { - volumeMessage := &operation.VolumeInformationMessage{ - Id: proto.Uint32(uint32(k)), - Size: proto.Uint64(uint64(v.Size())), - Collection: proto.String(v.Collection), - FileCount: proto.Uint64(uint64(v.nm.FileCount())), - DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), - DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), - ReadOnly: proto.Bool(v.readOnly), - ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())), - Version: proto.Uint32(uint32(v.Version())), - } - volumeMessages = append(volumeMessages, volumeMessage) if maxFileKey < v.nm.MaxFileKey() { maxFileKey = v.nm.MaxFileKey() } + if !v.expired(s.volumeSizeLimit) { + volumeMessage := &operation.VolumeInformationMessage{ + Id: proto.Uint32(uint32(k)), + Size: proto.Uint64(uint64(v.Size())), + Collection: proto.String(v.Collection), + FileCount: proto.Uint64(uint64(v.nm.FileCount())), + DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), + DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), + ReadOnly: proto.Bool(v.readOnly), + ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())), + Version: proto.Uint32(uint32(v.Version())), + Ttl: proto.Uint32(v.Ttl.ToUint32()), + } + volumeMessages = append(volumeMessages, volumeMessage) + } else { + if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { + s.DeleteVolume(location.volumes, v) + glog.V(0).Infoln("volume", v.Id, "is deleted.") + } else { + glog.V(0).Infoln("volume", v.Id, "is expired.") + } + } } } diff --git a/go/storage/volume.go b/go/storage/volume.go index dec560545..34ae7e386 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -22,12 +22,13 @@ type Volume struct { SuperBlock - accessLock sync.Mutex + accessLock sync.Mutex + lastModifiedTime uint64 //unix time in seconds } -func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} - v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement} + v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} e = v.load(true, true) return } @@ -49,12 +50,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { var e error fileName := v.FileName() - if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists { + if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists { if !canRead { return fmt.Errorf("cannot read Volume Data file %s.dat", fileName) } if canWrite { v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + v.lastModifiedTime = uint64(modifiedTime.Unix()) } else { glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode") v.dataFile, e = os.Open(fileName + ".dat") @@ -192,6 +194,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error()) } } + if v.lastModifiedTime < n.LastModified { + v.lastModifiedTime = n.LastModified + } return } @@ -221,8 +226,25 @@ func (v *Volume) delete(n *Needle) (uint32, error) { func (v *Volume) read(n *Needle) (int, error) { nv, ok := v.nm.Get(n.Id) - if ok && nv.Offset > 0 { - return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + if !ok || nv.Offset == 0 { + return -1, errors.New("Not Found") + } + bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + if err != nil { + return bytesRead, err + } + if !n.HasTtl() { + return bytesRead, err + } + ttlMinutes := n.Ttl.Minutes() + if ttlMinutes == 0 { + return bytesRead, nil + } + if !n.HasLastModifiedDate() { + return bytesRead, nil + } + if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) { + return bytesRead, nil } return -1, errors.New("Not Found") } @@ -343,3 +365,43 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) { } return true } + +// volume is expired if modified time + volume ttl < now +// except when volume is empty +// or when the volume does not have a ttl +// or when volumeSizeLimit is 0 when server just starts +func (v *Volume) expired(volumeSizeLimit uint64) bool { + if volumeSizeLimit == 0 { + //skip if we don't know size limit + return false + } + if v.ContentSize() == 0 { + return false + } + if v.Ttl == nil || v.Ttl.Minutes() == 0 { + return false + } + glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime) + livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60 + glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) + if int64(v.Ttl.Minutes()) < livedMinutes { + return true + } + return false +} + +// wait either maxDelayMinutes or 10% of ttl minutes +func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool { + if v.Ttl == nil || v.Ttl.Minutes() == 0 { + return false + } + removalDelay := v.Ttl.Minutes() / 10 + if removalDelay > maxDelayMinutes { + removalDelay = maxDelayMinutes + } + + if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) { + return true + } + return false +} diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go index 165af1a19..6a954f743 100644 --- a/go/storage/volume_info.go +++ b/go/storage/volume_info.go @@ -8,6 +8,7 @@ type VolumeInfo struct { Id VolumeId Size uint64 ReplicaPlacement *ReplicaPlacement + Ttl *TTL Collection string Version Version FileCount int @@ -32,5 +33,6 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er return vi, e } vi.ReplicaPlacement = rp + vi.Ttl = LoadTTLFromUint32(*m.Ttl) return vi, nil } diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go index 35030b93e..3fbef44d6 100644 --- a/go/storage/volume_super_block.go +++ b/go/storage/volume_super_block.go @@ -2,7 +2,6 @@ package storage import ( "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/util" "fmt" "os" ) @@ -15,12 +14,13 @@ const ( * Super block currently has 8 bytes allocated for each volume. * Byte 0: version, 1 or 2 * Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc -* Byte 2 and byte 3: Time to live in minutes +* Byte 2 and byte 3: Time to live. See TTL for definition +* Rest bytes: Reserved */ type SuperBlock struct { version Version ReplicaPlacement *ReplicaPlacement - Ttl uint16 + Ttl *TTL } func (s *SuperBlock) Version() Version { @@ -30,7 +30,7 @@ func (s *SuperBlock) Bytes() []byte { header := make([]byte, SuperBlockSize) header[0] = byte(s.version) header[1] = s.ReplicaPlacement.Byte() - util.Uint16toBytes(header[2:4], s.Ttl) + s.Ttl.ToBytes(header[2:4]) return header } @@ -70,6 +70,6 @@ func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { err = fmt.Errorf("cannot read replica type: %s", err.Error()) } - superBlock.Ttl = util.BytesToUint16(header[2:4]) + superBlock.Ttl = LoadTTLFromBytes(header[2:4]) return } diff --git a/go/storage/volume_super_block_test.go b/go/storage/volume_super_block_test.go index 19a1bb757..13db4b194 100644 --- a/go/storage/volume_super_block_test.go +++ b/go/storage/volume_super_block_test.go @@ -6,16 +6,17 @@ import ( func TestSuperBlockReadWrite(t *testing.T) { rp, _ := NewReplicaPlacementFromByte(byte(001)) + ttl, _ := ReadTTL("15d") s := &SuperBlock{ version: CurrentVersion, ReplicaPlacement: rp, - Ttl: uint16(35), + Ttl: ttl, } bytes := s.Bytes() - if !(bytes[2] == 0 && bytes[3] == 35) { - println("byte[2]:", bytes[2], "byte[3]:", bytes[3]) + if !(bytes[2] == 15 && bytes[3] == Day) { + println("byte[2]:", bytes[2], "byte[3]:", bytes[3]) t.Fail() } diff --git a/go/storage/volume_ttl.go b/go/storage/volume_ttl.go new file mode 100644 index 000000000..5ff43e24e --- /dev/null +++ b/go/storage/volume_ttl.go @@ -0,0 +1,147 @@ +package storage + +import ( + "strconv" +) + +var ( + TtlRange []uint16 +) + +const ( + //stored unit types + Empty byte = iota + Minute + Hour + Day + Week + Month + Year +) + +func init() { + TtlRange = []uint16{3, 10, 30, + 60 /*1 hour*/, 60 * 3, 60 * 6, 60 * 12, + 1440 /*1 day*/, 1440 * 3, 1440 * 7, 1440 * 15, 1440 * 31, + 44888 /*1 month*/, 65535, + } +} + +type TTL struct { + count byte + unit byte +} + +var EMPTY_TTL = &TTL{} + +// translate a readable ttl to internal ttl +// Supports format example: +// 3m: 3 minutes +// 4h: 4 hours +// 5d: 5 days +// 6w: 6 weeks +// 7M: 7 months +// 8y: 8 years +func ReadTTL(ttlString string) (*TTL, error) { + if ttlString == "" { + return EMPTY_TTL, nil + } + ttlBytes := []byte(ttlString) + unitByte := ttlBytes[len(ttlBytes)-1] + countBytes := ttlBytes[0 : len(ttlBytes)-1] + if '0' <= unitByte && unitByte <= '9' { + countBytes = ttlBytes + unitByte = 'm' + } + count, err := strconv.Atoi(string(countBytes)) + unit := toStoredByte(unitByte) + return &TTL{count: byte(count), unit: unit}, err +} + +// read stored bytes to a ttl +func LoadTTLFromBytes(input []byte) (t *TTL) { + return &TTL{count: input[0], unit: input[1]} +} + +// read stored bytes to a ttl +func LoadTTLFromUint32(ttl uint32) (t *TTL) { + input := make([]byte, 2) + input[1] = byte(ttl) + input[0] = byte(ttl >> 8) + return LoadTTLFromBytes(input) +} + +// save stored bytes to an output with 2 bytes +func (t TTL) ToBytes(output []byte) { + output[0] = t.count + output[1] = t.unit +} + +func (t TTL) ToUint32() (output uint32) { + output = uint32(t.count) << 8 + output += uint32(t.unit) + return output +} + +func (t TTL) String() string { + if t.count == 0 { + return "" + } + if t.unit == Empty { + return "" + } + countString := strconv.Itoa(int(t.count)) + switch t.unit { + case Minute: + return countString + "m" + case Hour: + return countString + "h" + case Day: + return countString + "d" + case Week: + return countString + "w" + case Month: + return countString + "M" + case Year: + return countString + "y" + } + return "" +} + +func toStoredByte(readableUnitByte byte) byte { + switch readableUnitByte { + case 'm': + return Minute + case 'h': + return Hour + case 'd': + return Day + case 'w': + return Week + case 'M': + return Month + case 'Y': + return Year + } + return 0 +} + +func (t TTL) Minutes() uint32 { + switch t.unit { + case Empty: + return 0 + case Minute: + return uint32(t.count) + case Hour: + return uint32(t.count) * 60 + case Day: + return uint32(t.count) * 60 * 24 + case Week: + return uint32(t.count) * 60 * 24 * 7 + case Month: + return uint32(t.count) * 60 * 24 * 31 + case Year: + return uint32(t.count) * 60 * 24 * 31 * 365 + } + return 0 +} diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go index 7d2a38cb8..706a1f951 100644 --- a/go/storage/volume_vacuum.go +++ b/go/storage/volume_vacuum.go @@ -4,7 +4,7 @@ import ( "code.google.com/p/weed-fs/go/glog" "fmt" "os" - _ "time" + "time" ) func (v *Volume) garbageLevel() float64 { @@ -13,9 +13,10 @@ func (v *Volume) garbageLevel() float64 { func (v *Volume) Compact() error { glog.V(3).Infof("Compacting ...") - v.accessLock.Lock() - defer v.accessLock.Unlock() - glog.V(3).Infof("Got Compaction lock...") + //no need to lock for copy on write + //v.accessLock.Lock() + //defer v.accessLock.Unlock() + //glog.V(3).Infof("Got Compaction lock...") filePath := v.FileName() glog.V(3).Infof("creating copies for volume %d ...", v.Id) @@ -59,10 +60,15 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro nm := NewNeedleMap(idx) new_offset := int64(SuperBlockSize) + now := uint64(time.Now().Unix()) + err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error { _, err = dst.Write(superBlock.Bytes()) return err }, true, func(n *Needle, offset int64) error { + if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { + return nil + } nv, ok := v.nm.Get(n.Id) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go index 77b4ac508..4aeef35f7 100644 --- a/go/topology/allocate_volume.go +++ b/go/topology/allocate_volume.go @@ -12,11 +12,12 @@ type AllocateVolumeResult struct { Error string } -func AllocateVolume(dn *DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error { +func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error { values := make(url.Values) values.Add("volume", vid.String()) - values.Add("collection", collection) - values.Add("replication", rp.String()) + values.Add("collection", option.Collection) + values.Add("replication", option.ReplicaPlacement.String()) + values.Add("ttl", option.Ttl.String()) jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values) if err != nil { return err diff --git a/go/topology/collection.go b/go/topology/collection.go index b21122d22..c014231af 100644 --- a/go/topology/collection.go +++ b/go/topology/collection.go @@ -1,33 +1,34 @@ package topology import ( - "code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/storage" ) type Collection struct { Name string volumeSizeLimit uint64 - replicaType2VolumeLayout []*VolumeLayout + storageType2VolumeLayout map[string]*VolumeLayout } func NewCollection(name string, volumeSizeLimit uint64) *Collection { c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} - c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.ReplicaPlacementCount) + c.storageType2VolumeLayout = make(map[string]*VolumeLayout) return c } -func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement) *VolumeLayout { - replicaPlacementIndex := rp.GetReplicationLevelIndex() - if c.replicaType2VolumeLayout[replicaPlacementIndex] == nil { - glog.V(0).Infoln("collection", c.Name, "adding replication type", rp) - c.replicaType2VolumeLayout[replicaPlacementIndex] = NewVolumeLayout(rp, c.volumeSizeLimit) +func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { + keyString := rp.String() + if ttl != nil { + keyString += ttl.String() } - return c.replicaType2VolumeLayout[replicaPlacementIndex] + if c.storageType2VolumeLayout[keyString] == nil { + c.storageType2VolumeLayout[keyString] = NewVolumeLayout(rp, ttl, c.volumeSizeLimit) + } + return c.storageType2VolumeLayout[keyString] } func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { - for _, vl := range c.replicaType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout { if vl != nil { if list := vl.Lookup(vid); list != nil { return list @@ -38,7 +39,7 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { } func (c *Collection) ListVolumeServers() (nodes []*DataNode) { - for _, vl := range c.replicaType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout { if vl != nil { if list := vl.ListVolumeServers(); list != nil { nodes = append(nodes, list...) diff --git a/go/topology/data_node.go b/go/topology/data_node.go index ae80e08bb..c67c5c1c1 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -38,15 +38,16 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { } } -func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) { +func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) { actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v } - for vid, _ := range dn.volumes { + for vid, v := range dn.volumes { if _, ok := actualVolumeMap[vid]; !ok { glog.V(0).Infoln("Deleting volume id:", vid) delete(dn.volumes, vid) + deletedVolumes = append(deletedVolumes, v) dn.UpAdjustVolumeCountDelta(-1) dn.UpAdjustActiveVolumeCountDelta(-1) } @@ -54,6 +55,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) { for _, v := range actualVolumes { dn.AddOrUpdateVolume(v) } + return } func (dn *DataNode) GetDataCenter() *DataCenter { diff --git a/go/topology/topology.go b/go/topology/topology.go index f1daffb53..acdef5e36 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -110,12 +110,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId { } func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool { - vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement) + vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) { - vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option) + vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) if err != nil || datanodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes avalable!") } @@ -123,12 +123,12 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { _, ok := t.collectionMap[collectionName] if !ok { t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit) } - return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp) + return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl) } func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) { @@ -141,10 +141,14 @@ func (t *Topology) DeleteCollection(collectionName string) { } func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn) + t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn) +} +func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { + glog.Infof("removing volume info:%+v", v) + t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn) } -func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) { +func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { t.Sequence.SetMax(*joinMessage.MaxFileKey) dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack) dc := t.GetOrCreateDataCenter(dcName) @@ -162,10 +166,13 @@ func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) { glog.V(0).Infoln("Fail to convert joined volume information:", err.Error()) } } - dn.UpdateVolumes(volumeInfos) + deletedVolumes := dn.UpdateVolumes(volumeInfos) for _, v := range volumeInfos { t.RegisterVolumeLayout(v, dn) } + for _, v := range deletedVolumes { + t.UnRegisterVolumeLayout(v, dn) + } } func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index 7398ff9bf..1e630e149 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -41,7 +41,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { }() } func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { - vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement) + vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl) if !vl.SetVolumeCapacityFull(volumeInfo.Id) { return false } @@ -55,7 +55,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { func (t *Topology) UnRegisterDataNode(dn *DataNode) { for _, v := range dn.volumes { glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn) - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) vl.SetVolumeUnavailable(dn, v.Id) } dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) @@ -65,7 +65,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { } func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) if vl.isWritable(&v) { vl.SetVolumeAvailable(dn, v.Id) } diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go index f66d4c251..d6400c988 100644 --- a/go/topology/topology_map.go +++ b/go/topology/topology_map.go @@ -14,7 +14,7 @@ func (t *Topology) ToMap() interface{} { m["DataCenters"] = dcs var layouts []interface{} for _, c := range t.collectionMap { - for _, layout := range c.replicaType2VolumeLayout { + for _, layout := range c.storageType2VolumeLayout { if layout != nil { tmp := layout.ToMap() tmp["collection"] = c.Name diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go index a1d6d2564..9eaca37d4 100644 --- a/go/topology/topology_vacuum.go +++ b/go/topology/topology_vacuum.go @@ -80,7 +80,7 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis } func (t *Topology) Vacuum(garbageThreshold string) int { for _, c := range t.collectionMap { - for _, vl := range c.replicaType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout { if vl != nil { for vid, locationlist := range vl.vid2location { if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go index 4965e3ba0..778aa038a 100644 --- a/go/topology/volume_growth.go +++ b/go/topology/volume_growth.go @@ -19,6 +19,7 @@ This package is created to resolve these replica placement issues: type VolumeGrowOption struct { Collection string ReplicaPlacement *storage.ReplicaPlacement + Ttl *storage.TTL DataCenter string Rack string DataNode string @@ -184,8 +185,15 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { for _, server := range servers { - if err := AllocateVolume(server, vid, option.Collection, option.ReplicaPlacement); err == nil { - vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Version: storage.CurrentVersion} + if err := AllocateVolume(server, vid, option); err == nil { + vi := storage.VolumeInfo{ + Id: vid, + Size: 0, + Collection: option.Collection, + ReplicaPlacement: option.ReplicaPlacement, + Ttl: option.Ttl, + Version: storage.CurrentVersion, + } server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(vi, server) glog.V(0).Infoln("Created Volume", vid, "on", server) diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 538acb54c..1e55072a3 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -11,15 +11,17 @@ import ( // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { rp *storage.ReplicaPlacement + ttl *storage.TTL vid2location map[storage.VolumeId]*VolumeLocationList writables []storage.VolumeId // transient array of writable volume id volumeSizeLimit uint64 accessLock sync.Mutex } -func NewVolumeLayout(rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *VolumeLayout { +func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout { return &VolumeLayout{ rp: rp, + ttl: ttl, vid2location: make(map[storage.VolumeId]*VolumeLocationList), writables: *new([]storage.VolumeId), volumeSizeLimit: volumeSizeLimit, @@ -42,6 +44,14 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } } +func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { + vl.accessLock.Lock() + defer vl.accessLock.Unlock() + + vl.removeFromWritable(v.Id) + delete(vl.vid2location, v.Id) +} + func (vl *VolumeLayout) AddToWritable(vid storage.VolumeId) { for _, id := range vl.writables { if vid == id { @@ -192,6 +202,7 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { func (vl *VolumeLayout) ToMap() map[string]interface{} { m := make(map[string]interface{}) m["replication"] = vl.rp.String() + m["ttl"] = vl.ttl.String() m["writables"] = vl.writables //m["locations"] = vl.vid2location return m diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go index eab923751..27aebaef0 100644 --- a/go/weed/benchmark.go +++ b/go/weed/benchmark.go @@ -201,7 +201,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) { start := time.Now() fileSize := int64(*b.fileSize + rand.Intn(64)) fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize} - if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil { + if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil { fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection if _, ok := serverLimitChan[fp.Server]; !ok { serverLimitChan[fp.Server] = make(chan bool, 7) diff --git a/go/weed/compact.go b/go/weed/compact.go index 580f3f98d..57a02261f 100644 --- a/go/weed/compact.go +++ b/go/weed/compact.go @@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool { } vid := storage.VolumeId(*compactVolumeId) - v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil) + v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil, nil) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } diff --git a/go/weed/upload.go b/go/weed/upload.go index b59313a2a..c21499dd0 100644 --- a/go/weed/upload.go +++ b/go/weed/upload.go @@ -12,6 +12,7 @@ var ( uploadReplication *string uploadCollection *string uploadDir *string + uploadTtl *string include *string maxMB *int ) @@ -24,6 +25,7 @@ func init() { include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") uploadReplication = cmdUpload.Flag.String("replication", "", "replication type") uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name") + uploadTtl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit") } @@ -67,7 +69,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB) + results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -84,7 +86,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { fmt.Println(e.Error()) } - results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB) + results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) } diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index a547d7462..49e84378c 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -99,14 +99,14 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("parsing upload file...") - fname, data, mimeType, isGzipped, lastModified, pe := storage.ParseUpload(r) + fname, data, mimeType, isGzipped, lastModified, _, pe := storage.ParseUpload(r) if pe != nil { writeJsonError(w, r, pe) return } debug("assigning file id for", fname) - assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection")) + assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"), r.FormValue("ttl")) if ae != nil { writeJsonError(w, r, ae) return diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index 0f83352a9..f760030f3 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -109,7 +109,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() - assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection) + assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection, query.Get("ttl")) if ae != nil { glog.V(0).Infoln("failing to assign a file id", ae.Error()) writeJsonError(w, r, ae) @@ -131,14 +131,14 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } resp, do_err := util.Do(request) if do_err != nil { - glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) + glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error()) writeJsonError(w, r, do_err) return } defer resp.Body.Close() resp_body, ra_err := ioutil.ReadAll(resp.Body) if ra_err != nil { - glog.V(0).Infoln("failing to upload to volume server", ra_err.Error()) + glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error()) writeJsonError(w, r, ra_err) return } @@ -146,12 +146,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { var ret operation.UploadResult unmarshal_err := json.Unmarshal(resp_body, &ret) if unmarshal_err != nil { - glog.V(0).Infoln("failing to read upload resonse", string(resp_body)) + glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body)) writeJsonError(w, r, unmarshal_err) return } if ret.Error != "" { - glog.V(0).Infoln("failing to post to volume server", ret.Error) + glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) writeJsonError(w, r, errors.New(ret.Error)) return } @@ -169,7 +169,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { glog.V(4).Infoln("saving", path, "=>", assignResult.Fid) if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil { operation.DeleteFile(fs.master, assignResult.Fid) //clean up - glog.V(0).Infoln("failing to write to filer server", db_err.Error()) + glog.V(0).Infoln("failing to write to filer server", r.RequestURI, db_err.Error()) writeJsonError(w, r, db_err) return } diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index d50075fd5..1458bf3e6 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -55,7 +55,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { } } - ms.Topo.RegisterVolumes(joinMessage) + ms.Topo.ProcessJoinMessage(joinMessage) writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024}) } @@ -144,7 +144,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r * } func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool { - vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement) + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } @@ -157,9 +157,14 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr if err != nil { return nil, err } + ttl, err := storage.ReadTTL(r.FormValue("ttl")) + if err != nil { + return nil, err + } volumeGrowOption := &topology.VolumeGrowOption{ Collection: r.FormValue("collection"), ReplicaPlacement: replicaPlacement, + Ttl: ttl, DataCenter: r.FormValue("dataCenter"), Rack: r.FormValue("rack"), DataNode: r.FormValue("dataNode"), diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go index 6d285524a..1c01fdfd0 100644 --- a/go/weed/weed_server/volume_server_handlers_admin.go +++ b/go/weed/weed_server/volume_server_handlers_admin.go @@ -16,7 +16,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { } func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication")) + err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), r.FormValue("ttl")) if err == nil { writeJsonQuiet(w, r, map[string]string{"error": ""}) } else {