diff --git a/go/filer/directory_in_map.go b/go/filer/directory_in_map.go index 46a626f77..35b4e53c1 100644 --- a/go/filer/directory_in_map.go +++ b/go/filer/directory_in_map.go @@ -187,10 +187,10 @@ func (dm *DirectoryManagerInMap) makeDirectory(dirPath string) (dir *DirectoryEn for i := 1; i < len(parts); i++ { sub, ok := dir.SubDirectories[parts[i]] if !ok { - var err error + var err error sub, err = dm.NewDirectoryEntryInMap(dir, parts[i]) if err != nil { - return nil, false + return nil, false } dir.SubDirectories[parts[i]] = sub created = true diff --git a/go/operation/assign_file_id.go b/go/operation/assign_file_id.go index 018e1d763..34d371f37 100644 --- a/go/operation/assign_file_id.go +++ b/go/operation/assign_file_id.go @@ -17,7 +17,7 @@ type AssignResult struct { Error string `json:"error,omitempty"` } -func Assign(server string, count int, replication string, collection string) (*AssignResult, error) { +func Assign(server string, count int, replication string, collection string, ttl string) (*AssignResult, error) { values := make(url.Values) values.Add("count", strconv.Itoa(count)) if replication != "" { @@ -26,6 +26,9 @@ func Assign(server string, count int, replication string, collection string) (*A if collection != "" { values.Add("collection", collection) } + if ttl != "" { + values.Add("ttl", ttl) + } jsonBlob, err := util.Post("http://"+server+"/dir/assign", values) glog.V(2).Info("assign result :", string(jsonBlob)) if err != nil { diff --git a/go/operation/submit.go b/go/operation/submit.go index 9191f7d9a..ec45cc320 100644 --- a/go/operation/submit.go +++ b/go/operation/submit.go @@ -20,6 +20,7 @@ type FilePart struct { ModTime int64 //in seconds Replication string Collection string + Ttl string Server string //this comes from assign result Fid string //this comes from assign result, but customizable } @@ -32,12 +33,12 @@ type SubmitResult struct { Error string `json:"error,omitempty"` } -func SubmitFiles(master string, files []FilePart, replication string, collection string, maxMB int) ([]SubmitResult, error) { +func SubmitFiles(master string, files []FilePart, replication string, collection string, ttl string, maxMB int) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) for index, file := range files { results[index].FileName = file.FileName } - ret, err := Assign(master, len(files), replication, collection) + ret, err := Assign(master, len(files), replication, collection, ttl) if err != nil { for index, _ := range files { results[index].Error = err.Error() @@ -112,7 +113,7 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error) chunks := fi.FileSize/chunkSize + 1 fids := make([]string, 0) for i := int64(0); i < chunks; i++ { - id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection) + id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection, fi.Ttl) if e != nil { return 0, e } @@ -130,8 +131,8 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error) return } -func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string) (fid string, size uint32, e error) { - ret, err := Assign(master, 1, replication, collection) +func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string, ttl string) (fid string, size uint32, e error) { + ret, err := Assign(master, 1, replication, collection, ttl) if err != nil { return "", 0, err } diff --git a/go/operation/system_message.pb.go b/go/operation/system_message.pb.go index 45ae8a648..9f00dd74d 100644 --- a/go/operation/system_message.pb.go +++ b/go/operation/system_message.pb.go @@ -15,12 +15,10 @@ It has these top-level messages: package operation import proto "code.google.com/p/goprotobuf/proto" -import json "encoding/json" import math "math" -// Reference proto, json, and math imports to suppress error if they are not otherwise used. +// Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -var _ = &json.SyntaxError{} var _ = math.Inf type VolumeInformationMessage struct { @@ -33,6 +31,7 @@ type VolumeInformationMessage struct { ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"` ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"` Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"` + Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -105,6 +104,13 @@ func (m *VolumeInformationMessage) GetVersion() uint32 { return Default_VolumeInformationMessage_Version } +func (m *VolumeInformationMessage) GetTtl() uint32 { + if m != nil && m.Ttl != nil { + return *m.Ttl + } + return 0 +} + type JoinMessage struct { IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"` Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"` diff --git a/go/proto/system_message.proto b/go/proto/system_message.proto index 15574ad56..ecd4973f7 100644 --- a/go/proto/system_message.proto +++ b/go/proto/system_message.proto @@ -10,6 +10,7 @@ message VolumeInformationMessage { optional bool read_only = 7; required uint32 replica_placement = 8; optional uint32 version = 9 [default=2]; + optional uint32 ttl = 10; } message JoinMessage { diff --git a/go/storage/needle.go b/go/storage/needle.go index 77aa70169..3bf627141 100644 --- a/go/storage/needle.go +++ b/go/storage/needle.go @@ -38,12 +38,13 @@ type Needle struct { MimeSize uint8 //version2 Mime []byte `comment:"maximum 256 characters"` //version2 LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk + Ttl *TTL Checksum CRC `comment:"CRC32 to check integrity"` Padding []byte `comment:"Aligned to 8 bytes"` } -func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, e error) { +func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, ttl *TTL, e error) { form, fe := r.MultipartReader() if fe != nil { glog.V(0).Infoln("MultipartReader [ERROR]", fe) @@ -92,12 +93,13 @@ func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string fileName = fileName[:len(fileName)-3] } modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64) + ttl, _ = ReadTTL(r.FormValue("ttl")) return } func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { fname, mimeType, isGzipped := "", "", false n = new(Needle) - fname, n.Data, mimeType, isGzipped, n.LastModified, e = ParseUpload(r) + fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, e = ParseUpload(r) if e != nil { return } @@ -116,6 +118,9 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { n.LastModified = uint64(time.Now().Unix()) } n.SetHasLastModifiedDate() + if n.Ttl != nil { + n.SetHasTtl() + } if fixJpgOrientation { loweredName := strings.ToLower(fname) diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index 835d7c270..848121ff2 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -14,7 +14,9 @@ const ( FlagHasName = 0x02 FlagHasMime = 0x04 FlagHasLastModifiedDate = 0x08 + FlagHasTtl = 0x10 LastModifiedBytesLength = 5 + TtlBytesLength = 2 ) func (n *Needle) DiskSize() int64 { @@ -70,6 +72,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { if n.HasLastModifiedDate() { n.Size = n.Size + LastModifiedBytesLength } + if n.HasTtl() { + n.Size = n.Size + TtlBytesLength + } } size = n.DataSize util.Uint32toBytes(header[12:16], n.Size) @@ -112,6 +117,12 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { return } } + if n.HasTtl() { + n.Ttl.ToBytes(header[0:TtlBytesLength]) + if _, err = w.Write(header[0:TtlBytesLength]); err != nil { + return + } + } } padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) @@ -194,6 +205,10 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) { n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength]) index = index + LastModifiedBytesLength } + if index < lenBytes && n.HasTtl() { + n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength]) + index = index + TtlBytesLength + } } func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) { @@ -263,3 +278,9 @@ func (n *Needle) HasLastModifiedDate() bool { func (n *Needle) SetHasLastModifiedDate() { n.Flags = n.Flags | FlagHasLastModifiedDate } +func (n *Needle) HasTtl() bool { + return n.Flags&FlagHasTtl > 0 +} +func (n *Needle) SetHasTtl() { + n.Flags = n.Flags | FlagHasTtl +} diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go index 696888cd8..c1aca52eb 100644 --- a/go/storage/replica_placement.go +++ b/go/storage/replica_placement.go @@ -5,10 +5,6 @@ import ( "fmt" ) -const ( - ReplicaPlacementCount = 9 -) - type ReplicaPlacement struct { SameRackCount int DiffRackCount int @@ -55,7 +51,3 @@ func (rp *ReplicaPlacement) String() string { func (rp *ReplicaPlacement) GetCopyCount() int { return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1 } - -func (rp *ReplicaPlacement) GetReplicationLevelIndex() int { - return rp.DiffDataCenterCount*3 + rp.DiffRackCount*3 + rp.SameRackCount -} diff --git a/go/storage/store.go b/go/storage/store.go index a6a4f399e..ef38ade98 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -14,6 +14,10 @@ import ( "strings" ) +const ( + MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes +) + type DiskLocation struct { Directory string MaxVolumeCount int @@ -83,11 +87,15 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts } return } -func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string) error { +func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error { rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e } + ttl, e := ReadTTL(ttlString) + if e != nil { + return e + } for _, range_string := range strings.Split(volumeListString, ",") { if strings.Index(range_string, "-") < 0 { id_string := range_string @@ -95,7 +103,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) } - e = s.addVolume(VolumeId(id), collection, rt) + e = s.addVolume(VolumeId(id), collection, rt, ttl) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -107,7 +115,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) } for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), collection, rt); err != nil { + if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil { e = err } } @@ -129,6 +137,14 @@ func (s *Store) DeleteCollection(collection string) (e error) { } return } +func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) { + e = v.Destroy() + if e != nil { + return + } + delete(volumes, v.Id) + return +} func (s *Store) findVolume(vid VolumeId) *Volume { for _, location := range s.Locations { if v, found := location.volumes[vid]; found { @@ -148,13 +164,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement) error { +func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.findFreeLocation(); location != nil { - glog.V(0).Infoln("In dir", location.Directory, "adds volume =", vid, ", collection =", collection, ", replicaPlacement =", replicaPlacement) - if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement); err == nil { + glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", + location.Directory, vid, collection, replicaPlacement, ttl) + if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil { location.volumes[vid] = volume return nil } else { @@ -190,9 +207,9 @@ func (l *DiskLocation) loadExistingVolumes() { } if vid, err := NewVolumeId(base); err == nil { if l.volumes[vid] == nil { - if v, e := NewVolume(l.Directory, collection, vid, nil); e == nil { + if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil { l.volumes[vid] = v - glog.V(0).Infoln("data file", l.Directory+"/"+name, "replicaPlacement =", v.ReplicaPlacement, "version =", v.Version(), "size =", v.Size()) + glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) } } } @@ -240,21 +257,31 @@ func (s *Store) Join() (masterNode string, e error) { for _, location := range s.Locations { maxVolumeCount = maxVolumeCount + location.MaxVolumeCount for k, v := range location.volumes { - volumeMessage := &operation.VolumeInformationMessage{ - Id: proto.Uint32(uint32(k)), - Size: proto.Uint64(uint64(v.Size())), - Collection: proto.String(v.Collection), - FileCount: proto.Uint64(uint64(v.nm.FileCount())), - DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), - DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), - ReadOnly: proto.Bool(v.readOnly), - ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())), - Version: proto.Uint32(uint32(v.Version())), - } - volumeMessages = append(volumeMessages, volumeMessage) if maxFileKey < v.nm.MaxFileKey() { maxFileKey = v.nm.MaxFileKey() } + if !v.expired(s.volumeSizeLimit) { + volumeMessage := &operation.VolumeInformationMessage{ + Id: proto.Uint32(uint32(k)), + Size: proto.Uint64(uint64(v.Size())), + Collection: proto.String(v.Collection), + FileCount: proto.Uint64(uint64(v.nm.FileCount())), + DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), + DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), + ReadOnly: proto.Bool(v.readOnly), + ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())), + Version: proto.Uint32(uint32(v.Version())), + Ttl: proto.Uint32(v.Ttl.ToUint32()), + } + volumeMessages = append(volumeMessages, volumeMessage) + } else { + if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { + s.DeleteVolume(location.volumes, v) + glog.V(0).Infoln("volume", v.Id, "is deleted.") + } else { + glog.V(0).Infoln("volume", v.Id, "is expired.") + } + } } } diff --git a/go/storage/volume.go b/go/storage/volume.go index dec560545..34ae7e386 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -22,12 +22,13 @@ type Volume struct { SuperBlock - accessLock sync.Mutex + accessLock sync.Mutex + lastModifiedTime uint64 //unix time in seconds } -func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} - v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement} + v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} e = v.load(true, true) return } @@ -49,12 +50,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { var e error fileName := v.FileName() - if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists { + if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists { if !canRead { return fmt.Errorf("cannot read Volume Data file %s.dat", fileName) } if canWrite { v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + v.lastModifiedTime = uint64(modifiedTime.Unix()) } else { glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode") v.dataFile, e = os.Open(fileName + ".dat") @@ -192,6 +194,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error()) } } + if v.lastModifiedTime < n.LastModified { + v.lastModifiedTime = n.LastModified + } return } @@ -221,8 +226,25 @@ func (v *Volume) delete(n *Needle) (uint32, error) { func (v *Volume) read(n *Needle) (int, error) { nv, ok := v.nm.Get(n.Id) - if ok && nv.Offset > 0 { - return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + if !ok || nv.Offset == 0 { + return -1, errors.New("Not Found") + } + bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + if err != nil { + return bytesRead, err + } + if !n.HasTtl() { + return bytesRead, err + } + ttlMinutes := n.Ttl.Minutes() + if ttlMinutes == 0 { + return bytesRead, nil + } + if !n.HasLastModifiedDate() { + return bytesRead, nil + } + if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) { + return bytesRead, nil } return -1, errors.New("Not Found") } @@ -343,3 +365,43 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) { } return true } + +// volume is expired if modified time + volume ttl < now +// except when volume is empty +// or when the volume does not have a ttl +// or when volumeSizeLimit is 0 when server just starts +func (v *Volume) expired(volumeSizeLimit uint64) bool { + if volumeSizeLimit == 0 { + //skip if we don't know size limit + return false + } + if v.ContentSize() == 0 { + return false + } + if v.Ttl == nil || v.Ttl.Minutes() == 0 { + return false + } + glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime) + livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60 + glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) + if int64(v.Ttl.Minutes()) < livedMinutes { + return true + } + return false +} + +// wait either maxDelayMinutes or 10% of ttl minutes +func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool { + if v.Ttl == nil || v.Ttl.Minutes() == 0 { + return false + } + removalDelay := v.Ttl.Minutes() / 10 + if removalDelay > maxDelayMinutes { + removalDelay = maxDelayMinutes + } + + if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) { + return true + } + return false +} diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go index 165af1a19..6a954f743 100644 --- a/go/storage/volume_info.go +++ b/go/storage/volume_info.go @@ -8,6 +8,7 @@ type VolumeInfo struct { Id VolumeId Size uint64 ReplicaPlacement *ReplicaPlacement + Ttl *TTL Collection string Version Version FileCount int @@ -32,5 +33,6 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er return vi, e } vi.ReplicaPlacement = rp + vi.Ttl = LoadTTLFromUint32(*m.Ttl) return vi, nil } diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go index 35030b93e..3fbef44d6 100644 --- a/go/storage/volume_super_block.go +++ b/go/storage/volume_super_block.go @@ -2,7 +2,6 @@ package storage import ( "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/util" "fmt" "os" ) @@ -15,12 +14,13 @@ const ( * Super block currently has 8 bytes allocated for each volume. * Byte 0: version, 1 or 2 * Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc -* Byte 2 and byte 3: Time to live in minutes +* Byte 2 and byte 3: Time to live. See TTL for definition +* Rest bytes: Reserved */ type SuperBlock struct { version Version ReplicaPlacement *ReplicaPlacement - Ttl uint16 + Ttl *TTL } func (s *SuperBlock) Version() Version { @@ -30,7 +30,7 @@ func (s *SuperBlock) Bytes() []byte { header := make([]byte, SuperBlockSize) header[0] = byte(s.version) header[1] = s.ReplicaPlacement.Byte() - util.Uint16toBytes(header[2:4], s.Ttl) + s.Ttl.ToBytes(header[2:4]) return header } @@ -70,6 +70,6 @@ func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { err = fmt.Errorf("cannot read replica type: %s", err.Error()) } - superBlock.Ttl = util.BytesToUint16(header[2:4]) + superBlock.Ttl = LoadTTLFromBytes(header[2:4]) return } diff --git a/go/storage/volume_super_block_test.go b/go/storage/volume_super_block_test.go index 19a1bb757..13db4b194 100644 --- a/go/storage/volume_super_block_test.go +++ b/go/storage/volume_super_block_test.go @@ -6,16 +6,17 @@ import ( func TestSuperBlockReadWrite(t *testing.T) { rp, _ := NewReplicaPlacementFromByte(byte(001)) + ttl, _ := ReadTTL("15d") s := &SuperBlock{ version: CurrentVersion, ReplicaPlacement: rp, - Ttl: uint16(35), + Ttl: ttl, } bytes := s.Bytes() - if !(bytes[2] == 0 && bytes[3] == 35) { - println("byte[2]:", bytes[2], "byte[3]:", bytes[3]) + if !(bytes[2] == 15 && bytes[3] == Day) { + println("byte[2]:", bytes[2], "byte[3]:", bytes[3]) t.Fail() } diff --git a/go/storage/volume_ttl.go b/go/storage/volume_ttl.go new file mode 100644 index 000000000..5ff43e24e --- /dev/null +++ b/go/storage/volume_ttl.go @@ -0,0 +1,147 @@ +package storage + +import ( + "strconv" +) + +var ( + TtlRange []uint16 +) + +const ( + //stored unit types + Empty byte = iota + Minute + Hour + Day + Week + Month + Year +) + +func init() { + TtlRange = []uint16{3, 10, 30, + 60 /*1 hour*/, 60 * 3, 60 * 6, 60 * 12, + 1440 /*1 day*/, 1440 * 3, 1440 * 7, 1440 * 15, 1440 * 31, + 44888 /*1 month*/, 65535, + } +} + +type TTL struct { + count byte + unit byte +} + +var EMPTY_TTL = &TTL{} + +// translate a readable ttl to internal ttl +// Supports format example: +// 3m: 3 minutes +// 4h: 4 hours +// 5d: 5 days +// 6w: 6 weeks +// 7M: 7 months +// 8y: 8 years +func ReadTTL(ttlString string) (*TTL, error) { + if ttlString == "" { + return EMPTY_TTL, nil + } + ttlBytes := []byte(ttlString) + unitByte := ttlBytes[len(ttlBytes)-1] + countBytes := ttlBytes[0 : len(ttlBytes)-1] + if '0' <= unitByte && unitByte <= '9' { + countBytes = ttlBytes + unitByte = 'm' + } + count, err := strconv.Atoi(string(countBytes)) + unit := toStoredByte(unitByte) + return &TTL{count: byte(count), unit: unit}, err +} + +// read stored bytes to a ttl +func LoadTTLFromBytes(input []byte) (t *TTL) { + return &TTL{count: input[0], unit: input[1]} +} + +// read stored bytes to a ttl +func LoadTTLFromUint32(ttl uint32) (t *TTL) { + input := make([]byte, 2) + input[1] = byte(ttl) + input[0] = byte(ttl >> 8) + return LoadTTLFromBytes(input) +} + +// save stored bytes to an output with 2 bytes +func (t TTL) ToBytes(output []byte) { + output[0] = t.count + output[1] = t.unit +} + +func (t TTL) ToUint32() (output uint32) { + output = uint32(t.count) << 8 + output += uint32(t.unit) + return output +} + +func (t TTL) String() string { + if t.count == 0 { + return "" + } + if t.unit == Empty { + return "" + } + countString := strconv.Itoa(int(t.count)) + switch t.unit { + case Minute: + return countString + "m" + case Hour: + return countString + "h" + case Day: + return countString + "d" + case Week: + return countString + "w" + case Month: + return countString + "M" + case Year: + return countString + "y" + } + return "" +} + +func toStoredByte(readableUnitByte byte) byte { + switch readableUnitByte { + case 'm': + return Minute + case 'h': + return Hour + case 'd': + return Day + case 'w': + return Week + case 'M': + return Month + case 'Y': + return Year + } + return 0 +} + +func (t TTL) Minutes() uint32 { + switch t.unit { + case Empty: + return 0 + case Minute: + return uint32(t.count) + case Hour: + return uint32(t.count) * 60 + case Day: + return uint32(t.count) * 60 * 24 + case Week: + return uint32(t.count) * 60 * 24 * 7 + case Month: + return uint32(t.count) * 60 * 24 * 31 + case Year: + return uint32(t.count) * 60 * 24 * 31 * 365 + } + return 0 +} diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go index 7d2a38cb8..706a1f951 100644 --- a/go/storage/volume_vacuum.go +++ b/go/storage/volume_vacuum.go @@ -4,7 +4,7 @@ import ( "code.google.com/p/weed-fs/go/glog" "fmt" "os" - _ "time" + "time" ) func (v *Volume) garbageLevel() float64 { @@ -13,9 +13,10 @@ func (v *Volume) garbageLevel() float64 { func (v *Volume) Compact() error { glog.V(3).Infof("Compacting ...") - v.accessLock.Lock() - defer v.accessLock.Unlock() - glog.V(3).Infof("Got Compaction lock...") + //no need to lock for copy on write + //v.accessLock.Lock() + //defer v.accessLock.Unlock() + //glog.V(3).Infof("Got Compaction lock...") filePath := v.FileName() glog.V(3).Infof("creating copies for volume %d ...", v.Id) @@ -59,10 +60,15 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro nm := NewNeedleMap(idx) new_offset := int64(SuperBlockSize) + now := uint64(time.Now().Unix()) + err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error { _, err = dst.Write(superBlock.Bytes()) return err }, true, func(n *Needle, offset int64) error { + if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { + return nil + } nv, ok := v.nm.Get(n.Id) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go index 77b4ac508..4aeef35f7 100644 --- a/go/topology/allocate_volume.go +++ b/go/topology/allocate_volume.go @@ -12,11 +12,12 @@ type AllocateVolumeResult struct { Error string } -func AllocateVolume(dn *DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error { +func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error { values := make(url.Values) values.Add("volume", vid.String()) - values.Add("collection", collection) - values.Add("replication", rp.String()) + values.Add("collection", option.Collection) + values.Add("replication", option.ReplicaPlacement.String()) + values.Add("ttl", option.Ttl.String()) jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values) if err != nil { return err diff --git a/go/topology/collection.go b/go/topology/collection.go index b21122d22..c014231af 100644 --- a/go/topology/collection.go +++ b/go/topology/collection.go @@ -1,33 +1,34 @@ package topology import ( - "code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/storage" ) type Collection struct { Name string volumeSizeLimit uint64 - replicaType2VolumeLayout []*VolumeLayout + storageType2VolumeLayout map[string]*VolumeLayout } func NewCollection(name string, volumeSizeLimit uint64) *Collection { c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} - c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.ReplicaPlacementCount) + c.storageType2VolumeLayout = make(map[string]*VolumeLayout) return c } -func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement) *VolumeLayout { - replicaPlacementIndex := rp.GetReplicationLevelIndex() - if c.replicaType2VolumeLayout[replicaPlacementIndex] == nil { - glog.V(0).Infoln("collection", c.Name, "adding replication type", rp) - c.replicaType2VolumeLayout[replicaPlacementIndex] = NewVolumeLayout(rp, c.volumeSizeLimit) +func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { + keyString := rp.String() + if ttl != nil { + keyString += ttl.String() } - return c.replicaType2VolumeLayout[replicaPlacementIndex] + if c.storageType2VolumeLayout[keyString] == nil { + c.storageType2VolumeLayout[keyString] = NewVolumeLayout(rp, ttl, c.volumeSizeLimit) + } + return c.storageType2VolumeLayout[keyString] } func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { - for _, vl := range c.replicaType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout { if vl != nil { if list := vl.Lookup(vid); list != nil { return list @@ -38,7 +39,7 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { } func (c *Collection) ListVolumeServers() (nodes []*DataNode) { - for _, vl := range c.replicaType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout { if vl != nil { if list := vl.ListVolumeServers(); list != nil { nodes = append(nodes, list...) diff --git a/go/topology/data_node.go b/go/topology/data_node.go index ae80e08bb..c67c5c1c1 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -38,15 +38,16 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { } } -func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) { +func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) { actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v } - for vid, _ := range dn.volumes { + for vid, v := range dn.volumes { if _, ok := actualVolumeMap[vid]; !ok { glog.V(0).Infoln("Deleting volume id:", vid) delete(dn.volumes, vid) + deletedVolumes = append(deletedVolumes, v) dn.UpAdjustVolumeCountDelta(-1) dn.UpAdjustActiveVolumeCountDelta(-1) } @@ -54,6 +55,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) { for _, v := range actualVolumes { dn.AddOrUpdateVolume(v) } + return } func (dn *DataNode) GetDataCenter() *DataCenter { diff --git a/go/topology/topology.go b/go/topology/topology.go index f1daffb53..acdef5e36 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -110,12 +110,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId { } func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool { - vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement) + vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) { - vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option) + vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) if err != nil || datanodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes avalable!") } @@ -123,12 +123,12 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { _, ok := t.collectionMap[collectionName] if !ok { t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit) } - return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp) + return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl) } func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) { @@ -141,10 +141,14 @@ func (t *Topology) DeleteCollection(collectionName string) { } func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn) + t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn) +} +func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { + glog.Infof("removing volume info:%+v", v) + t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn) } -func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) { +func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { t.Sequence.SetMax(*joinMessage.MaxFileKey) dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack) dc := t.GetOrCreateDataCenter(dcName) @@ -162,10 +166,13 @@ func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) { glog.V(0).Infoln("Fail to convert joined volume information:", err.Error()) } } - dn.UpdateVolumes(volumeInfos) + deletedVolumes := dn.UpdateVolumes(volumeInfos) for _, v := range volumeInfos { t.RegisterVolumeLayout(v, dn) } + for _, v := range deletedVolumes { + t.UnRegisterVolumeLayout(v, dn) + } } func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index 7398ff9bf..1e630e149 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -41,7 +41,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { }() } func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { - vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement) + vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl) if !vl.SetVolumeCapacityFull(volumeInfo.Id) { return false } @@ -55,7 +55,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { func (t *Topology) UnRegisterDataNode(dn *DataNode) { for _, v := range dn.volumes { glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn) - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) vl.SetVolumeUnavailable(dn, v.Id) } dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) @@ -65,7 +65,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { } func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) if vl.isWritable(&v) { vl.SetVolumeAvailable(dn, v.Id) } diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go index f66d4c251..d6400c988 100644 --- a/go/topology/topology_map.go +++ b/go/topology/topology_map.go @@ -14,7 +14,7 @@ func (t *Topology) ToMap() interface{} { m["DataCenters"] = dcs var layouts []interface{} for _, c := range t.collectionMap { - for _, layout := range c.replicaType2VolumeLayout { + for _, layout := range c.storageType2VolumeLayout { if layout != nil { tmp := layout.ToMap() tmp["collection"] = c.Name diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go index a1d6d2564..9eaca37d4 100644 --- a/go/topology/topology_vacuum.go +++ b/go/topology/topology_vacuum.go @@ -80,7 +80,7 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis } func (t *Topology) Vacuum(garbageThreshold string) int { for _, c := range t.collectionMap { - for _, vl := range c.replicaType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout { if vl != nil { for vid, locationlist := range vl.vid2location { if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go index 4965e3ba0..778aa038a 100644 --- a/go/topology/volume_growth.go +++ b/go/topology/volume_growth.go @@ -19,6 +19,7 @@ This package is created to resolve these replica placement issues: type VolumeGrowOption struct { Collection string ReplicaPlacement *storage.ReplicaPlacement + Ttl *storage.TTL DataCenter string Rack string DataNode string @@ -184,8 +185,15 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { for _, server := range servers { - if err := AllocateVolume(server, vid, option.Collection, option.ReplicaPlacement); err == nil { - vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Version: storage.CurrentVersion} + if err := AllocateVolume(server, vid, option); err == nil { + vi := storage.VolumeInfo{ + Id: vid, + Size: 0, + Collection: option.Collection, + ReplicaPlacement: option.ReplicaPlacement, + Ttl: option.Ttl, + Version: storage.CurrentVersion, + } server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(vi, server) glog.V(0).Infoln("Created Volume", vid, "on", server) diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 538acb54c..1e55072a3 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -11,15 +11,17 @@ import ( // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { rp *storage.ReplicaPlacement + ttl *storage.TTL vid2location map[storage.VolumeId]*VolumeLocationList writables []storage.VolumeId // transient array of writable volume id volumeSizeLimit uint64 accessLock sync.Mutex } -func NewVolumeLayout(rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *VolumeLayout { +func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout { return &VolumeLayout{ rp: rp, + ttl: ttl, vid2location: make(map[storage.VolumeId]*VolumeLocationList), writables: *new([]storage.VolumeId), volumeSizeLimit: volumeSizeLimit, @@ -42,6 +44,14 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } } +func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { + vl.accessLock.Lock() + defer vl.accessLock.Unlock() + + vl.removeFromWritable(v.Id) + delete(vl.vid2location, v.Id) +} + func (vl *VolumeLayout) AddToWritable(vid storage.VolumeId) { for _, id := range vl.writables { if vid == id { @@ -192,6 +202,7 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { func (vl *VolumeLayout) ToMap() map[string]interface{} { m := make(map[string]interface{}) m["replication"] = vl.rp.String() + m["ttl"] = vl.ttl.String() m["writables"] = vl.writables //m["locations"] = vl.vid2location return m diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go index eab923751..27aebaef0 100644 --- a/go/weed/benchmark.go +++ b/go/weed/benchmark.go @@ -201,7 +201,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) { start := time.Now() fileSize := int64(*b.fileSize + rand.Intn(64)) fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize} - if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil { + if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil { fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection if _, ok := serverLimitChan[fp.Server]; !ok { serverLimitChan[fp.Server] = make(chan bool, 7) diff --git a/go/weed/compact.go b/go/weed/compact.go index 580f3f98d..57a02261f 100644 --- a/go/weed/compact.go +++ b/go/weed/compact.go @@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool { } vid := storage.VolumeId(*compactVolumeId) - v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil) + v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil, nil) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } diff --git a/go/weed/upload.go b/go/weed/upload.go index b59313a2a..c21499dd0 100644 --- a/go/weed/upload.go +++ b/go/weed/upload.go @@ -12,6 +12,7 @@ var ( uploadReplication *string uploadCollection *string uploadDir *string + uploadTtl *string include *string maxMB *int ) @@ -24,6 +25,7 @@ func init() { include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") uploadReplication = cmdUpload.Flag.String("replication", "", "replication type") uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name") + uploadTtl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit") } @@ -67,7 +69,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB) + results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -84,7 +86,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { fmt.Println(e.Error()) } - results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB) + results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) } diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index a547d7462..49e84378c 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -99,14 +99,14 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("parsing upload file...") - fname, data, mimeType, isGzipped, lastModified, pe := storage.ParseUpload(r) + fname, data, mimeType, isGzipped, lastModified, _, pe := storage.ParseUpload(r) if pe != nil { writeJsonError(w, r, pe) return } debug("assigning file id for", fname) - assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection")) + assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"), r.FormValue("ttl")) if ae != nil { writeJsonError(w, r, ae) return diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index 0f83352a9..f760030f3 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -109,7 +109,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() - assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection) + assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection, query.Get("ttl")) if ae != nil { glog.V(0).Infoln("failing to assign a file id", ae.Error()) writeJsonError(w, r, ae) @@ -131,14 +131,14 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } resp, do_err := util.Do(request) if do_err != nil { - glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) + glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error()) writeJsonError(w, r, do_err) return } defer resp.Body.Close() resp_body, ra_err := ioutil.ReadAll(resp.Body) if ra_err != nil { - glog.V(0).Infoln("failing to upload to volume server", ra_err.Error()) + glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error()) writeJsonError(w, r, ra_err) return } @@ -146,12 +146,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { var ret operation.UploadResult unmarshal_err := json.Unmarshal(resp_body, &ret) if unmarshal_err != nil { - glog.V(0).Infoln("failing to read upload resonse", string(resp_body)) + glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body)) writeJsonError(w, r, unmarshal_err) return } if ret.Error != "" { - glog.V(0).Infoln("failing to post to volume server", ret.Error) + glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) writeJsonError(w, r, errors.New(ret.Error)) return } @@ -169,7 +169,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { glog.V(4).Infoln("saving", path, "=>", assignResult.Fid) if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil { operation.DeleteFile(fs.master, assignResult.Fid) //clean up - glog.V(0).Infoln("failing to write to filer server", db_err.Error()) + glog.V(0).Infoln("failing to write to filer server", r.RequestURI, db_err.Error()) writeJsonError(w, r, db_err) return } diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index d50075fd5..1458bf3e6 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -55,7 +55,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { } } - ms.Topo.RegisterVolumes(joinMessage) + ms.Topo.ProcessJoinMessage(joinMessage) writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024}) } @@ -144,7 +144,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r * } func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool { - vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement) + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } @@ -157,9 +157,14 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr if err != nil { return nil, err } + ttl, err := storage.ReadTTL(r.FormValue("ttl")) + if err != nil { + return nil, err + } volumeGrowOption := &topology.VolumeGrowOption{ Collection: r.FormValue("collection"), ReplicaPlacement: replicaPlacement, + Ttl: ttl, DataCenter: r.FormValue("dataCenter"), Rack: r.FormValue("rack"), DataNode: r.FormValue("dataNode"), diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go index 6d285524a..1c01fdfd0 100644 --- a/go/weed/weed_server/volume_server_handlers_admin.go +++ b/go/weed/weed_server/volume_server_handlers_admin.go @@ -16,7 +16,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { } func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication")) + err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), r.FormValue("ttl")) if err == nil { writeJsonQuiet(w, r, map[string]string{"error": ""}) } else {