diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 62134715f..d60351671 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.3.6 + 1.3.9 org.sonatype.oss diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy index 62134715f..d60351671 100644 --- a/other/java/client/pom.xml.deploy +++ b/other/java/client/pom.xml.deploy @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.3.6 + 1.3.9 org.sonatype.oss diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index dcedc2aa6..c9fb256d4 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.3.6 + 1.3.9 org.sonatype.oss diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml index 218021a58..62cd798b4 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -15,8 +15,8 @@ maven-compiler-plugin - 7 - 7 + 8 + 8 @@ -127,7 +127,7 @@ - 1.3.6 + 1.3.9 2.9.2 diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index 94f80a114..1f4e7aebc 100644 --- a/other/java/hdfs2/pom.xml +++ b/other/java/hdfs2/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.3.6 + 1.3.9 2.9.2 @@ -31,8 +31,8 @@ org.apache.maven.plugins maven-compiler-plugin - 7 - 7 + 8 + 8 diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index 1138ecca2..46de0c443 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -169,12 +169,35 @@ public class SeaweedOutputStream extends OutputStream { return; } - buffer.flip(); - int bytesLength = buffer.limit() - buffer.position(); - SeaweedWrite.writeData(entry, replication, filerGrpcClient, position, buffer.array(), buffer.position(), buffer.limit()); - // System.out.println(path + " saved [" + (position) + "," + ((position) + bytesLength) + ")"); - position += bytesLength; - buffer.clear(); + position += submitWriteBufferToService(buffer, position); + + buffer = ByteBufferPool.request(bufferSize); + + } + + private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException { + + bufferToWrite.flip(); + int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); + + if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + waitForTaskToComplete(); + } + final Future job = completionService.submit(() -> { + // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit()); + // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + bufferToWrite.clear(); + ByteBufferPool.release(bufferToWrite); + return null; + }); + + writeOperations.add(new WriteOperation(job, writePosition, bytesLength)); + + // Try to shrink the queue + shrinkWriteOperationQueue(); + + return bytesLength; } diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index 00e236aa2..5a077cb31 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -15,8 +15,8 @@ maven-compiler-plugin - 7 - 7 + 8 + 8 @@ -127,7 +127,7 @@ - 1.3.6 + 1.3.9 3.1.1 diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index a03068a48..e289a1855 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.3.6 + 1.3.9 3.1.1 @@ -31,8 +31,8 @@ org.apache.maven.plugins maven-compiler-plugin - 7 - 7 + 8 + 8 diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index 9ea26776b..c602a0d81 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -216,12 +216,35 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea return; } - buffer.flip(); - int bytesLength = buffer.limit() - buffer.position(); - SeaweedWrite.writeData(entry, replication, filerGrpcClient, position, buffer.array(), buffer.position(), buffer.limit()); - // System.out.println(path + " saved [" + (position) + "," + ((position) + bytesLength) + ")"); - position += bytesLength; - buffer.clear(); + position += submitWriteBufferToService(buffer, position); + + buffer = ByteBufferPool.request(bufferSize); + + } + + private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException { + + bufferToWrite.flip(); + int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); + + if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + waitForTaskToComplete(); + } + final Future job = completionService.submit(() -> { + // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit()); + // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + bufferToWrite.clear(); + ByteBufferPool.release(bufferToWrite); + return null; + }); + + writeOperations.add(new WriteOperation(job, writePosition, bytesLength)); + + // Try to shrink the queue + shrinkWriteOperationQueue(); + + return bytesLength; } diff --git a/weed/command/backup.go b/weed/command/backup.go index 615be80cf..950cbf68e 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -112,7 +112,7 @@ func runBackup(cmd *Command, args []string) bool { return true } } - v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) + v, err := storage.NewVolume(util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true @@ -137,7 +137,7 @@ func runBackup(cmd *Command, args []string) bool { // remove the old data v.Destroy() // recreate an empty volume - v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) + v, err = storage.NewVolume(util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true diff --git a/weed/command/compact.go b/weed/command/compact.go index 4e28aa725..6117cf9f3 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -4,6 +4,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -40,7 +41,7 @@ func runCompact(cmd *Command, args []string) bool { preallocate := *compactVolumePreallocate * (1 << 20) vid := needle.VolumeId(*compactVolumeId) - v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, + v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, 0) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) diff --git a/weed/command/download.go b/weed/command/download.go index be0eb47e5..7d4dad2d4 100644 --- a/weed/command/download.go +++ b/weed/command/download.go @@ -43,7 +43,7 @@ var cmdDownload = &Command{ func runDownload(cmd *Command, args []string) bool { for _, fid := range args { - if e := downloadToFile(*d.server, fid, *d.dir); e != nil { + if e := downloadToFile(*d.server, fid, util.ResolvePath(*d.dir)); e != nil { fmt.Println("Download Error: ", fid, e) } } diff --git a/weed/command/export.go b/weed/command/export.go index 5d304b5a0..411d231cb 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -198,7 +198,7 @@ func runExport(cmd *Command, args []string) bool { needleMap := needle_map.NewMemDb() defer needleMap.Close() - if err := needleMap.LoadFromIdx(path.Join(*export.dir, fileName+".idx")); err != nil { + if err := needleMap.LoadFromIdx(path.Join(util.ResolvePath(*export.dir), fileName+".idx")); err != nil { glog.Fatalf("cannot load needle map from %s.idx: %s", fileName, err) } @@ -211,7 +211,7 @@ func runExport(cmd *Command, args []string) bool { fmt.Printf("key\tname\tsize\tgzip\tmime\tmodified\tttl\tdeleted\n") } - err = storage.ScanVolumeFile(*export.dir, *export.collection, vid, storage.NeedleMapInMemory, volumeFileScanner) + err = storage.ScanVolumeFile(util.ResolvePath(*export.dir), *export.collection, vid, storage.NeedleMapInMemory, volumeFileScanner) if err != nil && err != io.EOF { glog.Fatalf("Export Volume File [ERROR] %s\n", err) } diff --git a/weed/command/filer.go b/weed/command/filer.go index b52b01149..c36c43e93 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -100,7 +100,7 @@ func (fo *FilerOptions) startFiler() { defaultLevelDbDirectory := "./filerldb2" if fo.defaultLevelDbDirectory != nil { - defaultLevelDbDirectory = *fo.defaultLevelDbDirectory + "/filerldb2" + defaultLevelDbDirectory = util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2") } var peers []string diff --git a/weed/command/fix.go b/weed/command/fix.go index 223808f4b..e1455790f 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -11,6 +11,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/needle_map" "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -67,7 +68,7 @@ func runFix(cmd *Command, args []string) bool { if *fixVolumeCollection != "" { baseFileName = *fixVolumeCollection + "_" + baseFileName } - indexFileName := path.Join(*fixVolumePath, baseFileName+".idx") + indexFileName := path.Join(util.ResolvePath(*fixVolumePath), baseFileName+".idx") nm := needle_map.NewMemDb() defer nm.Close() @@ -77,7 +78,7 @@ func runFix(cmd *Command, args []string) bool { nm: nm, } - if err := storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner); err != nil { + if err := storage.ScanVolumeFile(util.ResolvePath(*fixVolumePath), *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner); err != nil { glog.Fatalf("scan .dat File: %v", err) os.Remove(indexFileName) } diff --git a/weed/command/master.go b/weed/command/master.go index 7cbd66f5c..a6fe744d7 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -8,10 +8,11 @@ import ( "strings" "github.com/chrislusf/raft/protobuf" - "github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/gorilla/mux" "google.golang.org/grpc/reflection" + "github.com/chrislusf/seaweedfs/weed/util/grace" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -85,7 +86,7 @@ func runMaster(cmd *Command, args []string) bool { runtime.GOMAXPROCS(runtime.NumCPU()) grace.SetupProfiling(*masterCpuProfile, *masterMemProfile) - if err := util.TestFolderWritable(*m.metaFolder); err != nil { + if err := util.TestFolderWritable(util.ResolvePath(*m.metaFolder)); err != nil { glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *m.metaFolder, err) } @@ -118,7 +119,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } // start raftServer raftServer := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"), - peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, 5) + peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, 5) if raftServer == nil { glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder) } diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 7520de784..56df740c4 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -69,7 +69,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { } filerMountRootPath := *option.filerMountRootPath - dir := *option.dir + dir := util.ResolvePath(*option.dir) chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB util.LoadConfiguration("security", false) diff --git a/weed/command/server.go b/weed/command/server.go index 8bc98b900..565563c77 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -176,7 +176,7 @@ func runServer(cmd *Command, args []string) bool { if *masterOptions.metaFolder == "" { *masterOptions.metaFolder = folders[0] } - if err := util.TestFolderWritable(*masterOptions.metaFolder); err != nil { + if err := util.TestFolderWritable(util.ResolvePath(*masterOptions.metaFolder)); err != nil { glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterOptions.metaFolder, err) } filerOptions.defaultLevelDbDirectory = masterOptions.metaFolder diff --git a/weed/command/upload.go b/weed/command/upload.go index 358897aee..45b15535b 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -69,7 +69,7 @@ func runUpload(cmd *Command, args []string) bool { if *upload.dir == "" { return false } - filepath.Walk(*upload.dir, func(path string, info os.FileInfo, err error) error { + filepath.Walk(util.ResolvePath(*upload.dir), func(path string, info os.FileInfo, err error) error { if err == nil { if !info.IsDir() { if *upload.include != "" { diff --git a/weed/command/volume.go b/weed/command/volume.go index 27687af66..4f04a467d 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -117,7 +117,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v // Set multiple folders and each folder's max volume count limit' v.folders = strings.Split(volumeFolders, ",") for _, folder := range v.folders { - if err := util.TestFolderWritable(folder); err != nil { + if err := util.TestFolderWritable(util.ResolvePath(folder)); err != nil { glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err) } } diff --git a/weed/command/webdav.go b/weed/command/webdav.go index b9676c909..dc84b1fd0 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -110,7 +110,7 @@ func (wo *WebDavOption) startWebDav() bool { Uid: uid, Gid: gid, Cipher: cipher, - CacheDir: *wo.cacheDir, + CacheDir: util.ResolvePath(*wo.cacheDir), CacheSizeMB: *wo.cacheSizeMB, }) if webdavServer_err != nil { diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 7ef1170b3..d508849bc 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -208,7 +208,9 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, func conditionallyResizeImages(originalDataReaderSeeker io.ReadSeeker, ext string, r *http.Request) io.ReadSeeker { rs := originalDataReaderSeeker - + if len(ext) > 0 { + ext = strings.ToLower(ext) + } width, height, mode, shouldResize := shouldResizeImages(ext, r) if shouldResize { rs, _, _ = images.Resized(ext, originalDataReaderSeeker, width, height, mode) @@ -217,9 +219,6 @@ func conditionallyResizeImages(originalDataReaderSeeker io.ReadSeeker, ext strin } func shouldResizeImages(ext string, r *http.Request) (width, height int, mode string, shouldResize bool) { - if len(ext) > 0 { - ext = strings.ToLower(ext) - } if ext == ".png" || ext == ".jpg" || ext == ".jpeg" || ext == ".gif" { if r.FormValue("width") != "" { width, _ = strconv.Atoi(r.FormValue("width")) diff --git a/weed/storage/store.go b/weed/storage/store.go index 0ac3381c5..02372da97 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -16,6 +16,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" . "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" ) const ( @@ -28,13 +29,13 @@ const ( type Store struct { MasterAddress string grpcDialOption grpc.DialOption - volumeSizeLimit uint64 //read from the master + volumeSizeLimit uint64 // read from the master Ip string Port int PublicUrl string Locations []*DiskLocation - dataCenter string //optional informaton, overwriting master setting if exists - rack string //optional information, overwriting master setting if exists + dataCenter string // optional informaton, overwriting master setting if exists + rack string // optional information, overwriting master setting if exists connected bool NeedleMapType NeedleMapType NewVolumesChan chan master_pb.VolumeShortInformationMessage @@ -52,7 +53,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { - location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i]) + location := NewDiskLocation(util.ResolvePath(dirnames[i]), maxVolumeCounts[i], minFreeSpacePercents[i]) location.loadExistingVolumes(needleMapKind) s.Locations = append(s.Locations, location) stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i])) diff --git a/weed/util/file_util.go b/weed/util/file_util.go index ff725830b..70135180d 100644 --- a/weed/util/file_util.go +++ b/weed/util/file_util.go @@ -3,6 +3,9 @@ package util import ( "errors" "os" + "os/user" + "path/filepath" + "strings" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -63,3 +66,20 @@ func CheckFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti fileSize = fi.Size() return } + +func ResolvePath(path string) string { + + usr, _ := user.Current() + dir := usr.HomeDir + + if path == "~" { + // In case of "~", which won't be caught by the "else if" + path = dir + } else if strings.HasPrefix(path, "~/") { + // Use strings.HasPrefix so we don't match paths like + // "/something/~/something/" + path = filepath.Join(dir, path[2:]) + } + + return path +}