Browse Source

Merge pull request #77 from chrislusf/master

sync
pull/2394/head
hilimd 4 years ago
committed by GitHub
parent
commit
9524b08bec
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 114
      .github/workflows/container_latest.yml
  2. 118
      .github/workflows/container_release.yml
  3. 2
      README.md
  4. 6
      docker/Dockerfile.go_build
  5. 6
      docker/Dockerfile.go_build_large
  6. 2
      docker/compose/local-mount-profile-compose.yml
  7. 6
      go.mod
  8. 2
      go.sum
  9. 4
      k8s/seaweedfs/Chart.yaml
  10. 2
      k8s/seaweedfs/values.yaml
  11. 60
      weed/command/filer_copy.go
  12. 2
      weed/filer/filerstore_wrapper.go
  13. 6
      weed/filer/reader_at.go
  14. 2
      weed/filesys/dir.go
  15. 9
      weed/filesys/dir_rename.go
  16. 3
      weed/filesys/dirty_page_interval.go
  17. 10
      weed/filesys/dirty_pages.go
  18. 32
      weed/filesys/dirty_pages_continuous.go
  19. 166
      weed/filesys/dirty_pages_temp_file.go
  20. 289
      weed/filesys/dirty_pages_temp_interval.go
  21. 10
      weed/filesys/file.go
  22. 25
      weed/filesys/filehandle.go
  23. 2
      weed/filesys/meta_cache/meta_cache.go
  24. 7
      weed/filesys/wfs.go
  25. 2
      weed/s3api/s3err/s3api_errors.go
  26. 16
      weed/server/common.go
  27. 2
      weed/server/volume_grpc_copy.go
  28. 13
      weed/statik/statik.go
  29. 4
      weed/storage/volume_vacuum.go
  30. 4
      weed/topology/node.go
  31. 8
      weed/topology/topology.go
  32. 11
      weed/topology/topology_event_handling.go
  33. 6
      weed/topology/volume_layout.go
  34. 2
      weed/util/bounded_tree/bounded_tree.go
  35. 2
      weed/util/constants.go
  36. 13
      weed/weed.go

114
.github/workflows/container_latest.yml

@ -0,0 +1,114 @@
name: Build Latest Containers
on:
push:
branches:
- master
workflow_dispatch: []
jobs:
build-latest:
runs-on: [ubuntu-latest]
steps:
-
name: Checkout
uses: actions/checkout@v2
-
name: Docker meta
id: docker_meta
uses: crazy-max/ghaction-docker-meta@v2
with:
images: |
chrislusf/seaweedfs
ghcr.io/chrislusf/seaweedfs
tags: |
type=raw,value=latest
labels: |
org.opencontainers.image.title=seaweedfs
org.opencontainers.image.vendor=Chris Lu
-
name: Set up QEMU
uses: docker/setup-qemu-action@v1
-
name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
with:
buildkitd-flags: "--debug"
-
name: Login to Docker Hub
if: github.event_name != 'pull_request'
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
-
name: Login to GHCR
if: github.event_name != 'pull_request'
uses: docker/login-action@v1
with:
registry: ghcr.io
username: ${{ secrets.GHCR_USERNAME }}
password: ${{ secrets.GHCR_TOKEN }}
-
name: Build
uses: docker/build-push-action@v2
with:
context: ./docker
push: ${{ github.event_name != 'pull_request' }}
file: ./docker/Dockerfile
platforms: linux/amd64
tags: ${{ steps.docker_meta.outputs.tags }}
labels: ${{ steps.docker_meta.outputs.labels }}
build-dev:
runs-on: [ubuntu-latest]
steps:
-
name: Checkout
uses: actions/checkout@v2
-
name: Docker meta
id: docker_meta
uses: crazy-max/ghaction-docker-meta@v2
with:
images: |
chrislusf/seaweedfs
ghcr.io/chrislusf/seaweedfs
tags: |
type=raw,value=dev
labels: |
org.opencontainers.image.title=seaweedfs
org.opencontainers.image.description=SeaweedFS is a distributed storage system for blobs, objects, files, and data lake, to store and serve billions of files fast!
org.opencontainers.image.vendor=Chris Lu
-
name: Set up QEMU
uses: docker/setup-qemu-action@v1
-
name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
with:
buildkitd-flags: "--debug"
-
name: Login to Docker Hub
if: github.event_name != 'pull_request'
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
-
name: Login to GHCR
if: github.event_name != 'pull_request'
uses: docker/login-action@v1
with:
registry: ghcr.io
username: ${{ secrets.GHCR_USERNAME }}
password: ${{ secrets.GHCR_TOKEN }}
-
name: Build
uses: docker/build-push-action@v2
with:
context: ./docker
push: ${{ github.event_name != 'pull_request' }}
file: ./docker/Dockerfile.go_build
platforms: linux/amd64
tags: ${{ steps.docker_meta.outputs.tags }}
labels: ${{ steps.docker_meta.outputs.labels }}

118
.github/workflows/container_release.yml

@ -0,0 +1,118 @@
name: Build Release Containers
on:
push:
tags:
- '*'
workflow_dispatch: []
jobs:
build-default:
runs-on: [ubuntu-latest]
steps:
-
name: Checkout
uses: actions/checkout@v2
-
name: Docker meta
id: docker_meta
uses: crazy-max/ghaction-docker-meta@v2
with:
images: |
chrislusf/seaweedfs
ghcr.io/chrislusf/seaweedfs
tags: |
type=ref,event=tag
flavor: |
latest=false
labels: |
org.opencontainers.image.title=seaweedfs
org.opencontainers.image.description=SeaweedFS is a distributed storage system for blobs, objects, files, and data lake, to store and serve billions of files fast!
org.opencontainers.image.vendor=Chris Lu
-
name: Set up QEMU
uses: docker/setup-qemu-action@v1
-
name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
with:
buildkitd-flags: "--debug"
-
name: Login to Docker Hub
if: github.event_name != 'pull_request'
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
-
name: Login to GHCR
if: github.event_name != 'pull_request'
uses: docker/login-action@v1
with:
registry: ghcr.io
username: ${{ secrets.GHCR_USERNAME }}
password: ${{ secrets.GHCR_TOKEN }}
-
name: Build
uses: docker/build-push-action@v2
with:
context: ./docker
push: ${{ github.event_name != 'pull_request' }}
file: ./docker/Dockerfile.go_build
platforms: linux/amd64
tags: ${{ steps.docker_meta.outputs.tags }}
labels: ${{ steps.docker_meta.outputs.labels }}
build-large:
runs-on: [ubuntu-latest]
steps:
-
name: Checkout
uses: actions/checkout@v2
-
name: Docker meta
id: docker_meta
uses: crazy-max/ghaction-docker-meta@v2
with:
images: |
chrislusf/seaweedfs
ghcr.io/chrislusf/seaweedfs
tags: |
type=ref,event=tag,suffix=_large_disk
flavor: |
latest=false
labels: |
org.opencontainers.image.title=seaweedfs
org.opencontainers.image.description=SeaweedFS is a distributed storage system for blobs, objects, files, and data lake, to store and serve billions of files fast!
org.opencontainers.image.vendor=Chris Lu
-
name: Set up QEMU
uses: docker/setup-qemu-action@v1
-
name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
with:
buildkitd-flags: "--debug"
-
name: Login to Docker Hub
if: github.event_name != 'pull_request'
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
-
name: Login to GHCR
if: github.event_name != 'pull_request'
uses: docker/login-action@v1
with:
registry: ghcr.io
username: ${{ secrets.GHCR_USERNAME }}
password: ${{ secrets.GHCR_TOKEN }}
-
name: Build
uses: docker/build-push-action@v2
with:
context: ./docker
push: ${{ github.event_name != 'pull_request' }}
file: ./docker/Dockerfile.go_build_large
platforms: linux/amd64
tags: ${{ steps.docker_meta.outputs.tags }}
labels: ${{ steps.docker_meta.outputs.labels }}

2
README.md

@ -437,7 +437,7 @@ SeaweedFS has a centralized master group to look up free volumes, while Ceph use
Same as SeaweedFS, Ceph is also based on the object store RADOS. Ceph is rather complicated with mixed reviews.
Ceph uses CRUSH hashing to automatically manage the data placement, which is efficient to locate the data. But the data has to be placed according to the CRUSH algorithm. Any wrong configuration would cause data loss. SeaweedFS places data by assigning them to any writable volumes. If writes to one volume failed, just pick another volume to write. Adding more volumes are also as simple as it can be.
Ceph uses CRUSH hashing to automatically manage the data placement, which is efficient to locate the data. But the data has to be placed according to the CRUSH algorithm. Any wrong configuration would cause data loss. Topology changes, such as adding new servers to increase capacity, will cause data migration with high IO cost to fit the CRUSH algorithm. SeaweedFS places data by assigning them to any writable volumes. If writes to one volume failed, just pick another volume to write. Adding more volumes are also as simple as it can be.
SeaweedFS is optimized for small files. Small files are stored as one continuous block of content, with at most 8 unused bytes between files. Small file access is O(1) disk read.

6
docker/Dockerfile.go_build

@ -1,5 +1,5 @@
FROM frolvlad/alpine-glibc as builder
RUN apk add git go g++ fuse
FROM amd64/golang:1.16-alpine as builder
RUN apk add git g++ fuse
RUN mkdir -p /go/src/github.com/chrislusf/
RUN git clone https://github.com/chrislusf/seaweedfs /go/src/github.com/chrislusf/seaweedfs
ARG BRANCH=${BRANCH:-master}
@ -10,7 +10,7 @@ RUN cd /go/src/github.com/chrislusf/seaweedfs/weed \
FROM alpine AS final
LABEL author="Chris Lu"
COPY --from=builder /root/go/bin/weed /usr/bin/
COPY --from=builder /go/bin/weed /usr/bin/
RUN mkdir -p /etc/seaweedfs
COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/filer.toml /etc/seaweedfs/filer.toml
COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/entrypoint.sh /entrypoint.sh

6
docker/Dockerfile.go_build_large

@ -1,5 +1,5 @@
FROM frolvlad/alpine-glibc as builder
RUN apk add git go g++ fuse
FROM amd64/golang:1.16-alpine as builder
RUN apk add git g++ fuse
RUN mkdir -p /go/src/github.com/chrislusf/
RUN git clone https://github.com/chrislusf/seaweedfs /go/src/github.com/chrislusf/seaweedfs
ARG BRANCH=${BRANCH:-master}
@ -10,7 +10,7 @@ RUN cd /go/src/github.com/chrislusf/seaweedfs/weed \
FROM alpine AS final
LABEL author="Chris Lu"
COPY --from=builder /root/go/bin/weed /usr/bin/
COPY --from=builder /go/bin/weed /usr/bin/
RUN mkdir -p /etc/seaweedfs
COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/filer.toml /etc/seaweedfs/filer.toml
COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/entrypoint.sh /entrypoint.sh

2
docker/compose/local-mount-profile-compose.yml

@ -40,7 +40,7 @@ services:
- fuse
volumes:
- /Volumes/mobile_disk/99:/data
entrypoint: '/bin/sh -c "mkdir -p t1 && weed mount -filer=filer:8888 -dir=./t1 -cacheCapacityMB=0 -memprofile=/data/mount.mem.pprof"'
entrypoint: '/bin/sh -c "mkdir -p t1 && weed -v=4 mount -filer=filer:8888 -dir=./t1 -cacheCapacityMB=0 -memprofile=/data/mount.mem.pprof"'
depends_on:
- master
- volume

6
go.mod

@ -1,6 +1,6 @@
module github.com/chrislusf/seaweedfs
go 1.12
go 1.16
require (
cloud.google.com/go v0.58.0 // indirect
@ -15,7 +15,7 @@ require (
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
github.com/bwmarrin/snowflake v0.3.0
github.com/cespare/xxhash v1.1.0
github.com/chrislusf/raft v1.0.6
github.com/chrislusf/raft v1.0.7
github.com/coreos/go-semver v0.3.0 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/disintegration/imaging v1.6.2
@ -60,7 +60,6 @@ require (
github.com/peterh/liner v1.1.0
github.com/pierrec/lz4 v2.2.7+incompatible // indirect
github.com/prometheus/client_golang v1.3.0
github.com/rakyll/statik v0.1.7
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect
github.com/seaweedfs/fuse v1.1.6
github.com/seaweedfs/goexif v1.0.2
@ -76,7 +75,6 @@ require (
github.com/tidwall/match v1.0.1
github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365
github.com/valyala/bytebufferpool v1.0.0
github.com/valyala/fasthttp v1.20.0
github.com/viant/assertly v0.5.4 // indirect
github.com/viant/ptrie v0.3.0
github.com/viant/toolbox v0.33.2 // indirect

2
go.sum

@ -159,6 +159,8 @@ github.com/chrislusf/raft v1.0.5 h1:g8GxKCSStfm0/bGBDpNEbmEXL6MJkpXX+NI0ksbX5D4=
github.com/chrislusf/raft v1.0.5/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
github.com/chrislusf/raft v1.0.6 h1:wunb85WWhMKhNRn7EmdIw35D4Lmew0ZJv8oYDizR/+Y=
github.com/chrislusf/raft v1.0.6/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
github.com/chrislusf/raft v1.0.7 h1:reybAIwnQOTSgTj1YgflbJFWLSN0KVQSxe8gDZYa04o=
github.com/chrislusf/raft v1.0.7/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=

4
k8s/seaweedfs/Chart.yaml

@ -1,5 +1,5 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
appVersion: "2.43"
version: 2.43
appVersion: "2.48"
version: 2.48

2
k8s/seaweedfs/values.yaml

@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
# imageTag: "2.43" - started using {.Chart.appVersion}
# imageTag: "2.48" - started using {.Chart.appVersion}
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always

60
weed/command/filer_copy.go

@ -3,6 +3,7 @@ package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"io"
"io/ioutil"
"net/http"
@ -46,6 +47,8 @@ type CopyOptions struct {
masters []string
cipher bool
ttlSec int32
checkSize *bool
verbose *bool
}
func init() {
@ -59,6 +62,8 @@ func init() {
copy.maxMB = cmdCopy.Flag.Int("maxMB", 4, "split files larger than the limit")
copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
copy.checkSize = cmdCopy.Flag.Bool("check.size", false, "copy when the target file size is different from the source file")
copy.verbose = cmdCopy.Flag.Bool("verbose", false, "print out details during copying")
}
var cmdCopy = &Command{
@ -220,9 +225,9 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi
if mode.IsDir() {
files, _ := ioutil.ReadDir(fileOrDir)
println("checking directory", fileOrDir)
for _, subFileOrDir := range files {
if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil {
cleanedDestDirectory := filepath.Clean(destPath + fi.Name())
if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), cleanedDestDirectory+"/", fileCopyTaskChan); err != nil {
return err
}
}
@ -275,6 +280,15 @@ func (worker *FileCopyWorker) doEachCopy(task FileCopyTask) error {
}
}
if shouldCopy, err := worker.checkExistingFileFirst(task, f); err != nil {
return fmt.Errorf("check existing file: %v", err)
} else if !shouldCopy {
if *worker.options.verbose {
fmt.Printf("skipping copied file: %v\n", f.Name())
}
return nil
}
// find the chunk count
chunkSize := int64(*worker.options.maxMB * 1024 * 1024)
chunkCount := 1
@ -289,6 +303,42 @@ func (worker *FileCopyWorker) doEachCopy(task FileCopyTask) error {
return worker.uploadFileInChunks(task, f, chunkCount, chunkSize)
}
func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.File) (shouldCopy bool, err error) {
shouldCopy = true
if !*worker.options.checkSize {
return
}
fileStat, err := f.Stat()
if err != nil {
shouldCopy = false
return
}
err = pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: task.destinationUrlPath,
Name: filepath.Base(f.Name()),
}
resp, lookupErr := client.LookupDirectoryEntry(context.Background(), request)
if lookupErr != nil {
// mostly not found error
return nil
}
if fileStat.Size() == int64(filer.FileSize(resp.Entry)) {
shouldCopy = false
}
return nil
})
return
}
func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) error {
// upload the file content
@ -343,11 +393,13 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
if uploadResult.Error != "" {
return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
}
if *worker.options.verbose {
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
}
chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName)
}
if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@ -501,7 +553,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
}
fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName)
return nil
}

2
weed/filer/filerstore_wrapper.go

@ -150,7 +150,7 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (
}()
entry, err = actualStore.FindEntry(ctx, fp)
glog.V(4).Infof("FindEntry %s: %v", fp, err)
// glog.V(4).Infof("FindEntry %s: %v", fp, err)
if err != nil {
return nil, err
}

6
weed/filer/reader_at.go

@ -106,7 +106,7 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
c.readerLock.Lock()
defer c.readerLock.Unlock()
glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
return c.doReadAt(p, offset)
}
@ -137,7 +137,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
if chunkStart >= chunkStop {
continue
}
glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
// glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
var buffer []byte
bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
bufferLength := chunkStop - chunkStart
@ -152,7 +152,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
}
glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
// glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
if err == nil && remaining > 0 && c.fileSize > startOffset {
delta := int(min(remaining, c.fileSize-startOffset))

2
weed/filesys/dir.go

@ -296,7 +296,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
dirPath := util.FullPath(dir.FullPath())
glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String())
// glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String())
fullFilePath := dirPath.Child(req.Name)
visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)

9
weed/filesys/dir_rename.go

@ -68,9 +68,16 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
newFsNode := NodeWithId(newPath.AsInode())
dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
if file, ok := internalNode.(*File); ok {
glog.V(4).Infof("internal node %s", file.Name)
glog.V(4).Infof("internal file node %s", file.Name)
file.Name = req.NewName
file.id = uint64(newFsNode)
file.dir = newDir
}
if dir, ok := internalNode.(*Dir); ok {
glog.V(4).Infof("internal dir node %s", dir.name)
dir.name = req.NewName
dir.id = uint64(newFsNode)
dir.parent = newDir
}
})

3
weed/filesys/dirty_page_interval.go

@ -1,7 +1,6 @@
package filesys
import (
"bytes"
"io"
"github.com/chrislusf/seaweedfs/weed/util"
@ -214,7 +213,7 @@ func (l *IntervalLinkedList) ToReader() io.Reader {
readers = append(readers, util.NewBytesReader(t.Data))
for t.Next != nil {
t = t.Next
readers = append(readers, bytes.NewReader(t.Data))
readers = append(readers, util.NewBytesReader(t.Data))
}
if len(readers) == 1 {
return readers[0]

10
weed/filesys/dirty_pages.go

@ -0,0 +1,10 @@
package filesys
type DirtyPages interface {
AddPage(offset int64, data []byte)
FlushData() error
ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
GetStorageOptions() (collection, replication string)
SetWriteOnly(writeOnly bool)
GetWriteOnly() (writeOnly bool)
}

32
weed/filesys/dirty_page.go → weed/filesys/dirty_pages_continuous.go

@ -2,6 +2,7 @@ package filesys
import (
"bytes"
"fmt"
"io"
"sync"
"time"
@ -13,7 +14,7 @@ import (
type ContinuousDirtyPages struct {
intervals *ContinuousIntervals
f *File
fh *FileHandle
writeOnly bool
writeWaitGroup sync.WaitGroup
chunkAddLock sync.Mutex
lastErr error
@ -21,10 +22,11 @@ type ContinuousDirtyPages struct {
replication string
}
func newDirtyPages(file *File) *ContinuousDirtyPages {
func newContinuousDirtyPages(file *File, writeOnly bool) *ContinuousDirtyPages {
dirtyPages := &ContinuousDirtyPages{
intervals: &ContinuousIntervals{},
f: file,
writeOnly: writeOnly,
}
return dirtyPages
}
@ -58,6 +60,16 @@ func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) {
return
}
func (pages *ContinuousDirtyPages) FlushData() error {
pages.saveExistingPagesToStorage()
pages.writeWaitGroup.Wait()
if pages.lastErr != nil {
return fmt.Errorf("flush data: %v", pages.lastErr)
}
return nil
}
func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() {
for pages.saveExistingLargestPageToStorage() {
}
@ -95,7 +107,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
defer pages.writeWaitGroup.Done()
reader = io.LimitReader(reader, size)
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.fh.writeOnly)(reader, pages.f.Name, offset)
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset)
if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
pages.lastErr = err
@ -132,3 +144,17 @@ func min(x, y int64) int64 {
func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
return pages.intervals.ReadDataAt(data, startOffset)
}
func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) {
return pages.collection, pages.replication
}
func (pages *ContinuousDirtyPages) SetWriteOnly(writeOnly bool) {
if pages.writeOnly {
pages.writeOnly = writeOnly
}
}
func (pages *ContinuousDirtyPages) GetWriteOnly() (writeOnly bool) {
return pages.writeOnly
}

166
weed/filesys/dirty_pages_temp_file.go

@ -0,0 +1,166 @@
package filesys
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
"os"
"path/filepath"
"sync"
"time"
)
type TempFileDirtyPages struct {
f *File
tf *os.File
writtenIntervals *WrittenContinuousIntervals
writeOnly bool
writeWaitGroup sync.WaitGroup
pageAddLock sync.Mutex
chunkAddLock sync.Mutex
lastErr error
collection string
replication string
}
var (
tmpDir = filepath.Join(os.TempDir(), "sw")
)
func init() {
os.Mkdir(tmpDir, 0755)
}
func newTempFileDirtyPages(file *File, writeOnly bool) *TempFileDirtyPages {
tempFile := &TempFileDirtyPages{
f: file,
writeOnly: writeOnly,
writtenIntervals: &WrittenContinuousIntervals{},
}
return tempFile
}
func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
pages.pageAddLock.Lock()
defer pages.pageAddLock.Unlock()
if pages.tf == nil {
tf, err := os.CreateTemp(tmpDir, "")
if err != nil {
glog.Errorf("create temp file: %v", err)
pages.lastErr = err
return
}
pages.tf = tf
pages.writtenIntervals.tempFile = tf
pages.writtenIntervals.lastOffset = 0
}
writtenOffset := pages.writtenIntervals.lastOffset
dataSize := int64(len(data))
// glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize)
if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil {
pages.lastErr = err
} else {
pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
pages.writtenIntervals.lastOffset += dataSize
}
// pages.writtenIntervals.debug()
return
}
func (pages *TempFileDirtyPages) FlushData() error {
pages.saveExistingPagesToStorage()
pages.writeWaitGroup.Wait()
if pages.lastErr != nil {
return fmt.Errorf("flush data: %v", pages.lastErr)
}
pages.pageAddLock.Lock()
defer pages.pageAddLock.Unlock()
if pages.tf != nil {
pages.writtenIntervals.tempFile = nil
pages.writtenIntervals.lists = nil
pages.tf.Close()
os.Remove(pages.tf.Name())
pages.tf = nil
}
return nil
}
func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
pageSize := pages.f.wfs.option.ChunkSizeLimit
// glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists))
for _, list := range pages.writtenIntervals.lists {
listStopOffset := list.Offset() + list.Size()
for uploadedOffset:=int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
if start >= stop {
continue
}
// glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists))
pages.saveToStorage(list.ToReader(start, stop), start, stop-start)
}
}
}
func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
mtime := time.Now().UnixNano()
pages.writeWaitGroup.Add(1)
writer := func() {
defer pages.writeWaitGroup.Done()
reader = io.LimitReader(reader, size)
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset)
if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
pages.lastErr = err
return
}
chunk.Mtime = mtime
pages.collection, pages.replication = collection, replication
pages.chunkAddLock.Lock()
defer pages.chunkAddLock.Unlock()
pages.f.addChunks([]*filer_pb.FileChunk{chunk})
glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size)
}
if pages.f.wfs.concurrentWriters != nil {
pages.f.wfs.concurrentWriters.Execute(writer)
} else {
go writer()
}
}
func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
return pages.writtenIntervals.ReadDataAt(data, startOffset)
}
func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
return pages.collection, pages.replication
}
func (pages *TempFileDirtyPages) SetWriteOnly(writeOnly bool) {
if pages.writeOnly {
pages.writeOnly = writeOnly
}
}
func (pages *TempFileDirtyPages) GetWriteOnly() (writeOnly bool) {
return pages.writeOnly
}

289
weed/filesys/dirty_pages_temp_interval.go

@ -0,0 +1,289 @@
package filesys
import (
"io"
"log"
"os"
)
type WrittenIntervalNode struct {
DataOffset int64
TempOffset int64
Size int64
Next *WrittenIntervalNode
}
type WrittenIntervalLinkedList struct {
tempFile *os.File
Head *WrittenIntervalNode
Tail *WrittenIntervalNode
}
type WrittenContinuousIntervals struct {
tempFile *os.File
lastOffset int64
lists []*WrittenIntervalLinkedList
}
func (list *WrittenIntervalLinkedList) Offset() int64 {
return list.Head.DataOffset
}
func (list *WrittenIntervalLinkedList) Size() int64 {
return list.Tail.DataOffset + list.Tail.Size - list.Head.DataOffset
}
func (list *WrittenIntervalLinkedList) addNodeToTail(node *WrittenIntervalNode) {
// glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size)
if list.Tail.TempOffset+list.Tail.Size == node.TempOffset {
// already connected
list.Tail.Size += node.Size
} else {
list.Tail.Next = node
list.Tail = node
}
}
func (list *WrittenIntervalLinkedList) addNodeToHead(node *WrittenIntervalNode) {
// glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size)
node.Next = list.Head
list.Head = node
}
func (list *WrittenIntervalLinkedList) ReadData(buf []byte, start, stop int64) {
t := list.Head
for {
nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size)
if nodeStart < nodeStop {
// glog.V(4).Infof("copying start=%d stop=%d t=[%d,%d) => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.DataOffset, t.DataOffset+t.Size, len(buf), nodeStart, nodeStop)
list.tempFile.ReadAt(buf[nodeStart-start:nodeStop-start], t.TempOffset + nodeStart - t.DataOffset)
}
if t.Next == nil {
break
}
t = t.Next
}
}
func (c *WrittenContinuousIntervals) TotalSize() (total int64) {
for _, list := range c.lists {
total += list.Size()
}
return
}
func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenIntervalLinkedList {
var nodes []*WrittenIntervalNode
for t := list.Head; t != nil; t = t.Next {
nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size)
if nodeStart >= nodeStop {
// skip non overlapping WrittenIntervalNode
continue
}
nodes = append(nodes, &WrittenIntervalNode{
TempOffset: t.TempOffset + nodeStart - t.DataOffset,
DataOffset: nodeStart,
Size: nodeStop - nodeStart,
Next: nil,
})
}
for i := 1; i < len(nodes); i++ {
nodes[i-1].Next = nodes[i]
}
return &WrittenIntervalLinkedList{
tempFile: list.tempFile,
Head: nodes[0],
Tail: nodes[len(nodes)-1],
}
}
func (c *WrittenContinuousIntervals) debug() {
log.Printf("++")
for _, l := range c.lists {
log.Printf("++++")
for t := l.Head; ; t = t.Next {
log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
if t.Next == nil {
break
}
}
log.Printf("----")
}
log.Printf("--")
}
func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, dataOffset int64) {
interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)}
// append to the tail and return
if len(c.lists) == 1 {
lastSpan := c.lists[0]
if lastSpan.Tail.DataOffset+lastSpan.Tail.Size == dataOffset {
lastSpan.addNodeToTail(interval)
return
}
}
var newLists []*WrittenIntervalLinkedList
for _, list := range c.lists {
// if list is to the left of new interval, add to the new list
if list.Tail.DataOffset+list.Tail.Size <= interval.DataOffset {
newLists = append(newLists, list)
}
// if list is to the right of new interval, add to the new list
if interval.DataOffset+interval.Size <= list.Head.DataOffset {
newLists = append(newLists, list)
}
// if new interval overwrite the right part of the list
if list.Head.DataOffset < interval.DataOffset && interval.DataOffset < list.Tail.DataOffset+list.Tail.Size {
// create a new list of the left part of existing list
newLists = append(newLists, list.subList(list.Offset(), interval.DataOffset))
}
// if new interval overwrite the left part of the list
if list.Head.DataOffset < interval.DataOffset+interval.Size && interval.DataOffset+interval.Size < list.Tail.DataOffset+list.Tail.Size {
// create a new list of the right part of existing list
newLists = append(newLists, list.subList(interval.DataOffset+interval.Size, list.Tail.DataOffset+list.Tail.Size))
}
// skip anything that is fully overwritten by the new interval
}
c.lists = newLists
// add the new interval to the lists, connecting neighbor lists
var prevList, nextList *WrittenIntervalLinkedList
for _, list := range c.lists {
if list.Head.DataOffset == interval.DataOffset+interval.Size {
nextList = list
break
}
}
for _, list := range c.lists {
if list.Head.DataOffset+list.Size() == dataOffset {
list.addNodeToTail(interval)
prevList = list
break
}
}
if prevList != nil && nextList != nil {
// glog.V(4).Infof("connecting [%d,%d) + [%d,%d) => [%d,%d)", prevList.Head.Offset, prevList.Tail.Offset+prevList.Tail.Size, nextList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size, prevList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size)
prevList.Tail.Next = nextList.Head
prevList.Tail = nextList.Tail
c.removeList(nextList)
} else if nextList != nil {
// add to head was not done when checking
nextList.addNodeToHead(interval)
}
if prevList == nil && nextList == nil {
c.lists = append(c.lists, &WrittenIntervalLinkedList{
tempFile: c.tempFile,
Head: interval,
Tail: interval,
})
}
return
}
func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenIntervalLinkedList {
var maxSize int64
maxIndex := -1
for k, list := range c.lists {
if maxSize <= list.Size() {
maxSize = list.Size()
maxIndex = k
}
}
if maxSize <= 0 {
return nil
}
t := c.lists[maxIndex]
t.tempFile = c.tempFile
c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...)
return t
}
func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedList) {
index := -1
for k, list := range c.lists {
if list.Offset() == target.Offset() {
index = k
}
}
if index < 0 {
return
}
c.lists = append(c.lists[0:index], c.lists[index+1:]...)
}
func (c *WrittenContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) {
for _, list := range c.lists {
start := max(startOffset, list.Offset())
stop := min(startOffset+int64(len(data)), list.Offset()+list.Size())
if start < stop {
list.ReadData(data[start-startOffset:], start, stop)
maxStop = max(maxStop, stop)
}
}
return
}
func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader {
// TODO: optimize this to avoid another loop
var readers []io.Reader
for t := l.Head; ; t = t.Next {
startOffset, stopOffset := max(t.DataOffset, start), min(t.DataOffset+t.Size, stop)
if startOffset < stopOffset {
// glog.V(4).Infof("ToReader read [%d,%d) from [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
readers = append(readers, newFileSectionReader(l.tempFile, startOffset-t.DataOffset+t.TempOffset, startOffset, stopOffset-startOffset))
}
if t.Next == nil {
break
}
}
if len(readers) == 1 {
return readers[0]
}
return io.MultiReader(readers...)
}
type FileSectionReader struct {
file *os.File
tempStartOffset int64
Offset int64
dataStart int64
dataStop int64
}
var _ = io.Reader(&FileSectionReader{})
func newFileSectionReader(tempfile *os.File, offset int64, dataOffset int64, size int64) *FileSectionReader {
return &FileSectionReader{
file: tempfile,
tempStartOffset: offset,
Offset: offset,
dataStart: dataOffset,
dataStop: dataOffset + size,
}
}
func (f *FileSectionReader) Read(p []byte) (n int, err error) {
remaining := (f.dataStop - f.dataStart) - (f.Offset - f.tempStartOffset)
if remaining <= 0 {
return 0, io.EOF
}
dataLen := min(remaining, int64(len(p)))
// glog.V(4).Infof("reading [%d,%d) from %v [%d,%d)/[%d,%d) %d", f.Offset-f.tempStartOffset+f.dataStart, f.Offset-f.tempStartOffset+f.dataStart+dataLen, f.file.Name(), f.Offset, f.Offset+dataLen, f.tempStartOffset, f.tempStartOffset+f.dataStop-f.dataStart, f.dataStop-f.dataStart)
n, err = f.file.ReadAt(p[:dataLen], f.Offset)
if n > 0 {
f.Offset += int64(n)
} else {
err = io.EOF
}
return
}

10
weed/filesys/file.go

@ -83,7 +83,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
glog.V(4).Infof("file Getxattr %s", file.fullpath())
// glog.V(4).Infof("file Getxattr %s", file.fullpath())
entry, err := file.maybeLoadEntry(ctx)
if err != nil {
@ -267,7 +267,7 @@ func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, er
file.wfs.handlesLock.Unlock()
entry = file.entry
if found {
glog.V(4).Infof("maybeLoadEntry found opened file %s/%s", file.dir.FullPath(), file.Name)
// glog.V(4).Infof("maybeLoadEntry found opened file %s/%s", file.dir.FullPath(), file.Name)
entry = handle.f.entry
}
@ -336,20 +336,20 @@ func (file *File) saveEntry(entry *filer_pb.Entry) error {
file.wfs.mapPbIdFromLocalToFiler(entry)
defer file.wfs.mapPbIdFromFilerToLocal(entry)
request := &filer_pb.UpdateEntryRequest{
request := &filer_pb.CreateEntryRequest{
Directory: file.dir.FullPath(),
Entry: entry,
Signatures: []int32{file.wfs.signature},
}
glog.V(4).Infof("save file entry: %v", request)
_, err := client.UpdateEntry(context.Background(), request)
_, err := client.CreateEntry(context.Background(), request)
if err != nil {
glog.Errorf("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
return fuse.EIO
}
file.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
file.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
})

25
weed/filesys/filehandle.go

@ -20,7 +20,7 @@ import (
type FileHandle struct {
// cache file has been written to
dirtyPages *ContinuousDirtyPages
dirtyPages DirtyPages
entryViewCache []filer.VisibleInterval
reader io.ReaderAt
contentType string
@ -35,14 +35,14 @@ type FileHandle struct {
writeOnly bool
}
func newFileHandle(file *File, uid, gid uint32) *FileHandle {
func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle {
fh := &FileHandle{
f: file,
dirtyPages: newDirtyPages(file),
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
dirtyPages: newTempFileDirtyPages(file, writeOnly),
Uid: uid,
Gid: gid,
}
fh.dirtyPages.fh = fh
entry := fh.f.getEntry()
if entry != nil {
entry.Attributes.FileSize = filer.FileSize(entry)
@ -149,7 +149,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
glog.Errorf("file handle read %s: %v", fileFullPath, err)
}
glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
// glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
return int64(totalRead), err
}
@ -175,7 +175,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
entry.Content = nil
entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
// glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
fh.dirtyPages.AddPage(req.Offset, data)
@ -239,12 +239,8 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
// send the data to the OS
glog.V(4).Infof("doFlush %s fh %d", fh.f.fullpath(), fh.handle)
fh.dirtyPages.saveExistingPagesToStorage()
fh.dirtyPages.writeWaitGroup.Wait()
if fh.dirtyPages.lastErr != nil {
glog.Errorf("%v doFlush last err: %v", fh.f.fullpath(), fh.dirtyPages.lastErr)
if err := fh.dirtyPages.FlushData(); err != nil {
glog.Errorf("%v doFlush: %v", fh.f.fullpath(), err)
return fuse.EIO
}
@ -272,8 +268,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
}
entry.Attributes.Mtime = time.Now().Unix()
entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
entry.Attributes.Collection = fh.dirtyPages.collection
entry.Attributes.Replication = fh.dirtyPages.replication
entry.Attributes.Collection, entry.Attributes.Replication = fh.dirtyPages.GetStorageOptions()
}
request := &filer_pb.CreateEntryRequest{
@ -290,7 +285,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks)
chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath(), fh.writeOnly), chunks)
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath(), fh.dirtyPages.GetWriteOnly()), chunks)
if manifestErr != nil {
// not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)

2
weed/filesys/meta_cache/meta_cache.go

@ -74,7 +74,7 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
// skip the unnecessary deletion
// leave the update to the following InsertEntry operation
} else {
glog.V(3).Infof("DeleteEntry %s/%s", oldPath, oldPath.Name())
glog.V(3).Infof("DeleteEntry %s", oldPath)
if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
return err
}

7
weed/filesys/wfs.go

@ -150,17 +150,14 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (file
wfs.handlesLock.Unlock()
if found && existingHandle != nil {
existingHandle.f.isOpen++
if existingHandle.writeOnly {
existingHandle.writeOnly = writeOnly
}
existingHandle.dirtyPages.SetWriteOnly(writeOnly)
glog.V(4).Infof("Acquired Handle %s open %d", fullpath, existingHandle.f.isOpen)
return existingHandle
}
entry, _ := file.maybeLoadEntry(context.Background())
file.entry = entry
fileHandle = newFileHandle(file, uid, gid)
fileHandle.writeOnly = writeOnly
fileHandle = newFileHandle(file, uid, gid, writeOnly)
file.isOpen++
wfs.handlesLock.Lock()

2
weed/s3api/s3err/s3api_errors.go

@ -116,7 +116,7 @@ var errorCodeResponse = map[ErrorCode]APIError{
},
ErrBucketAlreadyExists: {
Code: "BucketAlreadyExists",
Description: "The requested bucket name is not available. The bucket namespace is shared by all users of the system. Please select a different name and try again.",
Description: "The requested bucket name is not available. The bucket name can not be an existing collection, and the bucket namespace is shared by all users of the system. Please select a different name and try again.",
HTTPStatusCode: http.StatusConflict,
},
ErrBucketAlreadyOwnedByYou: {

16
weed/server/common.go

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"mime/multipart"
"net/http"
"path/filepath"
@ -21,19 +22,14 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
statik "github.com/rakyll/statik/fs"
_ "github.com/chrislusf/seaweedfs/weed/statik"
)
var serverStats *stats.ServerStats
var startTime = time.Now()
var statikFS http.FileSystem
func init() {
serverStats = stats.NewServerStats()
go serverStats.Start()
statikFS, _ = statik.New()
}
func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) {
@ -212,14 +208,16 @@ func statsMemoryHandler(w http.ResponseWriter, r *http.Request) {
writeJsonQuiet(w, r, http.StatusOK, m)
}
var StaticFS fs.FS
func handleStaticResources(defaultMux *http.ServeMux) {
defaultMux.Handle("/favicon.ico", http.FileServer(statikFS))
defaultMux.Handle("/seaweedfsstatic/", http.StripPrefix("/seaweedfsstatic", http.FileServer(statikFS)))
defaultMux.Handle("/favicon.ico", http.FileServer(http.FS(StaticFS)))
defaultMux.Handle("/seaweedfsstatic/", http.StripPrefix("/seaweedfsstatic", http.FileServer(http.FS(StaticFS))))
}
func handleStaticResources2(r *mux.Router) {
r.Handle("/favicon.ico", http.FileServer(statikFS))
r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(statikFS)))
r.Handle("/favicon.ico", http.FileServer(http.FS(StaticFS)))
r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(http.FS(StaticFS))))
}
func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, filename string) {

2
weed/server/volume_grpc_copy.go

@ -151,7 +151,7 @@ todo: maybe should check the received count and deleted count of the volume
func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error {
stat, err := os.Stat(idxFileName)
if err != nil {
return fmt.Errorf("stat idx file %s failed, %v", idxFileName, err)
return fmt.Errorf("stat idx file %s failed: %v", idxFileName, err)
}
if originFileInf.IdxFileSize != uint64(stat.Size()) {
return fmt.Errorf("idx file %s size [%v] is not same as origin file size [%v]",

13
weed/statik/statik.go
File diff suppressed because it is too large
View File

4
weed/storage/volume_vacuum.go

@ -158,10 +158,10 @@ func (v *Volume) cleanupCompact() error {
e1 := os.Remove(v.FileName(".cpd"))
e2 := os.Remove(v.FileName(".cpx"))
if e1 != nil {
if e1 != nil && !os.IsNotExist(e1) {
return e1
}
if e2 != nil {
if e2 != nil && !os.IsNotExist(e2) {
return e2
}
return nil

4
weed/topology/node.go

@ -242,9 +242,9 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
for _, v := range dn.GetVolumes() {
if v.Size >= volumeSizeLimit {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
n.GetTopology().chanFullVolumes <- &v
n.GetTopology().chanFullVolumes <- v
}else if float64(v.Size) > float64(volumeSizeLimit) * growThreshold {
n.GetTopology().chanCrowdedVolumes <- &v
n.GetTopology().chanCrowdedVolumes <- v
}
}
}

8
weed/topology/topology.go

@ -34,8 +34,8 @@ type Topology struct {
Sequence sequence.Sequencer
chanFullVolumes chan *storage.VolumeInfo
chanCrowdedVolumes chan *storage.VolumeInfo
chanFullVolumes chan storage.VolumeInfo
chanCrowdedVolumes chan storage.VolumeInfo
Configuration *Configuration
@ -57,8 +57,8 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
t.Sequence = seq
t.chanFullVolumes = make(chan *storage.VolumeInfo)
t.chanCrowdedVolumes = make(chan *storage.VolumeInfo)
t.chanFullVolumes = make(chan storage.VolumeInfo)
t.chanCrowdedVolumes = make(chan storage.VolumeInfo)
t.Configuration = &Configuration{}

11
weed/topology/topology_event_handling.go

@ -39,7 +39,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
}
}()
}
func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) bool {
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
diskType := types.ToDiskType(volumeInfo.DiskType)
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, diskType)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
@ -49,7 +49,12 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) bool {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
for _, dn := range vl.vid2location[volumeInfo.Id].list {
vidLocations, found := vl.vid2location[volumeInfo.Id]
if !found {
return false
}
for _, dn := range vidLocations.list {
if !volumeInfo.ReadOnly {
disk := dn.getOrCreateDisk(volumeInfo.DiskType)
@ -63,7 +68,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) bool {
return true
}
func (t *Topology) SetVolumeCrowded(volumeInfo *storage.VolumeInfo) {
func (t *Topology) SetVolumeCrowded(volumeInfo storage.VolumeInfo) {
diskType := types.ToDiskType(volumeInfo.DiskType)
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, diskType)
vl.SetVolumeCrowded(volumeInfo.Id)

6
weed/topology/volume_layout.go

@ -108,7 +108,7 @@ type VolumeLayout struct {
diskType types.DiskType
vid2location map[needle.VolumeId]*VolumeLocationList
writables []needle.VolumeId // transient array of writable volume id
crowded map[needle.VolumeId]interface{}
crowded map[needle.VolumeId]struct{}
readonlyVolumes *volumesBinaryState // readonly volumes
oversizedVolumes *volumesBinaryState // oversized volumes
volumeSizeLimit uint64
@ -129,7 +129,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType
diskType: diskType,
vid2location: make(map[needle.VolumeId]*VolumeLocationList),
writables: *new([]needle.VolumeId),
crowded: make(map[needle.VolumeId]interface{}),
crowded: make(map[needle.VolumeId]struct{}),
readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()),
oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()),
volumeSizeLimit: volumeSizeLimit,
@ -421,7 +421,7 @@ func (vl *VolumeLayout) removeFromCrowded(vid needle.VolumeId) {
func (vl *VolumeLayout) setVolumeCrowded(vid needle.VolumeId) {
if _, ok := vl.crowded[vid]; !ok {
vl.crowded[vid] = nil
vl.crowded[vid] = struct{}{}
glog.V(0).Infoln("Volume", vid, "becomes crowded")
}
}

2
weed/util/bounded_tree/bounded_tree.go

@ -43,7 +43,7 @@ func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) (vis
}
components := p.Split()
// fmt.Printf("components %v %d\n", components, len(components))
canDelete, err := t.ensureVisited(t.root, t.baseDir, components, 0, visitFn)
canDelete, err := t.ensureVisited(t.root, util.FullPath("/"), components, 0, visitFn)
if err != nil {
return err
}

2
weed/util/constants.go

@ -5,7 +5,7 @@ import (
)
var (
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 43)
VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 48)
COMMIT = ""
)

13
weed/weed.go

@ -1,12 +1,12 @@
//go:generate statik -src=./static
// install this first "go get github.com/rakyll/statik"
package main
import (
"embed"
"fmt"
weed_server "github.com/chrislusf/seaweedfs/weed/server"
flag "github.com/chrislusf/seaweedfs/weed/util/fla9"
"io"
"io/fs"
"math/rand"
"os"
"strings"
@ -35,6 +35,13 @@ func setExitStatus(n int) {
exitMu.Unlock()
}
//go:embed static
var static embed.FS
func init() {
weed_server.StaticFS, _ = fs.Sub(static, "static")
}
func main() {
glog.MaxSize = 1024 * 1024 * 32
rand.Seed(time.Now().UnixNano())

Loading…
Cancel
Save