diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml
index 843e68e46..3425b79fa 100644
--- a/k8s/seaweedfs/Chart.yaml
+++ b/k8s/seaweedfs/Chart.yaml
@@ -1,4 +1,4 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
-version: 2.03
\ No newline at end of file
+version: 2.04
\ No newline at end of file
diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml
index 82776675d..622b9b29e 100644
--- a/k8s/seaweedfs/values.yaml
+++ b/k8s/seaweedfs/values.yaml
@@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
- imageTag: "2.03"
+ imageTag: "2.04"
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 01ac3dcfc..9fbc074ec 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
com.github.chrislusf
seaweedfs-client
- 1.4.9
+ 1.5.0
org.sonatype.oss
@@ -65,7 +65,7 @@
junit
junit
- 4.12
+ 4.13.1
test
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
index 01ac3dcfc..f5fb65143 100644
--- a/other/java/client/pom.xml.deploy
+++ b/other/java/client/pom.xml.deploy
@@ -5,7 +5,7 @@
com.github.chrislusf
seaweedfs-client
- 1.4.9
+ 1.5.0
org.sonatype.oss
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
index 14281f7a3..9f96294de 100644
--- a/other/java/client/pom_debug.xml
+++ b/other/java/client/pom_debug.xml
@@ -5,7 +5,7 @@
com.github.chrislusf
seaweedfs-client
- 1.4.9
+ 1.5.0
org.sonatype.oss
@@ -65,7 +65,7 @@
junit
junit
- 4.12
+ 4.13.1
test
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
index 966abd1c7..78fc3ca32 100644
--- a/other/java/hdfs2/dependency-reduced-pom.xml
+++ b/other/java/hdfs2/dependency-reduced-pom.xml
@@ -301,7 +301,7 @@
- 1.4.9
+ 1.5.0
2.9.2
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
index ede70eea4..b89664f76 100644
--- a/other/java/hdfs2/pom.xml
+++ b/other/java/hdfs2/pom.xml
@@ -5,7 +5,7 @@
4.0.0
- 1.4.9
+ 1.5.0
2.9.2
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
index 9ac40b48b..47456c6f6 100644
--- a/other/java/hdfs3/dependency-reduced-pom.xml
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -309,7 +309,7 @@
- 1.4.9
+ 1.5.0
3.1.1
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index d1a5d8e0f..825e80e40 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
4.0.0
- 1.4.9
+ 1.5.0
3.1.1
diff --git a/other/java/s3copier/pom.xml b/other/java/s3copier/pom.xml
index f8cb9e91c..c3ff30932 100644
--- a/other/java/s3copier/pom.xml
+++ b/other/java/s3copier/pom.xml
@@ -28,7 +28,7 @@
junit
junit
- 3.8.1
+ 4.13.1
test
diff --git a/test/random_access/pom.xml b/test/random_access/pom.xml
index 6c5c90eea..44a3fd9df 100644
--- a/test/random_access/pom.xml
+++ b/test/random_access/pom.xml
@@ -25,7 +25,7 @@
junit
junit
- 4.12
+ 4.13.1
test
diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go
index 12ac42dbe..bff5becc1 100644
--- a/unmaintained/repeated_vacuum/repeated_vacuum.go
+++ b/unmaintained/repeated_vacuum/repeated_vacuum.go
@@ -32,7 +32,7 @@ func main() {
go func() {
for {
println("vacuum threshold", *garbageThreshold)
- _, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", *master, *garbageThreshold))
+ _, _, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", *master, *garbageThreshold))
if err != nil {
log.Fatalf("vacuum: %v", err)
}
diff --git a/weed/Makefile b/weed/Makefile
index f537fe051..0e2d29623 100644
--- a/weed/Makefile
+++ b/weed/Makefile
@@ -16,7 +16,7 @@ debug_shell:
debug_mount:
go build -gcflags="all=-N -l"
- dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- mount -dir=~/tmp/mm
+ dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- mount -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/buckets
debug_server:
go build -gcflags="all=-N -l"
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 8bb585d91..e241a904e 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -290,7 +290,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
}
var bytes []byte
for _, url := range urls {
- bytes, err = util.Get(url)
+ bytes, _, err = util.Get(url)
if err == nil {
break
}
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 42a79bd1f..f9700679d 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -48,7 +48,7 @@ func init() {
mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system")
mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")
mountOptions.nonempty = cmdMount.Flag.Bool("nonempty", false, "allows the mounting over a non-empty directory")
- mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system")
+ mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access volume servers with publicUrl")
mountOptions.uidMap = cmdMount.Flag.String("map.uid", "", "map local uid to uid on filer, comma-separated :")
mountOptions.gidMap = cmdMount.Flag.String("map.gid", "", "map local gid to gid on filer, comma-separated :")
@@ -72,11 +72,5 @@ var cmdMount = &Command{
On OS X, it requires OSXFUSE (http://osxfuse.github.com/).
- If the SeaweedFS system runs in a container cluster, e.g. managed by kubernetes or docker compose,
- the volume servers are not accessible by their own ip addresses.
- In "outsideContainerClusterMode", the mount will use the filer ip address instead, assuming:
- * All volume server containers are accessible through the same hostname or IP address as the filer.
- * All volume server container ports are open external to the cluster.
-
`,
}
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 2df8a4bbf..f45448a6e 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -97,12 +97,16 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
var err error
var buffer bytes.Buffer
+ var shouldRetry bool
for waitTime := time.Second; waitTime < ReadWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
- err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+ shouldRetry, err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
buffer.Write(data)
})
+ if !shouldRetry {
+ break
+ }
if err != nil {
glog.V(0).Infof("read %s failed, err: %v", urlString, err)
buffer.Reset()
@@ -110,8 +114,8 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
break
}
}
- if err != nil {
- glog.V(0).Infof("sleep for %v before retrying reading", waitTime)
+ if err != nil && shouldRetry {
+ glog.V(0).Infof("retry reading in %v", waitTime)
time.Sleep(waitTime)
} else {
break
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 835d6cfc2..da7eae621 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -75,7 +75,7 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
}
for _, loc := range locations.Locations {
- volumeServerAddress := filerClient.AdjustedUrl(loc.Url)
+ volumeServerAddress := filerClient.AdjustedUrl(loc)
targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
targetUrls = append(targetUrls, targetUrl)
}
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index a41aebe22..363b07f14 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -2,6 +2,7 @@ package filer
import (
"bytes"
+ "fmt"
"io"
"math"
"strings"
@@ -35,10 +36,14 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
if err != nil {
- return err
+ glog.Errorf("read chunk: %v", err)
+ return fmt.Errorf("read chunk: %v", err)
+ }
+ _, err = w.Write(data)
+ if err != nil {
+ glog.Errorf("write chunk: %v", err)
+ return fmt.Errorf("write chunk: %v", err)
}
- w.Write(data)
-
}
return nil
@@ -174,10 +179,14 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return err
}
var buffer bytes.Buffer
+ var shouldRetry bool
for _, urlString := range urlStrings {
- err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
+ if !shouldRetry {
+ break
+ }
if err != nil {
glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
buffer.Reset()
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 4dede3a8b..ae2ae3418 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -234,7 +234,11 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
dirPath := util.FullPath(dir.FullPath())
- meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, util.FullPath(dirPath))
+ visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
+ if visitErr != nil {
+ glog.Errorf("dir Lookup %s: %v", dirPath, visitErr)
+ return nil, fuse.EIO
+ }
cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT
@@ -296,7 +300,10 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
}
dirPath := util.FullPath(dir.FullPath())
- meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
+ if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
+ glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
+ return nil, fuse.EIO
+ }
listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(math.MaxInt32))
if listErr != nil {
glog.Errorf("list meta cache: %v", listErr)
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index bb81d6d27..0dd129623 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -2,6 +2,7 @@ package meta_cache
import (
"context"
+ "fmt"
"os"
"sync"
@@ -22,10 +23,10 @@ type MetaCache struct {
uidGidMapper *UidGidMapper
}
-func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper) *MetaCache {
+func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper) *MetaCache {
return &MetaCache{
localStore: openMetaStore(dbFolder),
- visitedBoundary: bounded_tree.NewBoundedTree(),
+ visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
uidGidMapper: uidGidMapper,
}
}
@@ -116,6 +117,10 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full
mc.RLock()
defer mc.RUnlock()
+ if !mc.visitedBoundary.HasVisited(dirPath) {
+ return nil, fmt.Errorf("unsynchronized dir: %v", dirPath)
+ }
+
entries, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
if err != nil {
return nil, err
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
index 455a8772c..f42d61230 100644
--- a/weed/filesys/meta_cache/meta_cache_init.go
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -3,6 +3,8 @@ package meta_cache
import (
"context"
"fmt"
+ "strings"
+ "time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -10,25 +12,34 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) {
+func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
- mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
+ return mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
glog.V(4).Infof("ReadDirAllEntries %s ...", path)
- err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
- entry := filer.FromPbEntry(string(dirPath), pbEntry)
- if err := mc.doInsertEntry(context.Background(), entry); err != nil {
- glog.V(0).Infof("read %s: %v", entry.FullPath, err)
- return err
+ for waitTime := time.Second; waitTime < filer.ReadWaitTime; waitTime += waitTime / 2 {
+ err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
+ entry := filer.FromPbEntry(string(dirPath), pbEntry)
+ if err := mc.doInsertEntry(context.Background(), entry); err != nil {
+ glog.V(0).Infof("read %s: %v", entry.FullPath, err)
+ return err
+ }
+ if entry.IsDirectory() {
+ childDirectories = append(childDirectories, entry.Name())
+ }
+ return nil
+ })
+ if err == nil {
+ break
}
- if entry.IsDirectory() {
- childDirectories = append(childDirectories, entry.Name())
+ if strings.Contains(err.Error(), "transport: ") {
+ glog.V(0).Infof("ReadDirAllEntries %s: %v. Retry in %v", path, err, waitTime)
+ time.Sleep(waitTime)
+ continue
}
- return nil
- })
- if err != nil {
err = fmt.Errorf("list %s: %v", dirPath, err)
+ break
}
return
})
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 57b4c3da5..265fc95a8 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -92,7 +92,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
}
- wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), option.UidGidMapper)
+ wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper)
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
grace.OnInterrupt(func() {
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
index 9791c8630..a245b6795 100644
--- a/weed/filesys/wfs_deletion.go
+++ b/weed/filesys/wfs_deletion.go
@@ -68,7 +68,7 @@ func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.Se
}
for _, loc := range locations.Locations {
lr.Locations = append(lr.Locations, operation.Location{
- Url: wfs.AdjustedUrl(loc.Url),
+ Url: wfs.AdjustedUrl(loc),
PublicUrl: loc.PublicUrl,
})
}
diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go
index 736df3588..096ee555f 100644
--- a/weed/filesys/wfs_filer_client.go
+++ b/weed/filesys/wfs_filer_client.go
@@ -1,9 +1,6 @@
package filesys
import (
- "fmt"
- "strings"
-
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb"
@@ -26,15 +23,9 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro
}
-func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
- if !wfs.option.OutsideContainerClusterMode {
- return hostAndPort
- }
- commaIndex := strings.Index(hostAndPort, ":")
- if commaIndex < 0 {
- return hostAndPort
+func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
+ if wfs.option.OutsideContainerClusterMode {
+ return location.PublicUrl
}
- filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":")
- return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:])
-
+ return location.Url
}
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
index fec33e4ab..e7db31203 100644
--- a/weed/filesys/wfs_write.go
+++ b/weed/filesys/wfs_write.go
@@ -38,8 +38,12 @@ func (wfs *WFS) saveDataAsChunk(dir string) filer.SaveDataAsChunkFunctionType {
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
}
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
- host = wfs.AdjustedUrl(host)
+ fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
+ loc := &filer_pb.Location{
+ Url: resp.Url,
+ PublicUrl: resp.PublicUrl,
+ }
+ host = wfs.AdjustedUrl(loc)
collection, replication = resp.Collection, resp.Replication
return nil
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index 80f107e00..8e5b56fd0 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -108,6 +108,6 @@ func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient
}
-func (broker *MessageBroker) AdjustedUrl(hostAndPort string) string {
- return hostAndPort
+func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
}
diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go
index 4eacfa2e5..96a716d5b 100644
--- a/weed/pb/filer_pb/filer_client.go
+++ b/weed/pb/filer_pb/filer_client.go
@@ -21,7 +21,7 @@ var (
type FilerClient interface {
WithFilerClient(fn func(SeaweedFilerClient) error) error
- AdjustedUrl(hostAndPort string) string
+ AdjustedUrl(location *Location) string
}
func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go
index 9b18275b5..42777f4ad 100644
--- a/weed/replication/repl_util/replication_utli.go
+++ b/weed/replication/repl_util/replication_utli.go
@@ -19,7 +19,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var writeErr error
for _, fileUrl := range fileUrls {
- err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+ _, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index d33669447..98330da6b 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -124,6 +124,6 @@ func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error)
}, fs.grpcAddress, fs.grpcDialOption)
}
-func (fs *FilerSink) AdjustedUrl(hostAndPort string) string {
- return hostAndPort
+func (fs *FilerSink) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
}
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index c3ef8835c..ff4f2eb26 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -111,8 +111,8 @@ func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) erro
}
-func (fs *FilerSource) AdjustedUrl(hostAndPort string) string {
- return hostAndPort
+func (fs *FilerSource) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
}
func volumeId(fileId string) string {
diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go
index fa706cd1c..6935c75bd 100644
--- a/weed/s3api/s3api_handlers.go
+++ b/weed/s3api/s3api_handlers.go
@@ -50,8 +50,8 @@ func (s3a *S3ApiServer) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) err
}, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption)
}
-func (s3a *S3ApiServer) AdjustedUrl(hostAndPort string) string {
- return hostAndPort
+func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
}
// If none of the http routes match respond with MethodNotAllowed
diff --git a/weed/server/common.go b/weed/server/common.go
index 75fc3ad9e..44098a4b5 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -274,7 +274,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
ra := ranges[0]
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
- // w.WriteHeader(http.StatusPartialContent)
+ w.WriteHeader(http.StatusPartialContent)
err = writeFn(w, ra.start, ra.length)
if err != nil {
@@ -315,7 +315,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
if w.Header().Get("Content-Encoding") == "" {
w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
}
- // w.WriteHeader(http.StatusPartialContent)
+ w.WriteHeader(http.StatusPartialContent)
if _, err := io.CopyN(w, sendContent, sendSize); err != nil {
http.Error(w, "Internal Error", http.StatusInternalServerError)
return
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index f13e73a7b..ee5fa5f65 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -118,8 +118,8 @@ func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient)
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
}
-func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string {
- return hostAndPort
+func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
}
func clearName(name string) (string, error) {
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index 214783ee1..a82454cd3 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -9,6 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"io"
+ "os"
"sort"
)
@@ -151,6 +152,11 @@ func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEc
for i := 0; i < len(otherNodes); i++ {
emptyNode := otherNodes[i]
+ collectionPrefix := ""
+ if ecShardInfo.Collection != "" {
+ collectionPrefix = ecShardInfo.Collection + "_"
+ }
+ fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
if err != nil {
return
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index f61ed9f82..1a937ad53 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -102,8 +102,8 @@ func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error
}
-func (ce *CommandEnv) AdjustedUrl(hostAndPort string) string {
- return hostAndPort
+func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
}
func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go
index 0e023c0d1..0e8af2520 100644
--- a/weed/util/bounded_tree/bounded_tree.go
+++ b/weed/util/bounded_tree/bounded_tree.go
@@ -16,13 +16,15 @@ type Node struct {
type BoundedTree struct {
root *Node
sync.RWMutex
+ baseDir util.FullPath
}
-func NewBoundedTree() *BoundedTree {
+func NewBoundedTree(baseDir util.FullPath) *BoundedTree {
return &BoundedTree{
root: &Node{
Name: "/",
},
+ baseDir: baseDir,
}
}
@@ -32,21 +34,29 @@ type VisitNodeFunc func(path util.FullPath) (childDirectories []string, err erro
// No action if the directory has been visited before or does not exist.
// A leaf node, which has no children, represents a directory not visited.
// A non-leaf node or a non-existing node represents a directory already visited, or does not need to visit.
-func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) {
+func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) (visitErr error) {
t.Lock()
defer t.Unlock()
if t.root == nil {
return
}
+ if t.baseDir != "/" {
+ p = p[len(t.baseDir):]
+ }
components := p.Split()
// fmt.Printf("components %v %d\n", components, len(components))
- if canDelete := t.ensureVisited(t.root, util.FullPath("/"), components, 0, visitFn); canDelete {
+ canDelete, err := t.ensureVisited(t.root, t.baseDir, components, 0, visitFn)
+ if err != nil {
+ return err
+ }
+ if canDelete {
t.root = nil
}
+ return nil
}
-func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, components []string, i int, visitFn VisitNodeFunc) (canDeleteNode bool) {
+func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, components []string, i int, visitFn VisitNodeFunc) (canDeleteNode bool, visitErr error) {
// println("ensureVisited", currentPath, i)
@@ -60,15 +70,20 @@ func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, componen
} else {
// fmt.Printf("ensure %v\n", currentPath)
- children, err := visitFn(currentPath)
+ filerPath := currentPath
+ if t.baseDir != "/" {
+ filerPath = t.baseDir + filerPath
+ }
+
+ children, err := visitFn(filerPath)
if err != nil {
glog.V(0).Infof("failed to visit %s: %v", currentPath, err)
- return
+ return false, err
}
if len(children) == 0 {
// fmt.Printf(" canDelete %v without children\n", currentPath)
- return true
+ return true, nil
}
n.Children = make(map[string]*Node)
@@ -93,19 +108,22 @@ func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, componen
}
// fmt.Printf(" ensureVisited %v %v\n", currentPath, toVisitNode.Name)
-
- if canDelete := t.ensureVisited(toVisitNode, currentPath.Child(components[i]), components, i+1, visitFn); canDelete {
+ canDelete, childVisitErr := t.ensureVisited(toVisitNode, currentPath.Child(components[i]), components, i+1, visitFn)
+ if childVisitErr != nil {
+ return false, childVisitErr
+ }
+ if canDelete {
// fmt.Printf(" delete %v %v\n", currentPath, components[i])
delete(n.Children, components[i])
if len(n.Children) == 0 {
// fmt.Printf(" canDelete %v\n", currentPath)
- return true
+ return true, nil
}
}
- return false
+ return false, nil
}
diff --git a/weed/util/bounded_tree/bounded_tree_test.go b/weed/util/bounded_tree/bounded_tree_test.go
index 0b9c3177a..465f1cc9c 100644
--- a/weed/util/bounded_tree/bounded_tree_test.go
+++ b/weed/util/bounded_tree/bounded_tree_test.go
@@ -52,7 +52,7 @@ func TestBoundedTree(t *testing.T) {
// g
// h
- tree := NewBoundedTree()
+ tree := NewBoundedTree(util.FullPath("/"))
tree.EnsureVisited(util.FullPath("/a/b/c"), visitFn)
@@ -100,7 +100,7 @@ func TestEmptyBoundedTree(t *testing.T) {
// g
// h
- tree := NewBoundedTree()
+ tree := NewBoundedTree(util.FullPath("/"))
visitFn := func(path util.FullPath) (childDirectories []string, err error) {
fmt.Printf(" visit %v ...\n", path)
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 0f3fd52c7..3b222fbaa 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 03)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 04)
COMMIT = ""
)
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index eef24b930..da0b3d849 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -67,14 +67,14 @@ func Post(url string, values url.Values) ([]byte, error) {
// github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
// may need increasing http.Client.Timeout
-func Get(url string) ([]byte, error) {
+func Get(url string) ([]byte, bool, error) {
request, err := http.NewRequest("GET", url, nil)
request.Header.Add("Accept-Encoding", "gzip")
response, err := client.Do(request)
if err != nil {
- return nil, err
+ return nil, true, err
}
defer response.Body.Close()
@@ -89,12 +89,13 @@ func Get(url string) ([]byte, error) {
b, err := ioutil.ReadAll(reader)
if response.StatusCode >= 400 {
- return nil, fmt.Errorf("%s: %s", url, response.Status)
+ retryable := response.StatusCode >= 500
+ return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
}
if err != nil {
- return nil, err
+ return nil, false, err
}
- return b, nil
+ return b, false, nil
}
func Head(url string) (http.Header, error) {
@@ -207,7 +208,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
if cipherKey != nil {
var n int
- err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
+ _, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
n = copy(buf, data)
})
return int64(n), err
@@ -272,7 +273,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
return n, err
}
-func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
+func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
if cipherKey != nil {
return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
@@ -280,7 +281,7 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
req, err := http.NewRequest("GET", fileUrl, nil)
if err != nil {
- return err
+ return false, err
}
if isFullChunk {
@@ -291,11 +292,12 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
r, err := client.Do(req)
if err != nil {
- return err
+ return true, err
}
defer CloseResponse(r)
if r.StatusCode >= 400 {
- return fmt.Errorf("%s: %s", fileUrl, r.Status)
+ retryable = r.StatusCode >= 500
+ return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
var reader io.ReadCloser
@@ -317,23 +319,23 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
m, err = reader.Read(buf)
fn(buf[:m])
if err == io.EOF {
- return nil
+ return false, nil
}
if err != nil {
- return err
+ return false, err
}
}
}
-func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
- encryptedData, err := Get(fileUrl)
+func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
+ encryptedData, retryable, err := Get(fileUrl)
if err != nil {
- return fmt.Errorf("fetch %s: %v", fileUrl, err)
+ return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
}
decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
if err != nil {
- return fmt.Errorf("decrypt %s: %v", fileUrl, err)
+ return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
}
if isContentCompressed {
decryptedData, err = DecompressData(decryptedData)
@@ -342,14 +344,14 @@ func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool
}
}
if len(decryptedData) < int(offset)+size {
- return fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
+ return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
}
if isFullChunk {
fn(decryptedData)
} else {
fn(decryptedData[int(offset) : int(offset)+size])
}
- return nil
+ return false, nil
}
func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {
diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go
index f057a8f5b..e8075c297 100644
--- a/weed/util/net_timeout.go
+++ b/weed/util/net_timeout.go
@@ -54,7 +54,8 @@ func (c *Conn) Read(b []byte) (count int, e error) {
func (c *Conn) Write(b []byte) (count int, e error) {
if c.WriteTimeout != 0 {
- err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
+ // minimum 4KB/s
+ err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout * time.Duration(len(b)/40000+1)))
if err != nil {
return 0, err
}