Browse Source

Merge pull request #26 from chrislusf/master

sync
pull/1542/head
hilimd 4 years ago
committed by GitHub
parent
commit
a911375798
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      k8s/seaweedfs/Chart.yaml
  2. 2
      k8s/seaweedfs/values.yaml
  3. 4
      other/java/client/pom.xml
  4. 2
      other/java/client/pom.xml.deploy
  5. 4
      other/java/client/pom_debug.xml
  6. 2
      other/java/hdfs2/dependency-reduced-pom.xml
  7. 2
      other/java/hdfs2/pom.xml
  8. 2
      other/java/hdfs3/dependency-reduced-pom.xml
  9. 2
      other/java/hdfs3/pom.xml
  10. 2
      other/java/s3copier/pom.xml
  11. 2
      test/random_access/pom.xml
  12. 2
      unmaintained/repeated_vacuum/repeated_vacuum.go
  13. 2
      weed/Makefile
  14. 2
      weed/command/benchmark.go
  15. 8
      weed/command/mount.go
  16. 10
      weed/filer/filechunk_manifest.go
  17. 2
      weed/filer/reader_at.go
  18. 17
      weed/filer/stream.go
  19. 11
      weed/filesys/dir.go
  20. 9
      weed/filesys/meta_cache/meta_cache.go
  21. 17
      weed/filesys/meta_cache/meta_cache_init.go
  22. 2
      weed/filesys/wfs.go
  23. 2
      weed/filesys/wfs_deletion.go
  24. 17
      weed/filesys/wfs_filer_client.go
  25. 8
      weed/filesys/wfs_write.go
  26. 4
      weed/messaging/broker/broker_append.go
  27. 2
      weed/pb/filer_pb/filer_client.go
  28. 2
      weed/replication/repl_util/replication_utli.go
  29. 4
      weed/replication/sink/filersink/fetch_write.go
  30. 4
      weed/replication/source/filer_source.go
  31. 4
      weed/s3api/s3api_handlers.go
  32. 4
      weed/server/common.go
  33. 4
      weed/server/webdav_server.go
  34. 6
      weed/shell/command_volume_server_evacuate.go
  35. 4
      weed/shell/commands.go
  36. 40
      weed/util/bounded_tree/bounded_tree.go
  37. 4
      weed/util/bounded_tree/bounded_tree_test.go
  38. 2
      weed/util/constants.go
  39. 38
      weed/util/http_util.go
  40. 3
      weed/util/net_timeout.go

2
k8s/seaweedfs/Chart.yaml

@ -1,4 +1,4 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
version: 2.03
version: 2.04

2
k8s/seaweedfs/values.yaml

@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
imageTag: "2.03"
imageTag: "2.04"
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always

4
other/java/client/pom.xml

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.4.9</version>
<version>1.5.0</version>
<parent>
<groupId>org.sonatype.oss</groupId>
@ -65,7 +65,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>

2
other/java/client/pom.xml.deploy

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.4.9</version>
<version>1.5.0</version>
<parent>
<groupId>org.sonatype.oss</groupId>

4
other/java/client/pom_debug.xml

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.4.9</version>
<version>1.5.0</version>
<parent>
<groupId>org.sonatype.oss</groupId>
@ -65,7 +65,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>

2
other/java/hdfs2/dependency-reduced-pom.xml

@ -301,7 +301,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.4.9</seaweedfs.client.version>
<seaweedfs.client.version>1.5.0</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>

2
other/java/hdfs2/pom.xml

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<seaweedfs.client.version>1.4.9</seaweedfs.client.version>
<seaweedfs.client.version>1.5.0</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>

2
other/java/hdfs3/dependency-reduced-pom.xml

@ -309,7 +309,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.4.9</seaweedfs.client.version>
<seaweedfs.client.version>1.5.0</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>

2
other/java/hdfs3/pom.xml

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<seaweedfs.client.version>1.4.9</seaweedfs.client.version>
<seaweedfs.client.version>1.5.0</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>

2
other/java/s3copier/pom.xml

@ -28,7 +28,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>

2
test/random_access/pom.xml

@ -25,7 +25,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>

2
unmaintained/repeated_vacuum/repeated_vacuum.go

@ -32,7 +32,7 @@ func main() {
go func() {
for {
println("vacuum threshold", *garbageThreshold)
_, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", *master, *garbageThreshold))
_, _, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", *master, *garbageThreshold))
if err != nil {
log.Fatalf("vacuum: %v", err)
}

2
weed/Makefile

@ -16,7 +16,7 @@ debug_shell:
debug_mount:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- mount -dir=~/tmp/mm
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- mount -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/buckets
debug_server:
go build -gcflags="all=-N -l"

2
weed/command/benchmark.go

@ -290,7 +290,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
}
var bytes []byte
for _, url := range urls {
bytes, err = util.Get(url)
bytes, _, err = util.Get(url)
if err == nil {
break
}

8
weed/command/mount.go

@ -48,7 +48,7 @@ func init() {
mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system")
mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")
mountOptions.nonempty = cmdMount.Flag.Bool("nonempty", false, "allows the mounting over a non-empty directory")
mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system")
mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access volume servers with publicUrl")
mountOptions.uidMap = cmdMount.Flag.String("map.uid", "", "map local uid to uid on filer, comma-separated <local_uid>:<filer_uid>")
mountOptions.gidMap = cmdMount.Flag.String("map.gid", "", "map local gid to gid on filer, comma-separated <local_gid>:<filer_gid>")
@ -72,11 +72,5 @@ var cmdMount = &Command{
On OS X, it requires OSXFUSE (http://osxfuse.github.com/).
If the SeaweedFS system runs in a container cluster, e.g. managed by kubernetes or docker compose,
the volume servers are not accessible by their own ip addresses.
In "outsideContainerClusterMode", the mount will use the filer ip address instead, assuming:
* All volume server containers are accessible through the same hostname or IP address as the filer.
* All volume server container ports are open external to the cluster.
`,
}

10
weed/filer/filechunk_manifest.go

@ -97,12 +97,16 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
var err error
var buffer bytes.Buffer
var shouldRetry bool
for waitTime := time.Second; waitTime < ReadWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
shouldRetry, err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {
break
}
if err != nil {
glog.V(0).Infof("read %s failed, err: %v", urlString, err)
buffer.Reset()
@ -110,8 +114,8 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
break
}
}
if err != nil {
glog.V(0).Infof("sleep for %v before retrying reading", waitTime)
if err != nil && shouldRetry {
glog.V(0).Infof("retry reading in %v", waitTime)
time.Sleep(waitTime)
} else {
break

2
weed/filer/reader_at.go

@ -75,7 +75,7 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
}
for _, loc := range locations.Locations {
volumeServerAddress := filerClient.AdjustedUrl(loc.Url)
volumeServerAddress := filerClient.AdjustedUrl(loc)
targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
targetUrls = append(targetUrls, targetUrl)
}

17
weed/filer/stream.go

@ -2,6 +2,7 @@ package filer
import (
"bytes"
"fmt"
"io"
"math"
"strings"
@ -35,10 +36,14 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
if err != nil {
return err
glog.Errorf("read chunk: %v", err)
return fmt.Errorf("read chunk: %v", err)
}
_, err = w.Write(data)
if err != nil {
glog.Errorf("write chunk: %v", err)
return fmt.Errorf("write chunk: %v", err)
}
w.Write(data)
}
return nil
@ -174,10 +179,14 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return err
}
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {
break
}
if err != nil {
glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
buffer.Reset()

11
weed/filesys/dir.go

@ -234,7 +234,11 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
dirPath := util.FullPath(dir.FullPath())
meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, util.FullPath(dirPath))
visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
if visitErr != nil {
glog.Errorf("dir Lookup %s: %v", dirPath, visitErr)
return nil, fuse.EIO
}
cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT
@ -296,7 +300,10 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
}
dirPath := util.FullPath(dir.FullPath())
meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return nil, fuse.EIO
}
listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(math.MaxInt32))
if listErr != nil {
glog.Errorf("list meta cache: %v", listErr)

9
weed/filesys/meta_cache/meta_cache.go

@ -2,6 +2,7 @@ package meta_cache
import (
"context"
"fmt"
"os"
"sync"
@ -22,10 +23,10 @@ type MetaCache struct {
uidGidMapper *UidGidMapper
}
func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper) *MetaCache {
func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper) *MetaCache {
return &MetaCache{
localStore: openMetaStore(dbFolder),
visitedBoundary: bounded_tree.NewBoundedTree(),
visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
uidGidMapper: uidGidMapper,
}
}
@ -116,6 +117,10 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full
mc.RLock()
defer mc.RUnlock()
if !mc.visitedBoundary.HasVisited(dirPath) {
return nil, fmt.Errorf("unsynchronized dir: %v", dirPath)
}
entries, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
if err != nil {
return nil, err

17
weed/filesys/meta_cache/meta_cache_init.go

@ -3,6 +3,8 @@ package meta_cache
import (
"context"
"fmt"
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@ -10,12 +12,13 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) {
func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
return mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
glog.V(4).Infof("ReadDirAllEntries %s ...", path)
for waitTime := time.Second; waitTime < filer.ReadWaitTime; waitTime += waitTime / 2 {
err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
entry := filer.FromPbEntry(string(dirPath), pbEntry)
if err := mc.doInsertEntry(context.Background(), entry); err != nil {
@ -27,8 +30,16 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
}
return nil
})
if err != nil {
if err == nil {
break
}
if strings.Contains(err.Error(), "transport: ") {
glog.V(0).Infof("ReadDirAllEntries %s: %v. Retry in %v", path, err, waitTime)
time.Sleep(waitTime)
continue
}
err = fmt.Errorf("list %s: %v", dirPath, err)
break
}
return
})

2
weed/filesys/wfs.go

@ -92,7 +92,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
}
wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), option.UidGidMapper)
wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper)
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
grace.OnInterrupt(func() {

2
weed/filesys/wfs_deletion.go

@ -68,7 +68,7 @@ func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.Se
}
for _, loc := range locations.Locations {
lr.Locations = append(lr.Locations, operation.Location{
Url: wfs.AdjustedUrl(loc.Url),
Url: wfs.AdjustedUrl(loc),
PublicUrl: loc.PublicUrl,
})
}

17
weed/filesys/wfs_filer_client.go

@ -1,9 +1,6 @@
package filesys
import (
"fmt"
"strings"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb"
@ -26,15 +23,9 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro
}
func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
if !wfs.option.OutsideContainerClusterMode {
return hostAndPort
}
commaIndex := strings.Index(hostAndPort, ":")
if commaIndex < 0 {
return hostAndPort
func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
if wfs.option.OutsideContainerClusterMode {
return location.PublicUrl
}
filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":")
return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:])
return location.Url
}

8
weed/filesys/wfs_write.go

@ -38,8 +38,12 @@ func (wfs *WFS) saveDataAsChunk(dir string) filer.SaveDataAsChunkFunctionType {
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
}
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
host = wfs.AdjustedUrl(host)
fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
loc := &filer_pb.Location{
Url: resp.Url,
PublicUrl: resp.PublicUrl,
}
host = wfs.AdjustedUrl(loc)
collection, replication = resp.Collection, resp.Replication
return nil

4
weed/messaging/broker/broker_append.go

@ -108,6 +108,6 @@ func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient
}
func (broker *MessageBroker) AdjustedUrl(hostAndPort string) string {
return hostAndPort
func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}

2
weed/pb/filer_pb/filer_client.go

@ -21,7 +21,7 @@ var (
type FilerClient interface {
WithFilerClient(fn func(SeaweedFilerClient) error) error
AdjustedUrl(hostAndPort string) string
AdjustedUrl(location *Location) string
}
func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {

2
weed/replication/repl_util/replication_utli.go

@ -19,7 +19,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var writeErr error
for _, fileUrl := range fileUrls {
err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
_, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {

4
weed/replication/sink/filersink/fetch_write.go

@ -124,6 +124,6 @@ func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error)
}, fs.grpcAddress, fs.grpcDialOption)
}
func (fs *FilerSink) AdjustedUrl(hostAndPort string) string {
return hostAndPort
func (fs *FilerSink) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}

4
weed/replication/source/filer_source.go

@ -111,8 +111,8 @@ func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) erro
}
func (fs *FilerSource) AdjustedUrl(hostAndPort string) string {
return hostAndPort
func (fs *FilerSource) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
func volumeId(fileId string) string {

4
weed/s3api/s3api_handlers.go

@ -50,8 +50,8 @@ func (s3a *S3ApiServer) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) err
}, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption)
}
func (s3a *S3ApiServer) AdjustedUrl(hostAndPort string) string {
return hostAndPort
func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
// If none of the http routes match respond with MethodNotAllowed

4
weed/server/common.go

@ -274,7 +274,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
ra := ranges[0]
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
// w.WriteHeader(http.StatusPartialContent)
w.WriteHeader(http.StatusPartialContent)
err = writeFn(w, ra.start, ra.length)
if err != nil {
@ -315,7 +315,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
if w.Header().Get("Content-Encoding") == "" {
w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
}
// w.WriteHeader(http.StatusPartialContent)
w.WriteHeader(http.StatusPartialContent)
if _, err := io.CopyN(w, sendContent, sendSize); err != nil {
http.Error(w, "Internal Error", http.StatusInternalServerError)
return

4
weed/server/webdav_server.go

@ -118,8 +118,8 @@ func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient)
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
}
func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string {
return hostAndPort
func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
func clearName(name string) (string, error) {

6
weed/shell/command_volume_server_evacuate.go

@ -9,6 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"io"
"os"
"sort"
)
@ -151,6 +152,11 @@ func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEc
for i := 0; i < len(otherNodes); i++ {
emptyNode := otherNodes[i]
collectionPrefix := ""
if ecShardInfo.Collection != "" {
collectionPrefix = ecShardInfo.Collection + "_"
}
fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
if err != nil {
return

4
weed/shell/commands.go

@ -102,8 +102,8 @@ func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error
}
func (ce *CommandEnv) AdjustedUrl(hostAndPort string) string {
return hostAndPort
func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {

40
weed/util/bounded_tree/bounded_tree.go

@ -16,13 +16,15 @@ type Node struct {
type BoundedTree struct {
root *Node
sync.RWMutex
baseDir util.FullPath
}
func NewBoundedTree() *BoundedTree {
func NewBoundedTree(baseDir util.FullPath) *BoundedTree {
return &BoundedTree{
root: &Node{
Name: "/",
},
baseDir: baseDir,
}
}
@ -32,21 +34,29 @@ type VisitNodeFunc func(path util.FullPath) (childDirectories []string, err erro
// No action if the directory has been visited before or does not exist.
// A leaf node, which has no children, represents a directory not visited.
// A non-leaf node or a non-existing node represents a directory already visited, or does not need to visit.
func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) {
func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) (visitErr error) {
t.Lock()
defer t.Unlock()
if t.root == nil {
return
}
if t.baseDir != "/" {
p = p[len(t.baseDir):]
}
components := p.Split()
// fmt.Printf("components %v %d\n", components, len(components))
if canDelete := t.ensureVisited(t.root, util.FullPath("/"), components, 0, visitFn); canDelete {
canDelete, err := t.ensureVisited(t.root, t.baseDir, components, 0, visitFn)
if err != nil {
return err
}
if canDelete {
t.root = nil
}
return nil
}
func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, components []string, i int, visitFn VisitNodeFunc) (canDeleteNode bool) {
func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, components []string, i int, visitFn VisitNodeFunc) (canDeleteNode bool, visitErr error) {
// println("ensureVisited", currentPath, i)
@ -60,15 +70,20 @@ func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, componen
} else {
// fmt.Printf("ensure %v\n", currentPath)
children, err := visitFn(currentPath)
filerPath := currentPath
if t.baseDir != "/" {
filerPath = t.baseDir + filerPath
}
children, err := visitFn(filerPath)
if err != nil {
glog.V(0).Infof("failed to visit %s: %v", currentPath, err)
return
return false, err
}
if len(children) == 0 {
// fmt.Printf(" canDelete %v without children\n", currentPath)
return true
return true, nil
}
n.Children = make(map[string]*Node)
@ -93,19 +108,22 @@ func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, componen
}
// fmt.Printf(" ensureVisited %v %v\n", currentPath, toVisitNode.Name)
if canDelete := t.ensureVisited(toVisitNode, currentPath.Child(components[i]), components, i+1, visitFn); canDelete {
canDelete, childVisitErr := t.ensureVisited(toVisitNode, currentPath.Child(components[i]), components, i+1, visitFn)
if childVisitErr != nil {
return false, childVisitErr
}
if canDelete {
// fmt.Printf(" delete %v %v\n", currentPath, components[i])
delete(n.Children, components[i])
if len(n.Children) == 0 {
// fmt.Printf(" canDelete %v\n", currentPath)
return true
return true, nil
}
}
return false
return false, nil
}

4
weed/util/bounded_tree/bounded_tree_test.go

@ -52,7 +52,7 @@ func TestBoundedTree(t *testing.T) {
// g
// h
tree := NewBoundedTree()
tree := NewBoundedTree(util.FullPath("/"))
tree.EnsureVisited(util.FullPath("/a/b/c"), visitFn)
@ -100,7 +100,7 @@ func TestEmptyBoundedTree(t *testing.T) {
// g
// h
tree := NewBoundedTree()
tree := NewBoundedTree(util.FullPath("/"))
visitFn := func(path util.FullPath) (childDirectories []string, err error) {
fmt.Printf(" visit %v ...\n", path)

2
weed/util/constants.go

@ -5,7 +5,7 @@ import (
)
var (
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 03)
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 04)
COMMIT = ""
)

38
weed/util/http_util.go

@ -67,14 +67,14 @@ func Post(url string, values url.Values) ([]byte, error) {
// github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
// may need increasing http.Client.Timeout
func Get(url string) ([]byte, error) {
func Get(url string) ([]byte, bool, error) {
request, err := http.NewRequest("GET", url, nil)
request.Header.Add("Accept-Encoding", "gzip")
response, err := client.Do(request)
if err != nil {
return nil, err
return nil, true, err
}
defer response.Body.Close()
@ -89,12 +89,13 @@ func Get(url string) ([]byte, error) {
b, err := ioutil.ReadAll(reader)
if response.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, response.Status)
retryable := response.StatusCode >= 500
return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
}
if err != nil {
return nil, err
return nil, false, err
}
return b, nil
return b, false, nil
}
func Head(url string) (http.Header, error) {
@ -207,7 +208,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
if cipherKey != nil {
var n int
err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
_, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
n = copy(buf, data)
})
return int64(n), err
@ -272,7 +273,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
return n, err
}
func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
if cipherKey != nil {
return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
@ -280,7 +281,7 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
req, err := http.NewRequest("GET", fileUrl, nil)
if err != nil {
return err
return false, err
}
if isFullChunk {
@ -291,11 +292,12 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
r, err := client.Do(req)
if err != nil {
return err
return true, err
}
defer CloseResponse(r)
if r.StatusCode >= 400 {
return fmt.Errorf("%s: %s", fileUrl, r.Status)
retryable = r.StatusCode >= 500
return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
var reader io.ReadCloser
@ -317,23 +319,23 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
m, err = reader.Read(buf)
fn(buf[:m])
if err == io.EOF {
return nil
return false, nil
}
if err != nil {
return err
return false, err
}
}
}
func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
encryptedData, err := Get(fileUrl)
func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
encryptedData, retryable, err := Get(fileUrl)
if err != nil {
return fmt.Errorf("fetch %s: %v", fileUrl, err)
return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
}
decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
if err != nil {
return fmt.Errorf("decrypt %s: %v", fileUrl, err)
return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
}
if isContentCompressed {
decryptedData, err = DecompressData(decryptedData)
@ -342,14 +344,14 @@ func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool
}
}
if len(decryptedData) < int(offset)+size {
return fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
}
if isFullChunk {
fn(decryptedData)
} else {
fn(decryptedData[int(offset) : int(offset)+size])
}
return nil
return false, nil
}
func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {

3
weed/util/net_timeout.go

@ -54,7 +54,8 @@ func (c *Conn) Read(b []byte) (count int, e error) {
func (c *Conn) Write(b []byte) (count int, e error) {
if c.WriteTimeout != 0 {
err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
// minimum 4KB/s
err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout * time.Duration(len(b)/40000+1)))
if err != nil {
return 0, err
}

Loading…
Cancel
Save