From e5506152c0a27d38fa334b2e338d82ee02669ab9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 18 Apr 2019 21:43:36 -0700 Subject: [PATCH] refactoring --- .../change_superblock/change_superblock.go | 3 +- unmaintained/fix_dat/fix_dat.go | 11 ++-- unmaintained/see_dat/see_dat.go | 9 ++- unmaintained/volume_tailer/volume_tailer.go | 9 +-- weed/command/backup.go | 6 +- weed/command/compact.go | 3 +- weed/command/export.go | 22 ++++---- weed/command/fix.go | 7 ++- weed/operation/chunked_file.go | 5 +- .../tail_volume.go} | 14 ++--- weed/operation/upload_content.go | 3 +- weed/server/common.go | 7 ++- weed/server/master_grpc_server_volume.go | 5 +- weed/server/master_server_handlers.go | 4 +- weed/server/master_server_handlers_admin.go | 12 ++-- weed/server/volume_grpc_admin.go | 10 ++-- weed/server/volume_grpc_batch_delete.go | 6 +- weed/server/volume_grpc_copy.go | 13 +++-- weed/server/volume_grpc_copy_incremental.go | 6 +- weed/server/volume_grpc_tail.go | 3 +- weed/server/volume_grpc_vacuum.go | 10 ++-- weed/server/volume_server_handlers_read.go | 10 ++-- weed/server/volume_server_handlers_write.go | 10 ++-- weed/storage/disk_location.go | 21 +++---- weed/storage/{ => needle}/crc.go | 2 +- weed/storage/{ => needle}/file_id.go | 3 +- weed/storage/{ => needle}/needle.go | 6 +- .../{ => needle}/needle_parse_multipart.go | 11 ++-- .../storage/{ => needle}/needle_read_write.go | 10 +++- .../{ => needle}/needle_read_write_test.go | 5 +- weed/storage/{ => needle}/needle_test.go | 5 +- weed/storage/{ => needle}/volume_id.go | 2 +- weed/storage/{ => needle}/volume_ttl.go | 2 +- weed/storage/{ => needle}/volume_ttl_test.go | 2 +- weed/storage/{ => needle}/volume_version.go | 2 +- weed/storage/needle_byte_cache.go | 11 ---- weed/storage/needle_map.go | 4 +- .../{needle => needle_map}/btree_map.go | 2 +- .../{needle => needle_map}/compact_map.go | 2 +- .../compact_map_perf_test.go | 2 +- .../compact_map_test.go | 2 +- .../{needle => needle_map}/needle_value.go | 2 +- .../needle_value_map.go | 2 +- weed/storage/needle_map_leveldb.go | 6 +- weed/storage/needle_map_memory.go | 10 ++-- weed/storage/store.go | 41 +++++++------- weed/storage/store_vacuum.go | 10 ++-- weed/storage/volume.go | 9 ++- weed/storage/volume_backup.go | 10 ++-- weed/storage/volume_checking.go | 9 +-- weed/storage/volume_info.go | 13 +++-- weed/storage/volume_info_test.go | 8 ++- weed/storage/volume_loading.go | 6 +- weed/storage/volume_read_write.go | 31 ++++++----- weed/storage/volume_super_block.go | 15 ++--- weed/storage/volume_super_block_test.go | 8 ++- weed/storage/volume_vacuum.go | 11 ++-- weed/storage/volume_vacuum_test.go | 18 +++--- weed/topology/allocate_volume.go | 5 +- weed/topology/cluster_commands.go | 6 +- weed/topology/collection.go | 5 +- weed/topology/data_node.go | 11 ++-- weed/topology/node.go | 14 ++--- weed/topology/store_replicate.go | 25 +++++---- weed/topology/topology.go | 9 +-- weed/topology/topology_test.go | 8 ++- weed/topology/topology_vacuum.go | 19 ++++--- weed/topology/volume_growth.go | 10 ++-- weed/topology/volume_growth_test.go | 5 +- weed/topology/volume_layout.go | 41 +++++++------- weed/topology/volume_location_list.go | 4 +- .../compress.go => util/compression.go} | 55 +++++++++---------- 72 files changed, 382 insertions(+), 326 deletions(-) rename weed/{storage/tail_volume.go.go => operation/tail_volume.go} (75%) rename weed/storage/{ => needle}/crc.go (97%) rename weed/storage/{ => needle}/file_id.go (98%) rename weed/storage/{ => needle}/needle.go (99%) rename weed/storage/{ => needle}/needle_parse_multipart.go (89%) rename weed/storage/{ => needle}/needle_read_write.go (98%) rename weed/storage/{ => needle}/needle_read_write_test.go (99%) rename weed/storage/{ => needle}/needle_test.go (98%) rename weed/storage/{ => needle}/volume_id.go (95%) rename weed/storage/{ => needle}/volume_ttl.go (99%) rename weed/storage/{ => needle}/volume_ttl_test.go (98%) rename weed/storage/{ => needle}/volume_version.go (90%) delete mode 100644 weed/storage/needle_byte_cache.go rename weed/storage/{needle => needle_map}/btree_map.go (98%) rename weed/storage/{needle => needle_map}/compact_map.go (99%) rename weed/storage/{needle => needle_map}/compact_map_perf_test.go (99%) rename weed/storage/{needle => needle_map}/compact_map_test.go (99%) rename weed/storage/{needle => needle_map}/needle_value.go (95%) rename weed/storage/{needle => needle_map}/needle_value_map.go (93%) rename weed/{operation/compress.go => util/compression.go} (92%) diff --git a/unmaintained/change_superblock/change_superblock.go b/unmaintained/change_superblock/change_superblock.go index 779580a9b..07d9b94e4 100644 --- a/unmaintained/change_superblock/change_superblock.go +++ b/unmaintained/change_superblock/change_superblock.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) var ( @@ -73,7 +74,7 @@ func main() { } if *targetTTL != "" { - ttl, err := storage.ReadTTL(*targetTTL) + ttl, err := needle.ReadTTL(*targetTTL) if err != nil { glog.Fatalf("cannot parse target ttl %s: %v", *targetTTL, err) diff --git a/unmaintained/fix_dat/fix_dat.go b/unmaintained/fix_dat/fix_dat.go index 856dbc877..f8a535aaf 100644 --- a/unmaintained/fix_dat/fix_dat.go +++ b/unmaintained/fix_dat/fix_dat.go @@ -10,6 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -61,7 +62,7 @@ func main() { } newDatFile.Write(superBlock.Bytes()) - iterateEntries(datFile, indexFile, func(n *storage.Needle, offset int64) { + iterateEntries(datFile, indexFile, func(n *needle.Needle, offset int64) { fmt.Printf("needle id=%v name=%s size=%d dataSize=%d\n", n.Id, string(n.Name), n.Size, n.DataSize) _, s, _, e := n.Append(newDatFile, superBlock.Version()) fmt.Printf("size %d error %v\n", s, e) @@ -69,7 +70,7 @@ func main() { } -func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needle, offset int64)) { +func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *needle.Needle, offset int64)) { // start to read index file var readerOffset int64 bytes := make([]byte, 16) @@ -84,7 +85,7 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl } offset := int64(superBlock.BlockSize()) version := superBlock.Version() - n, _, rest, err := storage.ReadNeedleHeader(datFile, version, offset) + n, _, rest, err := needle.ReadNeedleHeader(datFile, version, offset) if err != nil { fmt.Printf("cannot read needle header: %v", err) return @@ -106,7 +107,7 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl fmt.Printf("key: %d offsetFromIndex %d n.Size %d sizeFromIndex:%d\n", key, offsetFromIndex, n.Size, sizeFromIndex) - rest = storage.NeedleBodyLength(sizeFromIndex, version) + rest = needle.NeedleBodyLength(sizeFromIndex, version) func() { defer func() { @@ -126,7 +127,7 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl offset += types.NeedleEntrySize + rest //fmt.Printf("==> new entry offset %d\n", offset) - if n, _, rest, err = storage.ReadNeedleHeader(datFile, version, offset); err != nil { + if n, _, rest, err = needle.ReadNeedleHeader(datFile, version, offset); err != nil { if err == io.EOF { return } diff --git a/unmaintained/see_dat/see_dat.go b/unmaintained/see_dat/see_dat.go index 28d6447d6..e8e54fd4f 100644 --- a/unmaintained/see_dat/see_dat.go +++ b/unmaintained/see_dat/see_dat.go @@ -2,8 +2,11 @@ package main import ( "flag" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "time" ) @@ -14,7 +17,7 @@ var ( ) type VolumeFileScanner4SeeDat struct { - version storage.Version + version needle.Version } func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock storage.SuperBlock) error { @@ -26,7 +29,7 @@ func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool { return true } -func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *storage.Needle, offset int64) error { +func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64) error { t := time.Unix(int64(n.AppendAtNs)/int64(time.Second), int64(n.AppendAtNs)%int64(time.Second)) glog.V(0).Infof("%d,%s%x offset %d size %d cookie %x appendedAt %v", *volumeId, n.Id, n.Cookie, offset, n.Size, n.Cookie, t) return nil @@ -35,7 +38,7 @@ func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *storage.Needle, offset i func main() { flag.Parse() - vid := storage.VolumeId(*volumeId) + vid := needle.VolumeId(*volumeId) scanner := &VolumeFileScanner4SeeDat{} err := storage.ScanVolumeFile(*volumePath, *volumeCollection, vid, storage.NeedleMapInMemory, scanner) diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go index b234f5c4d..4cee4e18b 100644 --- a/unmaintained/volume_tailer/volume_tailer.go +++ b/unmaintained/volume_tailer/volume_tailer.go @@ -8,7 +8,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" weed_server "github.com/chrislusf/seaweedfs/weed/server" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + util2 "github.com/chrislusf/seaweedfs/weed/util" "github.com/spf13/viper" "golang.org/x/tools/godoc/util" ) @@ -27,7 +28,7 @@ func main() { weed_server.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") - vid := storage.VolumeId(*volumeId) + vid := needle.VolumeId(*volumeId) var sinceTimeNs int64 if *rewindDuration == 0 { @@ -38,7 +39,7 @@ func main() { sinceTimeNs = time.Now().Add(-*rewindDuration).UnixNano() } - err := storage.TailVolume(*master, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *storage.Needle) (err error) { + err := operation.TailVolume(*master, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) { if n.Size == 0 { println("-", n.String()) return nil @@ -50,7 +51,7 @@ func main() { data := n.Data if n.IsGzipped() { - if data, err = operation.UnGzipData(data); err != nil { + if data, err = util2.UnGzipData(data); err != nil { return err } } diff --git a/weed/command/backup.go b/weed/command/backup.go index 7a98f60d9..e2b0da7dd 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -2,8 +2,10 @@ package command import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/spf13/viper" "github.com/chrislusf/seaweedfs/weed/operation" @@ -56,7 +58,7 @@ func runBackup(cmd *Command, args []string) bool { if *s.volumeId == -1 { return false } - vid := storage.VolumeId(*s.volumeId) + vid := needle.VolumeId(*s.volumeId) // find volume location, replication, ttl info lookup, err := operation.Lookup(*s.master, vid.String()) @@ -71,7 +73,7 @@ func runBackup(cmd *Command, args []string) bool { fmt.Printf("Error get volume %d status: %v\n", vid, err) return true } - ttl, err := storage.ReadTTL(stats.Ttl) + ttl, err := needle.ReadTTL(stats.Ttl) if err != nil { fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err) return true diff --git a/weed/command/compact.go b/weed/command/compact.go index 0dd4efe0e..3ac09259e 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -3,6 +3,7 @@ package command import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func init() { @@ -35,7 +36,7 @@ func runCompact(cmd *Command, args []string) bool { preallocate := *compactVolumePreallocate * (1 << 20) - vid := storage.VolumeId(*compactVolumeId) + vid := needle.VolumeId(*compactVolumeId) v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate) if err != nil { diff --git a/weed/command/export.go b/weed/command/export.go index 47abc2929..7e94ec11c 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -12,10 +12,12 @@ import ( "text/template" "time" + "io" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" - "io" ) const ( @@ -66,10 +68,10 @@ var ( localLocation, _ = time.LoadLocation("Local") ) -func printNeedle(vid storage.VolumeId, n *storage.Needle, version storage.Version, deleted bool) { - key := storage.NewFileIdFromNeedle(vid, n).String() +func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version, deleted bool) { + key := needle.NewFileIdFromNeedle(vid, n).String() size := n.DataSize - if version == storage.Version1 { + if version == needle.Version1 { size = n.Size } fmt.Printf("%s\t%s\t%d\t%t\t%s\t%s\t%s\t%t\n", @@ -85,10 +87,10 @@ func printNeedle(vid storage.VolumeId, n *storage.Needle, version storage.Versio } type VolumeFileScanner4Export struct { - version storage.Version + version needle.Version counter int needleMap *storage.NeedleMap - vid storage.VolumeId + vid needle.VolumeId } func (scanner *VolumeFileScanner4Export) VisitSuperBlock(superBlock storage.SuperBlock) error { @@ -100,7 +102,7 @@ func (scanner *VolumeFileScanner4Export) ReadNeedleBody() bool { return true } -func (scanner *VolumeFileScanner4Export) VisitNeedle(n *storage.Needle, offset int64) error { +func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset int64) error { needleMap := scanner.needleMap vid := scanner.vid @@ -189,7 +191,7 @@ func runExport(cmd *Command, args []string) bool { if *export.collection != "" { fileName = *export.collection + "_" + fileName } - vid := storage.VolumeId(*export.volumeId) + vid := needle.VolumeId(*export.volumeId) indexFile, err := os.OpenFile(path.Join(*export.dir, fileName+".idx"), os.O_RDONLY, 0644) if err != nil { glog.Fatalf("Create Volume Index [ERROR] %s\n", err) @@ -225,8 +227,8 @@ type nameParams struct { Ext string } -func writeFile(vid storage.VolumeId, n *storage.Needle) (err error) { - key := storage.NewFileIdFromNeedle(vid, n).String() +func writeFile(vid needle.VolumeId, n *needle.Needle) (err error) { + key := needle.NewFileIdFromNeedle(vid, n).String() fileNameTemplateBuffer.Reset() if err = fileNameTemplate.Execute(fileNameTemplateBuffer, nameParams{ diff --git a/weed/command/fix.go b/weed/command/fix.go index 2536d774f..bf33490cc 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -29,7 +30,7 @@ var ( ) type VolumeFileScanner4Fix struct { - version storage.Version + version needle.Version nm *storage.NeedleMap } @@ -42,7 +43,7 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool { return false } -func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *storage.Needle, offset int64) error { +func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64) error { glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped()) if n.Size > 0 && n.Size != types.TombstoneFileSize { pe := scanner.nm.Put(n.Id, types.ToOffset(offset), n.Size) @@ -74,7 +75,7 @@ func runFix(cmd *Command, args []string) bool { nm := storage.NewBtreeNeedleMap(indexFile) defer nm.Close() - vid := storage.VolumeId(*fixVolumeId) + vid := needle.VolumeId(*fixVolumeId) scanner := &VolumeFileScanner4Fix{ nm: nm, } diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index 995f06b53..295204dd8 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -4,12 +4,13 @@ import ( "encoding/json" "errors" "fmt" - "google.golang.org/grpc" "io" "io/ioutil" "net/http" "sort" + "google.golang.org/grpc" + "sync" "github.com/chrislusf/seaweedfs/weed/glog" @@ -55,7 +56,7 @@ func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) { if isGzipped { var err error - if buffer, err = UnGzipData(buffer); err != nil { + if buffer, err = util.UnGzipData(buffer); err != nil { return nil, err } } diff --git a/weed/storage/tail_volume.go.go b/weed/operation/tail_volume.go similarity index 75% rename from weed/storage/tail_volume.go.go rename to weed/operation/tail_volume.go index 31ad058b1..0c4f96654 100644 --- a/weed/storage/tail_volume.go.go +++ b/weed/operation/tail_volume.go @@ -1,18 +1,18 @@ -package storage +package operation import ( "context" "fmt" "io" - "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "google.golang.org/grpc" ) -func TailVolume(master string, grpcDialOption grpc.DialOption, vid VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *Needle) error) error { +func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error { // find volume location, replication, ttl info - lookup, err := operation.Lookup(master, vid.String()) + lookup, err := Lookup(master, vid.String()) if err != nil { return fmt.Errorf("look up volume %d: %v", vid, err) } @@ -22,7 +22,7 @@ func TailVolume(master string, grpcDialOption grpc.DialOption, vid VolumeId, sin volumeServer := lookup.Locations[0].Url - return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{ VolumeId: uint32(vid), @@ -62,9 +62,9 @@ func TailVolume(master string, grpcDialOption grpc.DialOption, vid VolumeId, sin needleBody = append(needleBody, resp.NeedleBody...) } - n := new(Needle) + n := new(needle.Needle) n.ParseNeedleHeader(needleHeader) - n.ReadNeedleBodyBytes(needleBody, CurrentVersion) + n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion) err = fn(n) diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 4417a0e70..c387d0230 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -18,6 +18,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" ) type UploadResult struct { @@ -59,7 +60,7 @@ func doUpload(uploadUrl string, filename string, reader io.Reader, isGzipped boo contentIsGzipped := isGzipped shouldGzipNow := false if !isGzipped { - if shouldBeZipped, iAmSure := IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped { + if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped { shouldGzipNow = true contentIsGzipped = true } diff --git a/weed/server/common.go b/weed/server/common.go index 1c75d44cf..e02ab38a6 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -5,17 +5,18 @@ import ( "encoding/json" "errors" "fmt" - "google.golang.org/grpc" "net/http" "path/filepath" "strconv" "strings" "time" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" _ "github.com/chrislusf/seaweedfs/weed/statik" @@ -90,7 +91,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("parsing upload file...") - fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := storage.ParseUpload(r) + fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := needle.ParseUpload(r) if pe != nil { writeJsonError(w, r, http.StatusBadRequest, pe) return diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 7b8efb933..2265cee3b 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" ) @@ -55,7 +56,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest if err != nil { return nil, err } - ttl, err := storage.ReadTTL(req.Ttl) + ttl, err := needle.ReadTTL(req.Ttl) if err != nil { return nil, err } @@ -110,7 +111,7 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic if err != nil { return nil, err } - ttl, err := storage.ReadTTL(req.Ttl) + ttl, err := needle.ReadTTL(req.Ttl) if err != nil { return nil, err } diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index 5bdb448c1..60b593013 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -9,7 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]operation.LookupResult) { @@ -22,7 +22,7 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume if _, ok := volumeLocations[vid]; ok { continue } - volumeId, err := storage.NewVolumeId(vid) + volumeId, err := needle.NewVolumeId(vid) if err == nil { machines := ms.Topo.Lookup(collection, volumeId) if machines != nil { diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 4f0195084..244098515 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -4,15 +4,17 @@ import ( "context" "errors" "fmt" + "math/rand" + "net/http" + "strconv" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/util" - "math/rand" - "net/http" - "strconv" ) func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { @@ -93,7 +95,7 @@ func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Reque func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) { vid, _, _, _, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) + volumeId, err := needle.NewVolumeId(vid) if err != nil { debug("parsing error:", err, r.URL.Path) return @@ -146,7 +148,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr if err != nil { return nil, err } - ttl, err := storage.ReadTTL(r.FormValue("ttl")) + ttl, err := needle.ReadTTL(r.FormValue("ttl")) if err != nil { return nil, err } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index c32f8a086..d9244aa64 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -5,7 +5,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) { @@ -29,7 +29,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p resp := &volume_server_pb.AllocateVolumeResponse{} err := vs.store.AddVolume( - storage.VolumeId(req.VolumeId), + needle.VolumeId(req.VolumeId), req.Collection, vs.needleMapKind, req.Replication, @@ -51,7 +51,7 @@ func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.V resp := &volume_server_pb.VolumeMountResponse{} - err := vs.store.MountVolume(storage.VolumeId(req.VolumeId)) + err := vs.store.MountVolume(needle.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("volume mount %v: %v", req, err) @@ -67,7 +67,7 @@ func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb resp := &volume_server_pb.VolumeUnmountResponse{} - err := vs.store.UnmountVolume(storage.VolumeId(req.VolumeId)) + err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("volume unmount %v: %v", req, err) @@ -83,7 +83,7 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb. resp := &volume_server_pb.VolumeDeleteResponse{} - err := vs.store.DeleteVolume(storage.VolumeId(req.VolumeId)) + err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("volume delete %v: %v", req, err) diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go index 3554d97ae..d7fbb6edf 100644 --- a/weed/server/volume_grpc_batch_delete.go +++ b/weed/server/volume_grpc_batch_delete.go @@ -7,7 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) { @@ -26,8 +26,8 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B continue } - n := new(storage.Needle) - volumeId, _ := storage.NewVolumeId(vid) + n := new(needle.Needle) + volumeId, _ := needle.NewVolumeId(vid) n.ParsePath(id_cookie) cookie := n.Cookie diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 2bac7d2ee..b7789f88d 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -3,10 +3,13 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "io" "os" ) @@ -14,10 +17,10 @@ import ( // VolumeCopy copy the .idx .dat files, and mount the volume func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) { - v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v != nil { // unmount the volume - err := vs.store.UnmountVolume(storage.VolumeId(req.VolumeId)) + err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)) if err != nil { return nil, fmt.Errorf("failed to unmount volume %d: %v", req.VolumeId, err) } @@ -88,7 +91,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo } // mount the volume - err = vs.store.MountVolume(storage.VolumeId(req.VolumeId)) + err = vs.store.MountVolume(needle.VolumeId(req.VolumeId)) if err != nil { return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err) } @@ -144,7 +147,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) { resp := &volume_server_pb.ReadVolumeFileStatusResponse{} - v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return nil, fmt.Errorf("not found volume id %d", req.VolumeId) } @@ -160,7 +163,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error { - v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return fmt.Errorf("not found volume id %d", req.VolumeId) } diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go index 06e7017e8..5977c44f6 100644 --- a/weed/server/volume_grpc_copy_incremental.go +++ b/weed/server/volume_grpc_copy_incremental.go @@ -7,12 +7,12 @@ import ( "os" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrementalCopyRequest, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error { - v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return fmt.Errorf("not found volume id %d", req.VolumeId) } @@ -36,7 +36,7 @@ func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrem func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) { - v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return nil, fmt.Errorf("not found volume id %d", req.VolumeId) } diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index da248498f..87db6e146 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -7,11 +7,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stream volume_server_pb.VolumeServer_VolumeTailServer) error { - v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return fmt.Errorf("not found volume id %d", req.VolumeId) } diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go index d31b8f8e7..4aa6588cb 100644 --- a/weed/server/volume_grpc_vacuum.go +++ b/weed/server/volume_grpc_vacuum.go @@ -5,14 +5,14 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) { resp := &volume_server_pb.VacuumVolumeCheckResponse{} - garbageRatio, err := vs.store.CheckCompactVolume(storage.VolumeId(req.VolumeId)) + garbageRatio, err := vs.store.CheckCompactVolume(needle.VolumeId(req.VolumeId)) resp.GarbageRatio = garbageRatio @@ -28,7 +28,7 @@ func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_ser resp := &volume_server_pb.VacuumVolumeCompactResponse{} - err := vs.store.CompactVolume(storage.VolumeId(req.VolumeId), req.Preallocate) + err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate) if err != nil { glog.Errorf("compact volume %d: %v", req.VolumeId, err) @@ -44,7 +44,7 @@ func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_serv resp := &volume_server_pb.VacuumVolumeCommitResponse{} - err := vs.store.CommitCompactVolume(storage.VolumeId(req.VolumeId)) + err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("commit volume %d: %v", req.VolumeId, err) @@ -60,7 +60,7 @@ func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_ser resp := &volume_server_pb.VacuumVolumeCleanupResponse{} - err := vs.store.CommitCleanupVolume(storage.VolumeId(req.VolumeId)) + err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("cleanup volume %d: %v", req.VolumeId, err) diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 92c728141..816afcb8b 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -17,16 +17,16 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) + n := new(needle.Needle) vid, fid, filename, ext, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) + volumeId, err := needle.NewVolumeId(vid) if err != nil { glog.V(2).Infoln("parsing error:", err, r.URL.Path) w.WriteHeader(http.StatusBadRequest) @@ -132,7 +132,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { w.Header().Set("Content-Encoding", "gzip") } else { - if n.Data, err = operation.UnGzipData(n.Data); err != nil { + if n.Data, err = util.UnGzipData(n.Data); err != nil { glog.V(0).Infoln("ungzip error:", err, r.URL.Path) } } @@ -146,7 +146,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } -func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) { +func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) { if !n.IsChunkedManifest() || r.URL.Query().Get("cm") == "false" { return false } diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 9fb252eb7..45c868c33 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -10,7 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" ) @@ -22,7 +22,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { } vid, fid, _, _, _ := parseURLPath(r.URL.Path) - volumeId, ve := storage.NewVolumeId(vid) + volumeId, ve := needle.NewVolumeId(vid) if ve != nil { glog.V(0).Infoln("NewVolumeId error:", ve) writeJsonError(w, r, http.StatusBadRequest, ve) @@ -34,7 +34,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - needle, originalSize, ne := storage.CreateNeedleFromRequest(r, vs.FixJpgOrientation) + needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation) if ne != nil { writeJsonError(w, r, http.StatusBadRequest, ne) return @@ -57,9 +57,9 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { } func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) + n := new(needle.Needle) vid, fid, _, _, _ := parseURLPath(r.URL.Path) - volumeId, _ := storage.NewVolumeId(vid) + volumeId, _ := needle.NewVolumeId(vid) n.ParsePath(fid) if !vs.maybeCheckJwtAuthorization(r, vid, fid) { diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index cd81b2210..680cc6097 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -9,22 +9,23 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type DiskLocation struct { Directory string MaxVolumeCount int - volumes map[VolumeId]*Volume + volumes map[needle.VolumeId]*Volume sync.RWMutex } func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount} - location.volumes = make(map[VolumeId]*Volume) + location.volumes = make(map[needle.VolumeId]*Volume) return location } -func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (VolumeId, string, error) { +func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (needle.VolumeId, string, error) { name := dir.Name() if !dir.IsDir() && strings.HasSuffix(name, ".dat") { collection := "" @@ -33,7 +34,7 @@ func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (VolumeId, string, erro if i > 0 { collection, base = base[0:i], base[i+1:] } - vol, err := NewVolumeId(base) + vol, err := needle.NewVolumeId(base) return vol, collection, err } @@ -114,7 +115,7 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er return } -func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) { +func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) { v, ok := l.volumes[vid] if !ok { return @@ -127,7 +128,7 @@ func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) { return } -func (l *DiskLocation) LoadVolume(vid VolumeId, needleMapKind NeedleMapType) bool { +func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool { if dirs, err := ioutil.ReadDir(l.Directory); err == nil { for _, dir := range dirs { volId, _, err := l.volumeIdFromPath(dir) @@ -142,7 +143,7 @@ func (l *DiskLocation) LoadVolume(vid VolumeId, needleMapKind NeedleMapType) boo return false } -func (l *DiskLocation) DeleteVolume(vid VolumeId) error { +func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error { l.Lock() defer l.Unlock() @@ -153,7 +154,7 @@ func (l *DiskLocation) DeleteVolume(vid VolumeId) error { return l.deleteVolumeById(vid) } -func (l *DiskLocation) UnloadVolume(vid VolumeId) error { +func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error { l.Lock() defer l.Unlock() @@ -166,14 +167,14 @@ func (l *DiskLocation) UnloadVolume(vid VolumeId) error { return nil } -func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume) { +func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) { l.Lock() defer l.Unlock() l.volumes[vid] = volume } -func (l *DiskLocation) FindVolume(vid VolumeId) (*Volume, bool) { +func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) { l.RLock() defer l.RUnlock() diff --git a/weed/storage/crc.go b/weed/storage/needle/crc.go similarity index 97% rename from weed/storage/crc.go rename to weed/storage/needle/crc.go index e49686dc8..00ea1db69 100644 --- a/weed/storage/crc.go +++ b/weed/storage/needle/crc.go @@ -1,4 +1,4 @@ -package storage +package needle import ( "crypto/md5" diff --git a/weed/storage/file_id.go b/weed/storage/needle/file_id.go similarity index 98% rename from weed/storage/file_id.go rename to weed/storage/needle/file_id.go index 37dcb7c70..906877b8c 100644 --- a/weed/storage/file_id.go +++ b/weed/storage/needle/file_id.go @@ -1,7 +1,8 @@ -package storage +package needle import ( "encoding/hex" + . "github.com/chrislusf/seaweedfs/weed/storage/types" ) diff --git a/weed/storage/needle.go b/weed/storage/needle/needle.go similarity index 99% rename from weed/storage/needle.go rename to weed/storage/needle/needle.go index 5bd6f7d96..c224d2767 100644 --- a/weed/storage/needle.go +++ b/weed/storage/needle/needle.go @@ -1,4 +1,4 @@ -package storage +package needle import ( "encoding/json" @@ -8,9 +8,10 @@ import ( "strings" "time" + "io/ioutil" + "github.com/chrislusf/seaweedfs/weed/images" . "github.com/chrislusf/seaweedfs/weed/storage/types" - "io/ioutil" ) const ( @@ -187,3 +188,4 @@ func ParseNeedleIdCookie(key_hash_string string) (NeedleId, Cookie, error) { func (n *Needle) LastModifiedString() string { return time.Unix(int64(n.LastModified), 0).Format("2006-01-02T15:04:05") } + diff --git a/weed/storage/needle_parse_multipart.go b/weed/storage/needle/needle_parse_multipart.go similarity index 89% rename from weed/storage/needle_parse_multipart.go rename to weed/storage/needle/needle_parse_multipart.go index 93b4c2dce..8be1a1da4 100644 --- a/weed/storage/needle_parse_multipart.go +++ b/weed/storage/needle/needle_parse_multipart.go @@ -1,8 +1,9 @@ -package storage +package needle import ( "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/util" + "io" "io/ioutil" "mime" @@ -90,12 +91,12 @@ func parseMultipart(r *http.Request) ( } if part.Header.Get("Content-Encoding") == "gzip" { - if unzipped, e := operation.UnGzipData(data); e == nil { + if unzipped, e := util.UnGzipData(data); e == nil { originalDataSize = len(unzipped) } isGzipped = true - } else if operation.IsGzippable(ext, mtype, data) { - if compressedData, err := operation.GzipData(data); err == nil { + } else if util.IsGzippable(ext, mtype, data) { + if compressedData, err := util.GzipData(data); err == nil { if len(data) > len(compressedData) { data = compressedData isGzipped = true diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle/needle_read_write.go similarity index 98% rename from weed/storage/needle_read_write.go rename to weed/storage/needle/needle_read_write.go index 1a2c99450..9876b2f1c 100644 --- a/weed/storage/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -1,4 +1,4 @@ -package storage +package needle import ( "errors" @@ -6,10 +6,11 @@ import ( "io" "os" + "math" + "github.com/chrislusf/seaweedfs/weed/glog" . "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" - "math" ) const ( @@ -383,3 +384,8 @@ func (n *Needle) HasPairs() bool { func (n *Needle) SetHasPairs() { n.Flags = n.Flags | FlagHasPairs } + +func getActualSize(size uint32, version Version) int64 { + return NeedleEntrySize + NeedleBodyLength(size, version) +} + diff --git a/weed/storage/needle_read_write_test.go b/weed/storage/needle/needle_read_write_test.go similarity index 99% rename from weed/storage/needle_read_write_test.go rename to weed/storage/needle/needle_read_write_test.go index 2532e6a38..4c507f9e6 100644 --- a/weed/storage/needle_read_write_test.go +++ b/weed/storage/needle/needle_read_write_test.go @@ -1,10 +1,11 @@ -package storage +package needle import ( - "github.com/chrislusf/seaweedfs/weed/storage/types" "io/ioutil" "os" "testing" + + "github.com/chrislusf/seaweedfs/weed/storage/types" ) func TestAppend(t *testing.T) { diff --git a/weed/storage/needle_test.go b/weed/storage/needle/needle_test.go similarity index 98% rename from weed/storage/needle_test.go rename to weed/storage/needle/needle_test.go index 65036409c..0f2dde98e 100644 --- a/weed/storage/needle_test.go +++ b/weed/storage/needle/needle_test.go @@ -1,8 +1,9 @@ -package storage +package needle import ( - "github.com/chrislusf/seaweedfs/weed/storage/types" "testing" + + "github.com/chrislusf/seaweedfs/weed/storage/types" ) func TestParseKeyHash(t *testing.T) { diff --git a/weed/storage/volume_id.go b/weed/storage/needle/volume_id.go similarity index 95% rename from weed/storage/volume_id.go rename to weed/storage/needle/volume_id.go index 0333c6cf0..be56e42fa 100644 --- a/weed/storage/volume_id.go +++ b/weed/storage/needle/volume_id.go @@ -1,4 +1,4 @@ -package storage +package needle import ( "strconv" diff --git a/weed/storage/volume_ttl.go b/weed/storage/needle/volume_ttl.go similarity index 99% rename from weed/storage/volume_ttl.go rename to weed/storage/needle/volume_ttl.go index 0989e7a49..0b4c9579b 100644 --- a/weed/storage/volume_ttl.go +++ b/weed/storage/needle/volume_ttl.go @@ -1,4 +1,4 @@ -package storage +package needle import ( "strconv" diff --git a/weed/storage/volume_ttl_test.go b/weed/storage/needle/volume_ttl_test.go similarity index 98% rename from weed/storage/volume_ttl_test.go rename to weed/storage/needle/volume_ttl_test.go index 216469a4c..0afebebf5 100644 --- a/weed/storage/volume_ttl_test.go +++ b/weed/storage/needle/volume_ttl_test.go @@ -1,4 +1,4 @@ -package storage +package needle import ( "testing" diff --git a/weed/storage/volume_version.go b/weed/storage/needle/volume_version.go similarity index 90% rename from weed/storage/volume_version.go rename to weed/storage/needle/volume_version.go index fc0270c03..54daac77f 100644 --- a/weed/storage/volume_version.go +++ b/weed/storage/needle/volume_version.go @@ -1,4 +1,4 @@ -package storage +package needle type Version uint8 diff --git a/weed/storage/needle_byte_cache.go b/weed/storage/needle_byte_cache.go deleted file mode 100644 index 78c1ea862..000000000 --- a/weed/storage/needle_byte_cache.go +++ /dev/null @@ -1,11 +0,0 @@ -package storage - -import ( - "os" -) - -func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, err error) { - dataSlice = make([]byte, readSize) - _, err = r.ReadAt(dataSlice, offset) - return dataSlice, err -} diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go index 56dbd7c06..a22125682 100644 --- a/weed/storage/needle_map.go +++ b/weed/storage/needle_map.go @@ -6,7 +6,7 @@ import ( "os" "sync" - "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" . "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -22,7 +22,7 @@ const ( type NeedleMapper interface { Put(key NeedleId, offset Offset, size uint32) error - Get(key NeedleId) (element *needle.NeedleValue, ok bool) + Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) Delete(key NeedleId, offset Offset) error Close() Destroy() error diff --git a/weed/storage/needle/btree_map.go b/weed/storage/needle_map/btree_map.go similarity index 98% rename from weed/storage/needle/btree_map.go rename to weed/storage/needle_map/btree_map.go index aed940f0e..74b4952b7 100644 --- a/weed/storage/needle/btree_map.go +++ b/weed/storage/needle_map/btree_map.go @@ -1,4 +1,4 @@ -package needle +package needle_map import ( . "github.com/chrislusf/seaweedfs/weed/storage/types" diff --git a/weed/storage/needle/compact_map.go b/weed/storage/needle_map/compact_map.go similarity index 99% rename from weed/storage/needle/compact_map.go rename to weed/storage/needle_map/compact_map.go index a15035961..193d17e47 100644 --- a/weed/storage/needle/compact_map.go +++ b/weed/storage/needle_map/compact_map.go @@ -1,4 +1,4 @@ -package needle +package needle_map import ( . "github.com/chrislusf/seaweedfs/weed/storage/types" diff --git a/weed/storage/needle/compact_map_perf_test.go b/weed/storage/needle_map/compact_map_perf_test.go similarity index 99% rename from weed/storage/needle/compact_map_perf_test.go rename to weed/storage/needle_map/compact_map_perf_test.go index 3f6fe548b..a447375a4 100644 --- a/weed/storage/needle/compact_map_perf_test.go +++ b/weed/storage/needle_map/compact_map_perf_test.go @@ -1,4 +1,4 @@ -package needle +package needle_map import ( "fmt" diff --git a/weed/storage/needle/compact_map_test.go b/weed/storage/needle_map/compact_map_test.go similarity index 99% rename from weed/storage/needle/compact_map_test.go rename to weed/storage/needle_map/compact_map_test.go index b9586ab54..4894e59ad 100644 --- a/weed/storage/needle/compact_map_test.go +++ b/weed/storage/needle_map/compact_map_test.go @@ -1,4 +1,4 @@ -package needle +package needle_map import ( "fmt" diff --git a/weed/storage/needle/needle_value.go b/weed/storage/needle_map/needle_value.go similarity index 95% rename from weed/storage/needle/needle_value.go rename to weed/storage/needle_map/needle_value.go index 96ee83009..d0c0b9006 100644 --- a/weed/storage/needle/needle_value.go +++ b/weed/storage/needle_map/needle_value.go @@ -1,4 +1,4 @@ -package needle +package needle_map import ( . "github.com/chrislusf/seaweedfs/weed/storage/types" diff --git a/weed/storage/needle/needle_value_map.go b/weed/storage/needle_map/needle_value_map.go similarity index 93% rename from weed/storage/needle/needle_value_map.go rename to weed/storage/needle_map/needle_value_map.go index 9da257443..acf38a4bf 100644 --- a/weed/storage/needle/needle_value_map.go +++ b/weed/storage/needle_map/needle_value_map.go @@ -1,4 +1,4 @@ -package needle +package needle_map import ( . "github.com/chrislusf/seaweedfs/weed/storage/types" diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index 4d5280938..01807595b 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -7,7 +7,7 @@ import ( "path/filepath" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" . "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" @@ -74,7 +74,7 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error { }) } -func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle.NeedleValue, ok bool) { +func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) { bytes := make([]byte, NeedleIdSize) NeedleIdToBytes(bytes[0:NeedleIdSize], key) data, err := m.db.Get(bytes, nil) @@ -83,7 +83,7 @@ func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle.NeedleValue, ok bo } offset := BytesToOffset(data[0:OffsetSize]) size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize]) - return &needle.NeedleValue{Key: NeedleId(key), Offset: offset, Size: size}, true + return &needle_map.NeedleValue{Key: NeedleId(key), Offset: offset, Size: size}, true } func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error { diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index a3b574324..060920cea 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -5,19 +5,19 @@ import ( "os" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" . "github.com/chrislusf/seaweedfs/weed/storage/types" ) type NeedleMap struct { - m needle.NeedleValueMap + m needle_map.NeedleValueMap baseNeedleMapper } func NewCompactNeedleMap(file *os.File) *NeedleMap { nm := &NeedleMap{ - m: needle.NewCompactMap(), + m: needle_map.NewCompactMap(), } nm.indexFile = file return nm @@ -25,7 +25,7 @@ func NewCompactNeedleMap(file *os.File) *NeedleMap { func NewBtreeNeedleMap(file *os.File) *NeedleMap { nm := &NeedleMap{ - m: needle.NewBtreeMap(), + m: needle_map.NewBtreeMap(), } nm.indexFile = file return nm @@ -106,7 +106,7 @@ func (nm *NeedleMap) Put(key NeedleId, offset Offset, size uint32) error { nm.logPut(key, oldSize, size) return nm.appendToIndexFile(key, offset, size) } -func (nm *NeedleMap) Get(key NeedleId) (element *needle.NeedleValue, ok bool) { +func (nm *NeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) { element, ok = nm.m.Get(NeedleId(key)) return } diff --git a/weed/storage/store.go b/weed/storage/store.go index d866d2e11..3c222c0aa 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" . "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -27,8 +28,8 @@ type Store struct { volumeSizeLimit uint64 //read from the master Client master_pb.Seaweed_SendHeartbeatClient NeedleMapType NeedleMapType - NewVolumeIdChan chan VolumeId - DeletedVolumeIdChan chan VolumeId + NewVolumeIdChan chan needle.VolumeId + DeletedVolumeIdChan chan needle.VolumeId } func (s *Store) String() (str string) { @@ -44,16 +45,16 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts location.loadExistingVolumes(needleMapKind) s.Locations = append(s.Locations, location) } - s.NewVolumeIdChan = make(chan VolumeId, 3) - s.DeletedVolumeIdChan = make(chan VolumeId, 3) + s.NewVolumeIdChan = make(chan needle.VolumeId, 3) + s.DeletedVolumeIdChan = make(chan needle.VolumeId, 3) return } -func (s *Store) AddVolume(volumeId VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error { +func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error { rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e } - ttl, e := ReadTTL(ttlString) + ttl, e := needle.ReadTTL(ttlString) if e != nil { return e } @@ -71,7 +72,7 @@ func (s *Store) DeleteCollection(collection string) (e error) { return } -func (s *Store) findVolume(vid VolumeId) *Volume { +func (s *Store) findVolume(vid needle.VolumeId) *Volume { for _, location := range s.Locations { if v, found := location.FindVolume(vid); found { return v @@ -90,7 +91,7 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL, preallocate int64) error { +func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } @@ -114,7 +115,7 @@ func (s *Store) Status() []*VolumeInfo { location.RLock() for k, v := range location.volumes { s := &VolumeInfo{ - Id: VolumeId(k), + Id: needle.VolumeId(k), Size: v.ContentSize(), Collection: v.Collection, ReplicaPlacement: v.ReplicaPlacement, @@ -184,7 +185,7 @@ func (s *Store) Close() { } } -func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { +func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, err error) { if v := s.findVolume(i); v != nil { if v.readOnly { err = fmt.Errorf("Volume %d is read only", i) @@ -203,32 +204,32 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { return } -func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { +func (s *Store) Delete(i needle.VolumeId, n *needle.Needle) (uint32, error) { if v := s.findVolume(i); v != nil && !v.readOnly { return v.deleteNeedle(n) } return 0, nil } -func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) { +func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle) (int, error) { if v := s.findVolume(i); v != nil { return v.readNeedle(n) } return 0, fmt.Errorf("Volume %d not found!", i) } -func (s *Store) GetVolume(i VolumeId) *Volume { +func (s *Store) GetVolume(i needle.VolumeId) *Volume { return s.findVolume(i) } -func (s *Store) HasVolume(i VolumeId) bool { +func (s *Store) HasVolume(i needle.VolumeId) bool { v := s.findVolume(i) return v != nil } -func (s *Store) MountVolume(i VolumeId) error { +func (s *Store) MountVolume(i needle.VolumeId) error { for _, location := range s.Locations { if found := location.LoadVolume(i, s.NeedleMapType); found == true { - s.NewVolumeIdChan <- VolumeId(i) + s.NewVolumeIdChan <- needle.VolumeId(i) return nil } } @@ -236,10 +237,10 @@ func (s *Store) MountVolume(i VolumeId) error { return fmt.Errorf("Volume %d not found on disk", i) } -func (s *Store) UnmountVolume(i VolumeId) error { +func (s *Store) UnmountVolume(i needle.VolumeId) error { for _, location := range s.Locations { if err := location.UnloadVolume(i); err == nil { - s.DeletedVolumeIdChan <- VolumeId(i) + s.DeletedVolumeIdChan <- needle.VolumeId(i) return nil } } @@ -247,10 +248,10 @@ func (s *Store) UnmountVolume(i VolumeId) error { return fmt.Errorf("Volume %d not found on disk", i) } -func (s *Store) DeleteVolume(i VolumeId) error { +func (s *Store) DeleteVolume(i needle.VolumeId) error { for _, location := range s.Locations { if error := location.deleteVolumeById(i); error == nil { - s.DeletedVolumeIdChan <- VolumeId(i) + s.DeletedVolumeIdChan <- needle.VolumeId(i) return nil } } diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go index 0036315c8..5f982a8c3 100644 --- a/weed/storage/store_vacuum.go +++ b/weed/storage/store_vacuum.go @@ -2,29 +2,31 @@ package storage import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) -func (s *Store) CheckCompactVolume(volumeId VolumeId) (float64, error) { +func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) { if v := s.findVolume(volumeId); v != nil { glog.V(3).Infof("volumd %d garbage level: %f", volumeId, v.garbageLevel()) return v.garbageLevel(), nil } return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId) } -func (s *Store) CompactVolume(vid VolumeId, preallocate int64) error { +func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64) error { if v := s.findVolume(vid); v != nil { return v.Compact(preallocate) } return fmt.Errorf("volume id %d is not found during compact", vid) } -func (s *Store) CommitCompactVolume(vid VolumeId) error { +func (s *Store) CommitCompactVolume(vid needle.VolumeId) error { if v := s.findVolume(vid); v != nil { return v.CommitCompact() } return fmt.Errorf("volume id %d is not found during commit compact", vid) } -func (s *Store) CommitCleanupVolume(vid VolumeId) error { +func (s *Store) CommitCleanupVolume(vid needle.VolumeId) error { if v := s.findVolume(vid); v != nil { return v.cleanupCompact() } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 54769eaca..e33d90d28 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -2,7 +2,10 @@ package storage import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "os" "path" "strconv" @@ -13,7 +16,7 @@ import ( ) type Volume struct { - Id VolumeId + Id needle.VolumeId dir string Collection string dataFile *os.File @@ -31,7 +34,7 @@ type Volume struct { lastCompactRevision uint16 } -func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL, preallocate int64) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} @@ -59,7 +62,7 @@ func (v *Volume) DataFile() *os.File { return v.dataFile } -func (v *Volume) Version() Version { +func (v *Volume) Version() needle.Version { return v.SuperBlock.Version() } diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index 9a261df71..d25f945f8 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -3,12 +3,14 @@ package storage import ( "context" "fmt" + "io" + "os" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" . "github.com/chrislusf/seaweedfs/weed/storage/types" "google.golang.org/grpc" - "io" - "os" ) func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse { @@ -147,7 +149,7 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) { func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) { - n, _, bodyLength, err := ReadNeedleHeader(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()) + n, _, bodyLength, err := needle.ReadNeedleHeader(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()) if err != nil { return 0, fmt.Errorf("ReadNeedleHeader: %v", err) } @@ -245,7 +247,7 @@ func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool { return false } -func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *Needle, offset int64) error { +func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64) error { if n.Size > 0 && n.Size != TombstoneFileSize { return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size) } diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 1ac73d3d3..0f419f2c0 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -4,14 +4,11 @@ import ( "fmt" "os" + "github.com/chrislusf/seaweedfs/weed/storage/needle" . "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) -func getActualSize(size uint32, version Version) int64 { - return NeedleEntrySize + NeedleBodyLength(size, version) -} - func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error { var indexSize int64 var e error @@ -55,8 +52,8 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err return } -func verifyNeedleIntegrity(datFile *os.File, v Version, offset int64, key NeedleId, size uint32) error { - n := new(Needle) +func verifyNeedleIntegrity(datFile *os.File, v needle.Version, offset int64, key NeedleId, size uint32) error { + n := new(needle.Needle) err := n.ReadData(datFile, offset, size, v) if err != nil { return err diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go index f5ddeca14..638b1e751 100644 --- a/weed/storage/volume_info.go +++ b/weed/storage/volume_info.go @@ -5,15 +5,16 @@ import ( "sort" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type VolumeInfo struct { - Id VolumeId + Id needle.VolumeId Size uint64 ReplicaPlacement *ReplicaPlacement - Ttl *TTL + Ttl *needle.TTL Collection string - Version Version + Version needle.Version FileCount int DeleteCount int DeletedByteCount uint64 @@ -23,14 +24,14 @@ type VolumeInfo struct { func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err error) { vi = VolumeInfo{ - Id: VolumeId(m.Id), + Id: needle.VolumeId(m.Id), Size: m.Size, Collection: m.Collection, FileCount: int(m.FileCount), DeleteCount: int(m.DeleteCount), DeletedByteCount: m.DeletedByteCount, ReadOnly: m.ReadOnly, - Version: Version(m.Version), + Version: needle.Version(m.Version), CompactRevision: m.CompactRevision, } rp, e := NewReplicaPlacementFromByte(byte(m.ReplicaPlacement)) @@ -38,7 +39,7 @@ func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err er return vi, e } vi.ReplicaPlacement = rp - vi.Ttl = LoadTTLFromUint32(m.Ttl) + vi.Ttl = needle.LoadTTLFromUint32(m.Ttl) return vi, nil } diff --git a/weed/storage/volume_info_test.go b/weed/storage/volume_info_test.go index 9a9c43ad2..5b1bacb52 100644 --- a/weed/storage/volume_info_test.go +++ b/weed/storage/volume_info_test.go @@ -1,6 +1,10 @@ package storage -import "testing" +import ( + "testing" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) func TestSortVolumeInfos(t *testing.T) { vis := []*VolumeInfo{ @@ -16,7 +20,7 @@ func TestSortVolumeInfos(t *testing.T) { } sortVolumeInfos(vis) for i := 0; i < len(vis); i++ { - if vis[i].Id != VolumeId(i+1) { + if vis[i].Id != needle.VolumeId(i+1) { t.Fatal() } } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 14013b302..82eb98b45 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -2,14 +2,16 @@ package storage import ( "fmt" - "github.com/syndtr/goleveldb/leveldb/opt" "os" "time" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/chrislusf/seaweedfs/weed/glog" ) -func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) { +func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{} v.needleMapKind = needleMapKind diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 50ea8fecb..45888d53b 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -9,6 +9,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/needle" . "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -16,13 +17,13 @@ var ErrorNotFound = errors.New("not found") // isFileUnchanged checks whether this needle to write is same as last one. // It requires serialized access in the same volume. -func (v *Volume) isFileUnchanged(n *Needle) bool { +func (v *Volume) isFileUnchanged(n *needle.Needle) bool { if v.Ttl.String() != "" { return false } nv, ok := v.nm.Get(n.Id) if ok && !nv.Offset.IsZero() && nv.Size != TombstoneFileSize { - oldNeedle := new(Needle) + oldNeedle := new(needle.Needle) err := oldNeedle.ReadData(v.dataFile, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) if err != nil { glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToAcutalOffset(), nv.Size, err) @@ -76,8 +77,8 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { return } -func (v *Volume) writeNeedle(n *Needle) (offset uint64, size uint32, err error) { - glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String()) +func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, err error) { + glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) if v.readOnly { err = fmt.Errorf("%s is read-only", v.dataFile.Name()) return @@ -107,8 +108,8 @@ func (v *Volume) writeNeedle(n *Needle) (offset uint64, size uint32, err error) return } -func (v *Volume) deleteNeedle(n *Needle) (uint32, error) { - glog.V(4).Infof("delete needle %s", NewFileIdFromNeedle(v.Id, n).String()) +func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) { + glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) if v.readOnly { return 0, fmt.Errorf("%s is read-only", v.dataFile.Name()) } @@ -133,7 +134,7 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) { } // read fills in Needle content by looking up n.Id from NeedleMapper -func (v *Volume) readNeedle(n *Needle) (int, error) { +func (v *Volume) readNeedle(n *needle.Needle) (int, error) { nv, ok := v.nm.Get(n.Id) if !ok || nv.Offset.IsZero() { v.compactingWg.Wait() @@ -172,10 +173,10 @@ func (v *Volume) readNeedle(n *Needle) (int, error) { type VolumeFileScanner interface { VisitSuperBlock(SuperBlock) error ReadNeedleBody() bool - VisitNeedle(n *Needle, offset int64) error + VisitNeedle(n *needle.Needle, offset int64) error } -func ScanVolumeFile(dirname string, collection string, id VolumeId, +func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, volumeFileScanner VolumeFileScanner) (err error) { var v *Volume @@ -194,8 +195,8 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId, return ScanVolumeFileFrom(version, v.dataFile, offset, volumeFileScanner) } -func ScanVolumeFileFrom(version Version, dataFile *os.File, offset int64, volumeFileScanner VolumeFileScanner) (err error) { - n, _, rest, e := ReadNeedleHeader(dataFile, version, offset) +func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, volumeFileScanner VolumeFileScanner) (err error) { + n, _, rest, e := needle.ReadNeedleHeader(dataFile, version, offset) if e != nil { if e == io.EOF { return nil @@ -219,7 +220,7 @@ func ScanVolumeFileFrom(version Version, dataFile *os.File, offset int64, volume } offset += NeedleEntrySize + rest glog.V(4).Infof("==> new entry offset %d", offset) - if n, _, rest, err = ReadNeedleHeader(dataFile, version, offset); err != nil { + if n, _, rest, err = needle.ReadNeedleHeader(dataFile, version, offset); err != nil { if err == io.EOF { return nil } @@ -230,8 +231,8 @@ func ScanVolumeFileFrom(version Version, dataFile *os.File, offset int64, volume return nil } -func ScanVolumeFileNeedleFrom(version Version, dataFile *os.File, offset int64, fn func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error) (err error) { - n, nh, rest, e := ReadNeedleHeader(dataFile, version, offset) +func ScanVolumeFileNeedleFrom(version needle.Version, dataFile *os.File, offset int64, fn func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error) (err error) { + n, nh, rest, e := needle.ReadNeedleHeader(dataFile, version, offset) if e != nil { if e == io.EOF { return nil @@ -252,7 +253,7 @@ func ScanVolumeFileNeedleFrom(version Version, dataFile *os.File, offset int64, } offset += NeedleEntrySize + rest glog.V(4).Infof("==> new entry offset %d", offset) - if n, nh, rest, err = ReadNeedleHeader(dataFile, version, offset); err != nil { + if n, nh, rest, err = needle.ReadNeedleHeader(dataFile, version, offset); err != nil { if err == io.EOF { return nil } diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go index 6435a051f..cdb668888 100644 --- a/weed/storage/volume_super_block.go +++ b/weed/storage/volume_super_block.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" ) @@ -23,9 +24,9 @@ const ( * Rest bytes: Reserved */ type SuperBlock struct { - version Version + version needle.Version ReplicaPlacement *ReplicaPlacement - Ttl *TTL + Ttl *needle.TTL CompactRevision uint16 Extra *master_pb.SuperBlockExtra extraSize uint16 @@ -33,13 +34,13 @@ type SuperBlock struct { func (s *SuperBlock) BlockSize() int { switch s.version { - case Version2, Version3: + case needle.Version2, needle.Version3: return _SuperBlockSize + int(s.extraSize) } return _SuperBlockSize } -func (s *SuperBlock) Version() Version { +func (s *SuperBlock) Version() needle.Version { return s.version } func (s *SuperBlock) Bytes() []byte { @@ -75,7 +76,7 @@ func (v *Volume) maybeWriteSuperBlock() error { return e } if stat.Size() == 0 { - v.SuperBlock.version = CurrentVersion + v.SuperBlock.version = needle.CurrentVersion _, e = v.dataFile.Write(v.SuperBlock.Bytes()) if e != nil && os.IsPermission(e) { //read-only, but zero length - recreate it! @@ -105,12 +106,12 @@ func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error) { err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e) return } - superBlock.version = Version(header[0]) + superBlock.version = needle.Version(header[0]) if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { err = fmt.Errorf("cannot read replica type: %s", err.Error()) return } - superBlock.Ttl = LoadTTLFromBytes(header[2:4]) + superBlock.Ttl = needle.LoadTTLFromBytes(header[2:4]) superBlock.CompactRevision = util.BytesToUint16(header[4:6]) superBlock.extraSize = util.BytesToUint16(header[6:8]) diff --git a/weed/storage/volume_super_block_test.go b/weed/storage/volume_super_block_test.go index 13db4b194..06ad8a5d3 100644 --- a/weed/storage/volume_super_block_test.go +++ b/weed/storage/volume_super_block_test.go @@ -2,20 +2,22 @@ package storage import ( "testing" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func TestSuperBlockReadWrite(t *testing.T) { rp, _ := NewReplicaPlacementFromByte(byte(001)) - ttl, _ := ReadTTL("15d") + ttl, _ := needle.ReadTTL("15d") s := &SuperBlock{ - version: CurrentVersion, + version: needle.CurrentVersion, ReplicaPlacement: rp, Ttl: ttl, } bytes := s.Bytes() - if !(bytes[2] == 15 && bytes[3] == Day) { + if !(bytes[2] == 15 && bytes[3] == needle.Day) { println("byte[2]:", bytes[2], "byte[3]:", bytes[3]) t.Fail() } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index b550edb80..109955bb5 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -6,6 +6,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/needle" . "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -205,7 +206,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI //even the needle cache in memory is hit, the need_bytes is correct glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size) var needleBytes []byte - needleBytes, err = ReadNeedleBlob(oldDatFile, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version()) + needleBytes, err = needle.ReadNeedleBlob(oldDatFile, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version()) if err != nil { return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, err) } @@ -213,7 +214,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize)) } else { //deleted needle //fakeDelNeedle 's default Data field is nil - fakeDelNeedle := new(Needle) + fakeDelNeedle := new(needle.Needle) fakeDelNeedle.Id = key fakeDelNeedle.Cookie = 0x12345678 fakeDelNeedle.AppendAtNs = uint64(time.Now().UnixNano()) @@ -235,7 +236,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI } type VolumeFileScanner4Vacuum struct { - version Version + version needle.Version v *Volume dst *os.File nm *NeedleMap @@ -255,7 +256,7 @@ func (scanner *VolumeFileScanner4Vacuum) ReadNeedleBody() bool { return true } -func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *Needle, offset int64) error { +func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset int64) error { if n.HasTtl() && scanner.now >= n.LastModified+uint64(scanner.v.Ttl.Minutes()*60) { return nil } @@ -334,7 +335,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { return nil } - n := new(Needle) + n := new(needle.Needle) err := n.ReadData(v.dataFile, offset.ToAcutalOffset(), size, v.Version()) if err != nil { return nil diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 4909885b9..5192d23b8 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -1,11 +1,13 @@ package storage import ( - "github.com/chrislusf/seaweedfs/weed/storage/types" "io/ioutil" "math/rand" "os" "testing" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" ) /* @@ -65,7 +67,7 @@ func TestCompaction(t *testing.T) { } defer os.RemoveAll(dir) // clean up - v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &ReplicaPlacement{}, &TTL{}, 0) + v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &ReplicaPlacement{}, &needle.TTL{}, 0) if err != nil { t.Fatalf("volume creation: %v", err) } @@ -145,21 +147,21 @@ func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) { type needleInfo struct { size uint32 - crc CRC + crc needle.CRC } -func newRandomNeedle(id uint64) *Needle { - n := new(Needle) +func newRandomNeedle(id uint64) *needle.Needle { + n := new(needle.Needle) n.Data = make([]byte, rand.Intn(1024)) rand.Read(n.Data) - n.Checksum = NewCRC(n.Data) + n.Checksum = needle.NewCRC(n.Data) n.Id = types.Uint64ToNeedleId(id) return n } -func newEmptyNeedle(id uint64) *Needle { - n := new(Needle) +func newEmptyNeedle(id uint64) *needle.Needle { + n := new(needle.Needle) n.Id = types.Uint64ToNeedleId(id) return n } diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go index f08736f64..48336092f 100644 --- a/weed/topology/allocate_volume.go +++ b/weed/topology/allocate_volume.go @@ -2,9 +2,10 @@ package topology import ( "context" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "google.golang.org/grpc" ) @@ -12,7 +13,7 @@ type AllocateVolumeResult struct { Error string } -func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid storage.VolumeId, option *VolumeGrowOption) error { +func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error { return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go index 7a36c25ec..152691ccb 100644 --- a/weed/topology/cluster_commands.go +++ b/weed/topology/cluster_commands.go @@ -3,14 +3,14 @@ package topology import ( "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type MaxVolumeIdCommand struct { - MaxVolumeId storage.VolumeId `json:"maxVolumeId"` + MaxVolumeId needle.VolumeId `json:"maxVolumeId"` } -func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand { +func NewMaxVolumeIdCommand(value needle.VolumeId) *MaxVolumeIdCommand { return &MaxVolumeIdCommand{ MaxVolumeId: value, } diff --git a/weed/topology/collection.go b/weed/topology/collection.go index a17f0c961..f6b728ec9 100644 --- a/weed/topology/collection.go +++ b/weed/topology/collection.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -23,7 +24,7 @@ func (c *Collection) String() string { return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout) } -func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { +func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { keyString := rp.String() if ttl != nil { keyString += ttl.String() @@ -34,7 +35,7 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl * return vl.(*VolumeLayout) } -func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { +func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode { for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { if list := vl.(*VolumeLayout).Lookup(vid); list != nil { diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index 84304512f..a89aa65d8 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -2,7 +2,10 @@ package topology import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "strconv" "github.com/chrislusf/seaweedfs/weed/glog" @@ -11,7 +14,7 @@ import ( type DataNode struct { NodeImpl - volumes map[storage.VolumeId]storage.VolumeInfo + volumes map[needle.VolumeId]storage.VolumeInfo Ip string Port int PublicUrl string @@ -22,7 +25,7 @@ func NewDataNode(id string) *DataNode { s := &DataNode{} s.id = NodeId(id) s.nodeType = "DataNode" - s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) + s.volumes = make(map[needle.VolumeId]storage.VolumeInfo) s.NodeImpl.value = s return s } @@ -51,7 +54,7 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) { } func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) { - actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) + actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v } @@ -84,7 +87,7 @@ func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { return ret } -func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) { +func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) { dn.RLock() defer dn.RUnlock() vInfo, ok := dn.volumes[id] diff --git a/weed/topology/node.go b/weed/topology/node.go index db70c9734..a115c8480 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -8,7 +8,7 @@ import ( "sync/atomic" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type NodeId string @@ -20,12 +20,12 @@ type Node interface { UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) UpAdjustVolumeCountDelta(volumeCountDelta int64) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) - UpAdjustMaxVolumeId(vid storage.VolumeId) + UpAdjustMaxVolumeId(vid needle.VolumeId) GetVolumeCount() int64 GetActiveVolumeCount() int64 GetMaxVolumeCount() int64 - GetMaxVolumeId() storage.VolumeId + GetMaxVolumeId() needle.VolumeId SetParent(Node) LinkChildNode(node Node) UnlinkChildNode(nodeId NodeId) @@ -46,8 +46,8 @@ type NodeImpl struct { maxVolumeCount int64 parent Node sync.RWMutex // lock children - children map[NodeId]Node - maxVolumeId storage.VolumeId + children map[NodeId]Node + maxVolumeId needle.VolumeId //for rack, data center, topology nodeType string @@ -190,7 +190,7 @@ func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) } } -func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative +func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative if n.maxVolumeId < vid { n.maxVolumeId = vid if n.parent != nil { @@ -198,7 +198,7 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative } } } -func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId { +func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId { return n.maxVolumeId } func (n *NodeImpl) GetVolumeCount() int64 { diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index 4273e6d68..fd19cbfba 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -14,17 +14,18 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) func ReplicatedWrite(masterNode string, s *storage.Store, - volumeId storage.VolumeId, needle *storage.Needle, + volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (size uint32, errorStatus string) { //check JWT jwt := security.GetJwt(r) - ret, err := s.Write(volumeId, needle) + ret, err := s.Write(volumeId, n) needToReplicate := !s.HasVolume(volumeId) if err != nil { errorStatus = "Failed to write to local disk (" + err.Error() + ")" @@ -47,30 +48,30 @@ func ReplicatedWrite(masterNode string, s *storage.Store, } q := url.Values{ "type": {"replicate"}, - "ttl": {needle.Ttl.String()}, + "ttl": {n.Ttl.String()}, } - if needle.LastModified > 0 { - q.Set("ts", strconv.FormatUint(needle.LastModified, 10)) + if n.LastModified > 0 { + q.Set("ts", strconv.FormatUint(n.LastModified, 10)) } - if needle.IsChunkedManifest() { + if n.IsChunkedManifest() { q.Set("cm", "true") } u.RawQuery = q.Encode() pairMap := make(map[string]string) - if needle.HasPairs() { + if n.HasPairs() { tmpMap := make(map[string]string) - err := json.Unmarshal(needle.Pairs, &tmpMap) + err := json.Unmarshal(n.Pairs, &tmpMap) if err != nil { glog.V(0).Infoln("Unmarshal pairs error:", err) } for k, v := range tmpMap { - pairMap[storage.PairNamePrefix+k] = v + pairMap[needle.PairNamePrefix+k] = v } } _, err := operation.Upload(u.String(), - string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), + string(n.Name), bytes.NewReader(n.Data), n.IsGzipped(), string(n.Mime), pairMap, jwt) return err }); err != nil { @@ -84,7 +85,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, } func ReplicatedDelete(masterNode string, store *storage.Store, - volumeId storage.VolumeId, n *storage.Needle, + volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (uint32, error) { //check JWT @@ -132,7 +133,7 @@ type RemoteResult struct { Error error } -func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) error) error { +func distributedOperation(masterNode string, store *storage.Store, volumeId needle.VolumeId, op func(location operation.Location) error) error { if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil { length := 0 selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port)) diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 208c9b5b7..5426ec7e3 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -72,7 +73,7 @@ func (t *Topology) Leader() (string, error) { return l, nil } -func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { +func (t *Topology) Lookup(collection string, vid needle.VolumeId) []*DataNode { //maybe an issue if lots of collections? if collection == "" { for _, c := range t.collectionMap.Items() { @@ -88,7 +89,7 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { return nil } -func (t *Topology) NextVolumeId() (storage.VolumeId, error) { +func (t *Topology) NextVolumeId() (needle.VolumeId, error) { vid := t.GetMaxVolumeId() next := vid.Next() if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { @@ -108,10 +109,10 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, return "", 0, nil, errors.New("No writable volumes available!") } fileId, count := t.Sequence.NextFileId(count) - return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil + return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { return t.collectionMap.Get(collectionName, func() interface{} { return NewCollection(collectionName, t.volumeSizeLimit) }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go index a8bdec902..ec745ee93 100644 --- a/weed/topology/topology_test.go +++ b/weed/topology/topology_test.go @@ -4,6 +4,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "testing" ) @@ -96,16 +98,16 @@ func TestAddRemoveVolume(t *testing.T) { dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25) v := storage.VolumeInfo{ - Id: storage.VolumeId(1), + Id: needle.VolumeId(1), Size: 100, Collection: "xcollection", FileCount: 123, DeleteCount: 23, DeletedByteCount: 45, ReadOnly: false, - Version: storage.CurrentVersion, + Version: needle.CurrentVersion, ReplicaPlacement: &storage.ReplicaPlacement{}, - Ttl: storage.EMPTY_TTL, + Ttl: needle.EMPTY_TTL, } dn.UpdateVolumes([]storage.VolumeInfo{v}) diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index ea65b2ff9..351ff842f 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -2,19 +2,20 @@ package topology import ( "context" - "google.golang.org/grpc" "time" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" ) -func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool { +func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool { ch := make(chan bool, locationlist.Length()) for index, dn := range locationlist.list { - go func(index int, url string, vid storage.VolumeId) { + go func(index int, url string, vid needle.VolumeId) { err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: uint32(vid), @@ -44,11 +45,11 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi } return isCheckSuccess } -func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool { +func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool { vl.removeFromWritable(vid) ch := make(chan bool, locationlist.Length()) for index, dn := range locationlist.list { - go func(index int, url string, vid storage.VolumeId) { + go func(index int, url string, vid needle.VolumeId) { glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url) err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ @@ -77,7 +78,7 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, } return isVacuumSuccess } -func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { +func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool { isCommitSuccess := true for _, dn := range locationlist.list { glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url()) @@ -99,7 +100,7 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v } return isCommitSuccess } -func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) { +func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) { for _, dn := range locationlist.list { glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url()) err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { @@ -133,7 +134,7 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) { volumeLayout.accessLock.RLock() - tmpMap := make(map[storage.VolumeId]*VolumeLocationList) + tmpMap := make(map[needle.VolumeId]*VolumeLocationList) for vid, locationList := range volumeLayout.vid2location { tmpMap[vid] = locationList } diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index 514033ca1..ff02044a1 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -2,10 +2,12 @@ package topology import ( "fmt" - "google.golang.org/grpc" "math/rand" "sync" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" ) @@ -21,7 +23,7 @@ This package is created to resolve these replica placement issues: type VolumeGrowOption struct { Collection string ReplicaPlacement *storage.ReplicaPlacement - Ttl *storage.TTL + Ttl *needle.TTL Prealloacte int64 DataCenter string Rack string @@ -193,7 +195,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum return } -func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { +func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { for _, server := range servers { if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil { vi := storage.VolumeInfo{ @@ -202,7 +204,7 @@ func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Ttl: option.Ttl, - Version: storage.CurrentVersion, + Version: needle.CurrentVersion, } server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(vi, server) diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index 1963cb928..3573365fd 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) var topologyLayout = ` @@ -96,9 +97,9 @@ func setup(topologyLayout string) *Topology { for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) vi := storage.VolumeInfo{ - Id: storage.VolumeId(int64(m["id"].(float64))), + Id: needle.VolumeId(int64(m["id"].(float64))), Size: uint64(m["size"].(float64)), - Version: storage.CurrentVersion} + Version: needle.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int64(serverMap["limit"].(float64))) diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 71a071e2f..b3aa7d251 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -9,16 +9,17 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { rp *storage.ReplicaPlacement - ttl *storage.TTL - vid2location map[storage.VolumeId]*VolumeLocationList - writables []storage.VolumeId // transient array of writable volume id - readonlyVolumes map[storage.VolumeId]bool // transient set of readonly volumes - oversizedVolumes map[storage.VolumeId]bool // set of oversized volumes + ttl *needle.TTL + vid2location map[needle.VolumeId]*VolumeLocationList + writables []needle.VolumeId // transient array of writable volume id + readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes + oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes volumeSizeLimit uint64 accessLock sync.RWMutex } @@ -29,14 +30,14 @@ type VolumeLayoutStats struct { FileCount uint64 } -func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout { +func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout { return &VolumeLayout{ rp: rp, ttl: ttl, - vid2location: make(map[storage.VolumeId]*VolumeLocationList), - writables: *new([]storage.VolumeId), - readonlyVolumes: make(map[storage.VolumeId]bool), - oversizedVolumes: make(map[storage.VolumeId]bool), + vid2location: make(map[needle.VolumeId]*VolumeLocationList), + writables: *new([]needle.VolumeId), + readonlyVolumes: make(map[needle.VolumeId]bool), + oversizedVolumes: make(map[needle.VolumeId]bool), volumeSizeLimit: volumeSizeLimit, } } @@ -95,7 +96,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { delete(vl.vid2location, v.Id) } -func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) { +func (vl *VolumeLayout) addToWritable(vid needle.VolumeId) { for _, id := range vl.writables { if vid == id { return @@ -110,7 +111,7 @@ func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool { func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { return !vl.isOversized(v) && - v.Version == storage.CurrentVersion && + v.Version == needle.CurrentVersion && !v.ReadOnly } @@ -121,7 +122,7 @@ func (vl *VolumeLayout) isEmpty() bool { return len(vl.vid2location) == 0 } -func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { +func (vl *VolumeLayout) Lookup(vid needle.VolumeId) []*DataNode { vl.accessLock.RLock() defer vl.accessLock.RUnlock() @@ -141,7 +142,7 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) { return } -func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) { +func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*needle.VolumeId, uint64, *VolumeLocationList, error) { vl.accessLock.RLock() defer vl.accessLock.RUnlock() @@ -158,7 +159,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s } return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") } - var vid storage.VolumeId + var vid needle.VolumeId var locationList *VolumeLocationList counter := 0 for _, v := range vl.writables { @@ -205,7 +206,7 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int { return counter } -func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { +func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool { toDeleteIndex := -1 for k, id := range vl.writables { if id == vid { @@ -220,7 +221,7 @@ func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { } return false } -func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { +func (vl *VolumeLayout) setVolumeWritable(vid needle.VolumeId) bool { for _, v := range vl.writables { if v == vid { return false @@ -231,7 +232,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { return true } -func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() @@ -245,7 +246,7 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) } return false } -func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() @@ -256,7 +257,7 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) b return false } -func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { +func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go index 8d5881333..8905c54b5 100644 --- a/weed/topology/volume_location_list.go +++ b/weed/topology/volume_location_list.go @@ -3,7 +3,7 @@ package topology import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type VolumeLocationList struct { @@ -66,7 +66,7 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { } } -func (dnll *VolumeLocationList) Stats(vid storage.VolumeId, freshThreshHold int64) (size uint64, fileCount int) { +func (dnll *VolumeLocationList) Stats(vid needle.VolumeId, freshThreshHold int64) (size uint64, fileCount int) { for _, dnl := range dnll.list { if dnl.LastSeen < freshThreshHold { vinfo, err := dnl.GetVolumesById(vid) diff --git a/weed/operation/compress.go b/weed/util/compression.go similarity index 92% rename from weed/operation/compress.go rename to weed/util/compression.go index 7190eeeb2..bb78f916c 100644 --- a/weed/operation/compress.go +++ b/weed/util/compression.go @@ -1,4 +1,4 @@ -package operation +package util import ( "bytes" @@ -11,10 +11,33 @@ import ( "golang.org/x/tools/godoc/util" ) +func GzipData(input []byte) ([]byte, error) { + buf := new(bytes.Buffer) + w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed) + if _, err := w.Write(input); err != nil { + glog.V(2).Infoln("error compressing data:", err) + return nil, err + } + if err := w.Close(); err != nil { + glog.V(2).Infoln("error closing compressed data:", err) + return nil, err + } + return buf.Bytes(), nil +} +func UnGzipData(input []byte) ([]byte, error) { + buf := bytes.NewBuffer(input) + r, _ := gzip.NewReader(buf) + defer r.Close() + output, err := ioutil.ReadAll(r) + if err != nil { + glog.V(2).Infoln("error uncompressing data:", err) + } + return output, err +} + /* * Default more not to gzip since gzip can be done on client side. - */ -func IsGzippable(ext, mtype string, data []byte) bool { + */func IsGzippable(ext, mtype string, data []byte) bool { shouldBeZipped, iAmSure := IsGzippableFileType(ext, mtype) if iAmSure { @@ -28,8 +51,7 @@ func IsGzippable(ext, mtype string, data []byte) bool { /* * Default more not to gzip since gzip can be done on client side. - */ -func IsGzippableFileType(ext, mtype string) (shouldBeZipped, iAmSure bool) { + */func IsGzippableFileType(ext, mtype string) (shouldBeZipped, iAmSure bool) { // text if strings.HasPrefix(mtype, "text/") { @@ -70,26 +92,3 @@ func IsGzippableFileType(ext, mtype string) (shouldBeZipped, iAmSure bool) { return false, false } -func GzipData(input []byte) ([]byte, error) { - buf := new(bytes.Buffer) - w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed) - if _, err := w.Write(input); err != nil { - glog.V(2).Infoln("error compressing data:", err) - return nil, err - } - if err := w.Close(); err != nil { - glog.V(2).Infoln("error closing compressed data:", err) - return nil, err - } - return buf.Bytes(), nil -} -func UnGzipData(input []byte) ([]byte, error) { - buf := bytes.NewBuffer(input) - r, _ := gzip.NewReader(buf) - defer r.Close() - output, err := ioutil.ReadAll(r) - if err != nil { - glog.V(2).Infoln("error uncompressing data:", err) - } - return output, err -}