Browse Source

Merge c5269cfcad into 70f6740309

pull/608/merge
Benjamin Roth 8 years ago
committed by GitHub
parent
commit
559444013c
  1. 154
      weed/command/export.go
  2. 13
      weed/server/volume_server_handlers_write.go
  3. 4
      weed/storage/needle.go
  4. 20
      weed/storage/volume.go
  5. 8
      weed/storage/volume_loading.go
  6. 2
      weed/topology/store_replicate.go

154
weed/command/export.go

@ -49,9 +49,11 @@ func init() {
} }
var ( var (
output = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout")
format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename formatted with {{.Mime}} {{.Id}} {{.Name}} {{.Ext}}")
newer = cmdExport.Flag.String("newer", "", "export only files newer than this time, default is all files. Must be specified in RFC3339 without timezone, e.g. 2006-01-02T15:04:05")
output = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout")
format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename formatted with {{.Mime}} {{.Id}} {{.Name}} {{.Ext}}")
newer = cmdExport.Flag.String("newer", "", "export only files newer than this time, default is all files. Must be specified in RFC3339 without timezone, e.g. 2006-01-02T15:04:05")
showDeleted = cmdExport.Flag.Bool("deleted", false, "export deleted files. only applies if -o is not specified")
volumeInfo = cmdExport.Flag.Bool("volumeInfo", false, "show volume info")
tarOutputFile *tar.Writer tarOutputFile *tar.Writer
tarHeader tar.Header tarHeader tar.Header
@ -62,6 +64,24 @@ var (
localLocation, _ = time.LoadLocation("Local") localLocation, _ = time.LoadLocation("Local")
) )
func printNeedle(vid storage.VolumeId, n *storage.Needle, version storage.Version, deleted bool) {
key := storage.NewFileIdFromNeedle(vid, n).String()
size := n.DataSize
if version == storage.Version1 {
size = n.Size
}
fmt.Printf("\"%s\",\"%s\",%d,%t,%s,%s,%s,%t\n",
key,
strings.Replace(string(n.Name), "\"", "\"\"", -1),
size,
n.IsGzipped(),
n.Mime,
n.LastModifiedString(),
n.Ttl.String(),
deleted,
)
}
func runExport(cmd *Command, args []string) bool { func runExport(cmd *Command, args []string) bool {
var err error var err error
@ -78,6 +98,11 @@ func runExport(cmd *Command, args []string) bool {
return false return false
} }
if (*volumeInfo == true) {
showVolumeInfo(*export.dir, *export.collection, *export.volumeId)
return true
}
if *output != "" { if *output != "" {
if *output != "-" && !strings.HasSuffix(*output, ".tar") { if *output != "-" && !strings.HasSuffix(*output, ".tar") {
fmt.Println("the output file", *output, "should be '-' or end with .tar") fmt.Println("the output file", *output, "should be '-' or end with .tar")
@ -103,7 +128,7 @@ func runExport(cmd *Command, args []string) bool {
t := time.Now() t := time.Now()
tarHeader = tar.Header{Mode: 0644, tarHeader = tar.Header{Mode: 0644,
ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(), ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(),
Typeflag: tar.TypeReg,
Typeflag: tar.TypeReg,
AccessTime: t, ChangeTime: t} AccessTime: t, ChangeTime: t}
} }
@ -125,6 +150,10 @@ func runExport(cmd *Command, args []string) bool {
var version storage.Version var version storage.Version
if tarOutputFile == nil {
fmt.Printf("key,name,size,gzip,mime,modified,ttl,deleted\n")
}
err = storage.ScanVolumeFile(*export.dir, *export.collection, vid, err = storage.ScanVolumeFile(*export.dir, *export.collection, vid,
storage.NeedleMapInMemory, storage.NeedleMapInMemory,
func(superBlock storage.SuperBlock) error { func(superBlock storage.SuperBlock) error {
@ -140,9 +169,24 @@ func runExport(cmd *Command, args []string) bool {
n.LastModified, newerThanUnix) n.LastModified, newerThanUnix)
return nil return nil
} }
return walker(vid, n, version)
if tarOutputFile != nil {
return writeFile(vid, n, version)
} else {
printNeedle(vid, n, version, false)
return nil
}
} }
if !ok { if !ok {
if *showDeleted && tarOutputFile == nil {
if (n.DataSize > 0) {
printNeedle(vid, n, version, true)
} else {
n.Name = []byte("*tombstone")
printNeedle(vid, n, version, true)
}
}
glog.V(2).Infof("This seems deleted %d size %d", n.Id, n.Size) glog.V(2).Infof("This seems deleted %d size %d", n.Id, n.Size)
} else { } else {
glog.V(2).Infof("Skipping later-updated Id %d size %d", n.Id, n.Size) glog.V(2).Infof("Skipping later-updated Id %d size %d", n.Id, n.Size)
@ -163,51 +207,71 @@ type nameParams struct {
Ext string Ext string
} }
func walker(vid storage.VolumeId, n *storage.Needle, version storage.Version) (err error) {
func showVolumeInfo(dirname string, collection string, volumeId int) {
vid := storage.VolumeId(volumeId)
var volume *storage.Volume
var err error
if volume, err = storage.LoadVolume(dirname, collection, vid, storage.NeedleMapInMemory); err != nil {
fmt.Println("Failed to load volume: " + err.Error())
return
}
var ttl string
if (volume.Ttl.Minutes() > 0) {
ttl = volume.Ttl.String()
} else {
ttl = "none"
}
fmt.Printf("Collection: %s\n"+
"VolumeId: %v\n"+
"Content-Size: %v\n"+
"Content-Count: %v\n"+
"Deleted-Size: %v\n"+
"Deleted-Count: %v\n"+
"Modification-Time: %s\n"+
"TTL: %s\n",
volume.Collection,
volume.Id,
volume.ContentSize(),
volume.FileCount(),
volume.DeletedSize(),
volume.DeletedCount(),
volume.LastModifiedString(),
ttl,
)
}
func writeFile(vid storage.VolumeId, n *storage.Needle, version storage.Version) (err error) {
key := storage.NewFileIdFromNeedle(vid, n).String() key := storage.NewFileIdFromNeedle(vid, n).String()
if tarOutputFile != nil {
fileNameTemplateBuffer.Reset()
if err = fileNameTemplate.Execute(fileNameTemplateBuffer,
nameParams{
Name: string(n.Name),
Id: n.Id,
Mime: string(n.Mime),
Key: key,
Ext: filepath.Ext(string(n.Name)),
},
); err != nil {
return err
}
fileNameTemplateBuffer.Reset()
if err = fileNameTemplate.Execute(fileNameTemplateBuffer,
nameParams{
Name: string(n.Name),
Id: n.Id,
Mime: string(n.Mime),
Key: key,
Ext: filepath.Ext(string(n.Name)),
},
); err != nil {
return err
}
fileName := fileNameTemplateBuffer.String()
fileName := fileNameTemplateBuffer.String()
if n.IsGzipped() && path.Ext(fileName) != ".gz" {
fileName = fileName + ".gz"
}
if n.IsGzipped() && path.Ext(fileName) != ".gz" {
fileName = fileName + ".gz"
}
tarHeader.Name, tarHeader.Size = fileName, int64(len(n.Data))
if n.HasLastModifiedDate() {
tarHeader.ModTime = time.Unix(int64(n.LastModified), 0)
} else {
tarHeader.ModTime = time.Unix(0, 0)
}
tarHeader.ChangeTime = tarHeader.ModTime
if err = tarOutputFile.WriteHeader(&tarHeader); err != nil {
return err
}
_, err = tarOutputFile.Write(n.Data)
tarHeader.Name, tarHeader.Size = fileName, int64(len(n.Data))
if n.HasLastModifiedDate() {
tarHeader.ModTime = time.Unix(int64(n.LastModified), 0)
} else { } else {
size := n.DataSize
if version == storage.Version1 {
size = n.Size
}
fmt.Printf("key=%s Name=%s Size=%d gzip=%t mime=%s\n",
key,
n.Name,
size,
n.IsGzipped(),
n.Mime,
)
tarHeader.ModTime = time.Unix(0, 0)
}
tarHeader.ChangeTime = tarHeader.ModTime
if err = tarOutputFile.WriteHeader(&tarHeader); err != nil {
return err
} }
_, err = tarOutputFile.Write(n.Data)
return return
} }

13
weed/server/volume_server_handlers_write.go

@ -9,6 +9,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/topology"
"time"
"strconv"
) )
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
@ -87,6 +89,13 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
count = chunkManifest.Size count = chunkManifest.Size
} }
n.LastModified = uint64(time.Now().Unix())
if len(r.FormValue("time")) > 0 {
modifiedTime, err := strconv.ParseInt(r.FormValue("time"), 10, 64)
if err == nil {
n.LastModified = uint64(modifiedTime)
}
}
_, err := topology.ReplicatedDelete(vs.GetMasterNode(), vs.store, volumeId, n, r) _, err := topology.ReplicatedDelete(vs.GetMasterNode(), vs.store, volumeId, n, r)
if err == nil { if err == nil {
@ -103,6 +112,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm() r.ParseForm()
var ret []operation.DeleteResult var ret []operation.DeleteResult
now := uint64(time.Now().Unix())
for _, fid := range r.Form["fid"] { for _, fid := range r.Form["fid"] {
vid, id_cookie, err := operation.ParseFileId(fid) vid, id_cookie, err := operation.ParseFileId(fid)
if err != nil { if err != nil {
@ -144,6 +154,9 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
glog.V(0).Infoln("deleting", fid, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) glog.V(0).Infoln("deleting", fid, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
return return
} }
n.LastModified = now
if size, err := vs.store.Delete(volumeId, n); err != nil { if size, err := vs.store.Delete(volumeId, n); err != nil {
ret = append(ret, operation.DeleteResult{ ret = append(ret, operation.DeleteResult{
Fid: fid, Fid: fid,

4
weed/storage/needle.go

@ -262,3 +262,7 @@ func ParseKeyHash(key_hash_string string) (uint64, uint32, error) {
} }
return key, uint32(hash), nil return key, uint32(hash), nil
} }
func (n *Needle) LastModifiedString() string {
return time.Unix(int64(n.LastModified), 0).Format("2006-01-02T15:04:05")
}

20
weed/storage/volume.go

@ -80,6 +80,26 @@ func (v *Volume) ContentSize() uint64 {
return v.nm.ContentSize() return v.nm.ContentSize()
} }
func (v *Volume) DeletedSize() uint64 {
return v.nm.DeletedSize()
}
func (v *Volume) FileCount() int {
return v.nm.FileCount()
}
func (v *Volume) DeletedCount() int {
return v.nm.DeletedCount()
}
func (v *Volume) LastModifiedTime() uint64 {
return v.lastModifiedTime
}
func (v *Volume) LastModifiedString() string {
return time.Unix(int64(v.lastModifiedTime), 0).Format("2006-01-02T15:04:05")
}
// volume is expired if modified time + volume ttl < now // volume is expired if modified time + volume ttl < now
// except when volume is empty // except when volume is empty
// or when the volume does not have a ttl // or when the volume does not have a ttl

8
weed/storage/volume_loading.go

@ -8,6 +8,14 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
) )
func LoadVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{}
v.needleMapKind = needleMapKind
e = v.load(true, false, needleMapKind, 0)
return
}
func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) { func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id} v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{} v.SuperBlock = SuperBlock{}

2
weed/topology/store_replicate.go

@ -102,7 +102,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store,
if needToReplicate { //send to other replica locations if needToReplicate { //send to other replica locations
if r.FormValue("type") != "replicate" { if r.FormValue("type") != "replicate" {
if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error { if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error {
return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
return util.Delete(fmt.Sprintf("http://%s%s?type=replicate&time=%d", location.Url, r.URL.Path, n.LastModified), jwt)
}); err != nil { }); err != nil {
ret = 0 ret = 0
} }

Loading…
Cancel
Save