
Merge branch 'master' into master

pull/455/head
316014408 committed by GitHub, 9 years ago
parent commit 752afa2db2
67 changed files (lines changed in parentheses):

1. .gitignore (1)
2. .travis.yml (29)
3. Makefile (93)
4. README.md (7)
5. docker/Dockerfile (19)
6. docker/entrypoint.sh (8)
7. unmaintained/change_superblock/change_superblock.go (29)
8. unmaintained/fix_dat/fix_dat.go (2)
9. weed/command/backup.go (2)
10. weed/command/compact.go (3)
11. weed/command/filer_copy.go (2)
12. weed/command/fix.go (2)
13. weed/command/master.go (33)
14. weed/command/server.go (34)
15. weed/command/upload.go (8)
16. weed/command/volume.go (2)
17. weed/filer/postgres_store/postgres_store.go (439)
18. weed/images/orientation_test.go (3)
19. weed/operation/submit.go (78)
20. weed/operation/system_message.pb.go (203)
21. weed/operation/system_message_test.go (59)
22. weed/operation/upload_content.go (21)
23. weed/pb/Makefile (6)
24. weed/pb/seaweed.pb.go (375)
25. weed/pb/seaweed.proto (40)
26. weed/proto/Makefile (4)
27. weed/proto/system_message.proto (27)
28. weed/server/common.go (4)
29. weed/server/filer_server.go (7)
30. weed/server/filer_server_handlers_read.go (2)
31. weed/server/filer_server_handlers_write.go (8)
32. weed/server/master_grpc_server.go (69)
33. weed/server/master_server.go (9)
34. weed/server/master_server_handlers_admin.go (44)
35. weed/server/volume_grpc_client.go (88)
36. weed/server/volume_server.go (34)
37. weed/server/volume_server_handlers_admin.go (23)
38. weed/server/volume_server_handlers_read.go (13)
39. weed/storage/disk_location.go (9)
40. weed/storage/needle.go (47)
41. weed/storage/needle_map.go (2)
42. weed/storage/needle_map_boltdb.go (6)
43. weed/storage/needle_map_leveldb.go (6)
44. weed/storage/needle_map_memory.go (6)
45. weed/storage/needle_read_write.go (44)
46. weed/storage/store.go (111)
47. weed/storage/volume.go (5)
48. weed/storage/volume_checking.go (22)
49. weed/storage/volume_create.go (17)
50. weed/storage/volume_create_linux.go (19)
51. weed/storage/volume_info.go (25)
52. weed/storage/volume_loading.go (9)
53. weed/storage/volume_read_write.go (30)
54. weed/storage/volume_super_block.go (1)
55. weed/storage/volume_sync.go (2)
56. weed/storage/volume_vacuum.go (18)
57. weed/topology/allocate_volume.go (1)
58. weed/topology/data_node.go (3)
59. weed/topology/node.go (8)
60. weed/topology/rack.go (5)
61. weed/topology/store_replicate.go (15)
62. weed/topology/topology.go (38)
63. weed/topology/topology_event_handling.go (16)
64. weed/topology/volume_growth.go (1)
65. weed/util/constants.go (2)
66. weed/util/http_util.go (5)
67. weed/util/net_timeout.go (4)

.gitignore (1 line changed)

@@ -76,3 +76,4 @@ crashlytics.properties
crashlytics-build.properties
test_data
build

.travis.yml (29 lines changed)

@@ -13,3 +13,32 @@ install:
script:
- go test ./weed/...
before_deploy:
- make release
deploy:
provider: releases
skip_cleanup: true
api_key:
secure: ERL986+ncQ8lwAJUYDrQ8s2/FxF/cyNIwJIFCqspnWxQgGNNyokET9HapmlPSxjpFRF0q6L2WCg9OY3mSVRq4oI6hg1igOQ12KlLyN71XSJ3c8w0Ay5ho48TQ9l3f3Iu97mntBCe9l0R9pnT8wj1VI8YJxloXwUMG2yeTjA9aBI=
file:
- build/linux_arm.tar.gz
- build/linux_arm64.tar.gz
- build/linux_386.tar.gz
- build/linux_amd64.tar.gz
- build/darwin_amd64.tar.gz
- build/windows_386.zip
- build/windows_amd64.zip
- build/freebsd_arm.tar.gz
- build/freebsd_amd64.tar.gz
- build/freebsd_386.tar.gz
- build/netbsd_arm.tar.gz
- build/netbsd_amd64.tar.gz
- build/netbsd_386.tar.gz
- build/openbsd_arm.tar.gz
- build/openbsd_amd64.tar.gz
- build/openbsd_386.tar.gz
on:
tags: true
repo: chrislusf/seaweedfs
go: tip

Makefile (93 lines changed)

@@ -1,15 +1,26 @@
BINARY = weed/weed
package = github.com/chrislusf/seaweedfs/weed
GO_FLAGS = #-v
SOURCE_DIR = ./weed/
appname := weed
sources := $(wildcard *.go)
build = GOOS=$(1) GOARCH=$(2) go build -o build/$(appname)$(3) $(SOURCE_DIR)
tar = cd build && tar -cvzf $(1)_$(2).tar.gz $(appname)$(3) && rm $(appname)$(3)
zip = cd build && zip $(1)_$(2).zip $(appname)$(3) && rm $(appname)$(3)
all: build
.PHONY : clean deps build linux
.PHONY : clean deps build linux release windows_build darwin_build linux_build bsd_build clean
clean:
go clean -i $(GO_FLAGS) $(SOURCE_DIR)
rm -f $(BINARY)
rm -rf build/
deps:
go get $(GO_FLAGS) -d $(SOURCE_DIR)
@@ -20,3 +31,83 @@ build: deps
linux: deps
mkdir -p linux
GOOS=linux GOARCH=amd64 go build $(GO_FLAGS) -o linux/$(BINARY) $(SOURCE_DIR)
release: deps windows_build darwin_build linux_build bsd_build
##### LINUX BUILDS #####
linux_build: build/linux_arm.tar.gz build/linux_arm64.tar.gz build/linux_386.tar.gz build/linux_amd64.tar.gz
build/linux_386.tar.gz: $(sources)
$(call build,linux,386,)
$(call tar,linux,386)
build/linux_amd64.tar.gz: $(sources)
$(call build,linux,amd64,)
$(call tar,linux,amd64)
build/linux_arm.tar.gz: $(sources)
$(call build,linux,arm,)
$(call tar,linux,arm)
build/linux_arm64.tar.gz: $(sources)
$(call build,linux,arm64,)
$(call tar,linux,arm64)
##### DARWIN (MAC) BUILDS #####
darwin_build: build/darwin_amd64.tar.gz
build/darwin_amd64.tar.gz: $(sources)
$(call build,darwin,amd64,)
$(call tar,darwin,amd64)
##### WINDOWS BUILDS #####
windows_build: build/windows_386.zip build/windows_amd64.zip
build/windows_386.zip: $(sources)
$(call build,windows,386,.exe)
$(call zip,windows,386,.exe)
build/windows_amd64.zip: $(sources)
$(call build,windows,amd64,.exe)
$(call zip,windows,amd64,.exe)
##### BSD BUILDS #####
bsd_build: build/freebsd_arm.tar.gz build/freebsd_386.tar.gz build/freebsd_amd64.tar.gz \
build/netbsd_arm.tar.gz build/netbsd_386.tar.gz build/netbsd_amd64.tar.gz \
build/openbsd_arm.tar.gz build/openbsd_386.tar.gz build/openbsd_amd64.tar.gz
build/freebsd_386.tar.gz: $(sources)
$(call build,freebsd,386,)
$(call tar,freebsd,386)
build/freebsd_amd64.tar.gz: $(sources)
$(call build,freebsd,amd64,)
$(call tar,freebsd,amd64)
build/freebsd_arm.tar.gz: $(sources)
$(call build,freebsd,arm,)
$(call tar,freebsd,arm)
build/netbsd_386.tar.gz: $(sources)
$(call build,netbsd,386,)
$(call tar,netbsd,386)
build/netbsd_amd64.tar.gz: $(sources)
$(call build,netbsd,amd64,)
$(call tar,netbsd,amd64)
build/netbsd_arm.tar.gz: $(sources)
$(call build,netbsd,arm,)
$(call tar,netbsd,arm)
build/openbsd_386.tar.gz: $(sources)
$(call build,openbsd,386,)
$(call tar,openbsd,386)
build/openbsd_amd64.tar.gz: $(sources)
$(call build,openbsd,amd64,)
$(call tar,openbsd,amd64)
build/openbsd_arm.tar.gz: $(sources)
$(call build,openbsd,arm,)
$(call tar,openbsd,arm)

README.md (7 lines changed)

@@ -3,7 +3,7 @@
[![Build Status](https://travis-ci.org/chrislusf/seaweedfs.svg?branch=master)](https://travis-ci.org/chrislusf/seaweedfs)
[![GoDoc](https://godoc.org/github.com/chrislusf/seaweedfs/weed?status.svg)](https://godoc.org/github.com/chrislusf/seaweedfs/weed)
[![Wiki](https://img.shields.io/badge/docs-wiki-blue.svg)](https://github.com/chrislusf/seaweedfs/wiki)
[![](https://api.bintray.com/packages/chrislusf/seaweedfs/seaweedfs/images/download.png)](https://bintray.com/chrislusf/seaweedfs/seaweedfs)
[download](https://github.com/chrislusf/seaweedfs/releases/latest)
![SeaweedFS Logo](https://raw.githubusercontent.com/chrislusf/seaweedfs/master/note/seaweedfs.png)
@@ -24,8 +24,7 @@ There is only a 40 bytes disk storage overhead for each file's metadata. It is s
SeaweedFS started by implementing [Facebook's Haystack design paper](http://www.usenix.org/event/osdi10/tech/full_papers/Beaver.pdf). SeaweedFS is currently growing, with more features on the way.
Download latest compiled binaries for different platforms here:
[![](https://api.bintray.com/packages/chrislusf/seaweedfs/seaweedfs/images/download.png)](https://bintray.com/chrislusf/seaweedfs/seaweedfs)
[Download latest compiled binaries for different platforms](https://github.com/chrislusf/seaweedfs/releases/latest)
[SeaweedFS Discussion Group](https://groups.google.com/d/forum/seaweedfs)
@@ -171,8 +170,6 @@ Volume servers can start with a specific data center name:
weed volume -dir=/tmp/2 -port=8081 -dataCenter=dc2
```
Or the master server can determine the data center via volume server's IP address and settings in weed.conf file.
When requesting a file key, an optional "dataCenter" parameter can limit the assigned volume to the specific data center. For example, this specifies that the assigned volume should be limited to 'dc1':
```

docker/Dockerfile (19 lines changed)

@@ -1,18 +1,17 @@
FROM progrium/busybox
FROM frolvlad/alpine-glibc:alpine-3.4
COPY entrypoint.sh /entrypoint.sh
COPY Dockerfile /etc/Dockerfile
RUN opkg-install curl
RUN echo tlsv1 >> ~/.curlrc
RUN curl -Lks https://bintray.com$(curl -Lk http://bintray.com/chrislusf/seaweedfs/seaweedfs/_latestVersion | grep linux_amd64.tar.gz | sed -n "/href/ s/.*href=['\"]\([^'\"]*\)['\"].*/\1/gp") | gunzip | tar -xf - && \
mv go_*amd64/weed /usr/bin/ && \
rm -r go_*amd64
RUN apk add --no-cache --virtual=build-dependencies --update curl wget ca-certificates && \
wget -P /tmp https://github.com/$(curl -s -L https://github.com/chrislusf/seaweedfs/releases/latest | egrep -o '/chrislusf/seaweedfs/releases/download/.*/linux_amd64.tar.gz') && \
tar -C /usr/bin/ -xzvf /tmp/linux_amd64.tar.gz && \
apk del curl wget ca-certificates build-dependencies && \
rm -rf /tmp/*
EXPOSE 8080
EXPOSE 9333
VOLUME /data
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]

docker/entrypoint.sh (8 lines changed)

@@ -8,7 +8,7 @@ case "$1" in
if [ -n "$MASTER_PORT_9333_TCP_ADDR" ] ; then
ARGS="$ARGS -peers=$MASTER_PORT_9333_TCP_ADDR:$MASTER_PORT_9333_TCP_PORT"
fi
/usr/bin/weed $@ $ARGS
exec /usr/bin/weed $@ $ARGS
;;
'volume')
@@ -17,7 +17,7 @@ case "$1" in
if [ -n "$MASTER_PORT_9333_TCP_ADDR" ] ; then
ARGS="$ARGS -mserver=$MASTER_PORT_9333_TCP_ADDR:$MASTER_PORT_9333_TCP_PORT"
fi
/usr/bin/weed $@ $ARGS
exec /usr/bin/weed $@ $ARGS
;;
'server')
@@ -25,10 +25,10 @@ case "$1" in
if [ -n "$MASTER_PORT_9333_TCP_ADDR" ] ; then
ARGS="$ARGS -master.peers=$MASTER_PORT_9333_TCP_ADDR:$MASTER_PORT_9333_TCP_PORT"
fi
/usr/bin/weed $@ $ARGS
exec /usr/bin/weed $@ $ARGS
;;
*)
/usr/bin/weed $@
exec /usr/bin/weed $@
;;
esac
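A note on the repeated change above: prefixing the weed command with `exec` makes it replace the shell, so weed runs as PID 1 and receives container stop signals directly. A minimal Go sketch of the same semantics (illustration only, not part of this commit):

```go
package main

import (
	"os"
	"syscall"
)

func main() {
	// Unix-only sketch: replace this process image with weed, just as the
	// entrypoint's `exec` does, so weed becomes PID 1 and gets SIGTERM.
	argv := append([]string{"/usr/bin/weed"}, os.Args[1:]...)
	// On success Exec never returns; the wrapper process disappears.
	if err := syscall.Exec("/usr/bin/weed", argv, os.Environ()); err != nil {
		panic(err)
	}
}
```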

unmaintained/change_replication/change_replication.go → unmaintained/change_superblock/change_superblock.go (29 lines changed, renamed)

@@ -16,6 +16,7 @@ var (
fixVolumeCollection = flag.String("collection", "", "the volume collection name")
fixVolumeId = flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
targetReplica = flag.String("replication", "", "If just empty, only print out current replication setting.")
targetTTL = flag.String("ttl", "", "If just empty, only print out current ttl setting.")
)
/*
@@ -58,20 +59,37 @@ func main() {
}
fmt.Printf("Current Volume Replication: %s\n", superBlock.ReplicaPlacement)
fmt.Printf("Current Volume TTL: %s\n", superBlock.Ttl.String())
if *targetReplica == "" {
return
}
hasChange := false
if *targetReplica != "" {
replica, err := storage.NewReplicaPlacementFromString(*targetReplica)
if err != nil {
glog.Fatalf("cannot parse target replica %s: %v", *targetReplica, err)
}
fmt.Printf("Changing to: %s\n", replica)
fmt.Printf("Changing replication to: %s\n", replica)
superBlock.ReplicaPlacement = replica
hasChange = true
}
if *targetTTL != "" {
ttl, err := storage.ReadTTL(*targetTTL)
if err != nil {
glog.Fatalf("cannot parse target ttl %s: %v", *targetTTL, err)
}
fmt.Printf("Changing ttl to: %s\n", ttl)
superBlock.Ttl = ttl
hasChange = true
}
if hasChange {
header = superBlock.Bytes()
@@ -79,6 +97,7 @@ func main() {
glog.Fatalf("cannot write super block: %v", e)
}
fmt.Println("Done.")
fmt.Println("Change Applied.")
}
}
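For reference, a hedged sketch of how the updated tool applies the two parsed settings. The function signatures come from the diff above; the `storage.SuperBlock` type name is assumed from weed/storage/volume_super_block.go, and "3d" is an example TTL string:

```go
package sketch

import "github.com/chrislusf/seaweedfs/weed/storage"

// applySettings mirrors the updated tool: parse a replica placement and a
// TTL, stamp them into the superblock, and return the re-serialized header
// bytes to write back at the start of the .dat file.
func applySettings(superBlock *storage.SuperBlock) ([]byte, error) {
	replica, err := storage.NewReplicaPlacementFromString("001") // e.g. one extra copy on another rack
	if err != nil {
		return nil, err
	}
	ttl, err := storage.ReadTTL("3d") // e.g. three days
	if err != nil {
		return nil, err
	}
	superBlock.ReplicaPlacement = replica
	superBlock.Ttl = ttl
	return superBlock.Bytes(), nil
}
```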

unmaintained/fix_dat/fix_dat.go (2 lines changed)

@@ -60,7 +60,7 @@ func main() {
iterateEntries(datFile, indexFile, func(n *storage.Needle, offset int64) {
fmt.Printf("file id=%d name=%s size=%d dataSize=%d\n", n.Id, string(n.Name), n.Size, n.DataSize)
s, e := n.Append(newDatFile, storage.Version2)
s, _, e := n.Append(newDatFile, storage.Version2)
fmt.Printf("size %d error %v\n", s, e)
})

weed/command/backup.go (2 lines changed)

@@ -75,7 +75,7 @@ func runBackup(cmd *Command, args []string) bool {
return true
}
v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl)
v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true

weed/command/compact.go (3 lines changed)

@@ -24,6 +24,7 @@ var (
compactVolumeCollection = cmdCompact.Flag.String("collection", "", "volume collection name")
compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.")
compactMethod = cmdCompact.Flag.Int("method", 0, "option to choose which compact method. use 0 or 1.")
compactVolumePreallocate = cmdCompact.Flag.Int64("preallocateMB", 0, "preallocate volume disk space")
)
func runCompact(cmd *Command, args []string) bool {
@@ -34,7 +35,7 @@ func runCompact(cmd *Command, args []string) bool {
vid := storage.VolumeId(*compactVolumeId)
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid,
storage.NeedleMapInMemory, nil, nil)
storage.NeedleMapInMemory, nil, nil, *compactVolumePreallocate*(1<<20))
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}
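Both `NewVolume` call sites above gain a trailing preallocate argument in bytes. A hedged usage sketch (directory, collection, and sizes are examples):

```go
package main

import (
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/storage"
)

func main() {
	preallocateMB := int64(64)
	v, err := storage.NewVolume(
		"/data",                   // volume directory (example)
		"pics",                    // collection (example)
		storage.VolumeId(7),       // volume id (example)
		storage.NeedleMapInMemory, // needle map kind
		nil, nil,                  // replica placement / TTL: keep the volume's existing settings
		preallocateMB*(1<<20),     // MB to bytes, mirroring the -preallocateMB flag handling; 0 disables preallocation
	)
	if err != nil {
		glog.Fatalf("load volume: %v", err)
	}
	_ = v
}
```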

weed/command/filer_copy.go (2 lines changed)

@@ -126,7 +126,7 @@ func doEachCopy(fileOrDir string, host string, path string) bool {
}
results, err := operation.SubmitFiles(*copy.master, parts,
*copy.replication, *copy.collection,
*copy.replication, *copy.collection, "",
*copy.ttl, *copy.maxMB, copy.secret)
if err != nil {
fmt.Printf("Failed to submit file %s: %v", fileOrDir, err)

weed/command/fix.go (2 lines changed)

@@ -58,7 +58,7 @@ func runFix(cmd *Command, args []string) bool {
glog.V(2).Infof("saved %d with error %v", n.Size, pe)
} else {
glog.V(2).Infof("skipping deleted file ...")
return nm.Delete(n.Id)
return nm.Delete(n.Id, uint32(offset/storage.NeedlePaddingSize))
}
return nil
})
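The second argument added to `nm.Delete` is the needle's offset in storage units: needle maps keep offsets divided by `storage.NeedlePaddingSize` rather than raw byte offsets. A small sketch of the conversion (the constant value 8 is an assumption matching weed/storage):

```go
package sketch

// NeedlePaddingSize mirrors the constant in weed/storage (assumed to be 8):
// needle records are aligned to 8-byte boundaries, so the needle map stores
// offsets divided by this size instead of raw byte offsets.
const NeedlePaddingSize = 8

// storedOffset converts a byte offset in the .dat file into the scaled
// offset expected by needle-map Put/Delete calls.
func storedOffset(byteOffset int64) uint32 {
	return uint32(byteOffset / NeedlePaddingSize)
}
```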

weed/command/master.go (33 lines changed)

@@ -10,9 +10,13 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
func init() {
@@ -35,10 +39,11 @@ var (
metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094")
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.")
mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "Deprecating! xml configuration file")
defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.")
mTimeout = cmdMaster.Flag.Int("idleTimeout", 10, "connection idle seconds")
// mTimeout = cmdMaster.Flag.Int("idleTimeout", 30, "connection idle seconds")
mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
@@ -73,7 +78,8 @@ func runMaster(cmd *Command, args []string) bool {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, *mport, *metaFolder,
*volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold,
*volumeSizeLimitMB, *volumePreallocate,
*mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold,
masterWhiteList, *masterSecureKey,
)
@@ -81,7 +87,7 @@ func runMaster(cmd *Command, args []string) bool {
glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress)
listener, e := util.NewListener(listeningAddress, time.Duration(*mTimeout)*time.Second)
listener, e := util.NewListener(listeningAddress, 0)
if e != nil {
glog.Fatalf("Master startup error: %v", e)
}
@@ -97,8 +103,25 @@ func runMaster(cmd *Command, args []string) bool {
ms.SetRaftServer(raftServer)
}()
if e := http.Serve(listener, r); e != nil {
glog.Fatalf("Fail to serve: %v", e)
// start grpc and http server
m := cmux.New(listener)
grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
httpL := m.Match(cmux.Any())
// Create your protocol servers.
grpcS := grpc.NewServer()
pb.RegisterSeaweedServer(grpcS, ms)
reflection.Register(grpcS)
httpS := &http.Server{Handler: r}
go grpcS.Serve(grpcL)
go httpS.Serve(httpL)
if err := m.Serve(); err != nil {
glog.Fatalf("master server failed to serve: %v", err)
}
return true
}
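The block above swaps the plain `http.Serve` for cmux so gRPC and HTTP can share the master port. A self-contained sketch of the same wiring (listener address and HTTP handler are placeholders; only the cmux plumbing mirrors the diff):

```go
package main

import (
	"log"
	"net"
	"net/http"

	"github.com/soheilhy/cmux"
	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", ":9333")
	if err != nil {
		log.Fatal(err)
	}
	m := cmux.New(lis)
	// gRPC clients send HTTP/2 with content-type application/grpc.
	grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
	// Everything else (HTTP/1.x UI and API traffic) stays on net/http.
	httpL := m.Match(cmux.Any())

	grpcS := grpc.NewServer() // the commit registers pb.RegisterSeaweedServer(grpcS, ms) here
	httpS := &http.Server{Handler: http.DefaultServeMux}

	go grpcS.Serve(grpcL)
	go httpS.Serve(httpL)
	if err := m.Serve(); err != nil { // blocks, routing each conn to its matcher
		log.Fatalf("serve: %v", err)
	}
}
```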

weed/command/server.go (34 lines changed)

@@ -11,10 +11,14 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
type ServerOptions struct {
@@ -47,7 +51,7 @@ var (
serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name")
serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
serverTimeout = cmdServer.Flag.Int("idleTimeout", 10, "connection idle seconds")
serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds")
serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
@@ -57,6 +61,7 @@ var (
masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
masterVolumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.")
masterConfFile = cmdServer.Flag.String("master.conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.")
volumePort = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
@@ -200,12 +205,13 @@ func runServer(cmd *Command, args []string) bool {
go func() {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder,
*masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold,
*masterVolumeSizeLimitMB, *masterVolumePreallocate,
*volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold,
serverWhiteList, *serverSecureKey,
)
glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort))
masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterPort), time.Duration(*serverTimeout)*time.Second)
masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterPort), 0)
if e != nil {
glog.Fatalf("Master startup error: %v", e)
}
@@ -224,9 +230,27 @@ func runServer(cmd *Command, args []string) bool {
}()
raftWaitForMaster.Done()
if e := http.Serve(masterListener, r); e != nil {
glog.Fatalf("Master Fail to serve:%s", e.Error())
// start grpc and http server
m := cmux.New(masterListener)
grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
httpL := m.Match(cmux.Any())
// Create your protocol servers.
grpcS := grpc.NewServer()
pb.RegisterSeaweedServer(grpcS, ms)
reflection.Register(grpcS)
httpS := &http.Server{Handler: r}
go grpcS.Serve(grpcL)
go httpS.Serve(httpL)
if err := m.Serve(); err != nil {
glog.Fatalf("master server failed to serve: %v", err)
}
}()
volumeWait.Wait()

weed/command/upload.go (8 lines changed)

@@ -20,6 +20,7 @@ type UploadOptions struct {
include *string
replication *string
collection *string
dataCenter *string
ttl *string
maxMB *int
secretKey *string
@@ -33,6 +34,7 @@ func init() {
upload.include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
upload.replication = cmdUpload.Flag.String("replication", "", "replication type")
upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name")
upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name")
upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
upload.maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
upload.secretKey = cmdUpload.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
@@ -63,7 +65,7 @@ var cmdUpload = &Command{
func runUpload(cmd *Command, args []string) bool {
secret := security.Secret(*upload.secretKey)
if len(cmdUpload.Flag.Args()) == 0 {
if len(args) == 0 {
if *upload.dir == "" {
return false
}
@@ -80,7 +82,7 @@ func runUpload(cmd *Command, args []string) bool {
return e
}
results, e := operation.SubmitFiles(*upload.master, parts,
*upload.replication, *upload.collection,
*upload.replication, *upload.collection, *upload.dataCenter,
*upload.ttl, *upload.maxMB, secret)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
@@ -99,7 +101,7 @@ func runUpload(cmd *Command, args []string) bool {
fmt.Println(e.Error())
}
results, _ := operation.SubmitFiles(*upload.master, parts,
*upload.replication, *upload.collection,
*upload.replication, *upload.collection, *upload.dataCenter,
*upload.ttl, *upload.maxMB, secret)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))

weed/command/volume.go (2 lines changed)

@@ -48,7 +48,7 @@ func init() {
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
v.master = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds")
v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds")
v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")

weed/filer/postgres_store/postgres_store.go (439 lines changed)

@@ -2,26 +2,30 @@ package postgres_store
import (
"database/sql"
"errors"
"fmt"
"hash/crc32"
"path/filepath"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
_ "github.com/lib/pq"
_ "path/filepath"
"strings"
)
const (
default_maxIdleConnections = 100
default_maxOpenConnections = 50
default_maxTableNums = 1024
tableName = "filer_mapping"
filesTableName = "files"
directoriesTableName = "directories"
)
var (
_init_db sync.Once
_db_connections []*sql.DB
_db_connection *sql.DB
)
type PostgresConf struct {
@@ -35,20 +39,53 @@ type PostgresConf struct {
MaxOpenConnections int
}
type ShardingConf struct {
IsSharding bool `json:"isSharding"`
ShardCount int `json:"shardCount"`
}
type PostgresStore struct {
dbs []*sql.DB
isSharding bool
shardCount int
db *sql.DB
server string
user string
password string
}
func (s *PostgresStore) CreateFile(fullFileName string, fid string) (err error) {
glog.V(3).Infoln("Calling posgres_store CreateFile")
return s.Put(fullFileName, fid)
}
func (s *PostgresStore) FindFile(fullFileName string) (fid string, err error) {
glog.V(3).Infoln("Calling posgres_store FindFile")
return s.Get(fullFileName)
}
func (s *PostgresStore) DeleteFile(fullFileName string) (fid string, err error) {
glog.V(3).Infoln("Calling posgres_store DeleteFile")
return "", s.Delete(fullFileName)
}
func (s *PostgresStore) FindDirectory(dirPath string) (dirId filer.DirectoryId, err error) {
glog.V(3).Infoln("Calling posgres_store FindDirectory")
return s.FindDir(dirPath)
}
func (s *PostgresStore) ListDirectories(dirPath string) (dirs []filer.DirectoryEntry, err error) {
glog.V(3).Infoln("Calling posgres_store ListDirectories")
return s.ListDirs(dirPath)
}
func (s *PostgresStore) ListFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
glog.V(3).Infoln("Calling posgres_store ListFiles")
return s.FindFiles(dirPath, lastFileName, limit)
}
func (s *PostgresStore) DeleteDirectory(dirPath string, recursive bool) (err error) {
glog.V(3).Infoln("Calling posgres_store DeleteDirectory")
return s.DeleteDir(dirPath, recursive)
}
func (s *PostgresStore) Move(fromPath string, toPath string) (err error) {
glog.V(3).Infoln("Calling posgres_store Move")
return errors.New("Move is not yet implemented for the PostgreSQL store.")
}
func databaseExists(db *sql.DB, databaseName string) (bool, error) {
sqlStatement := "SELECT datname from pg_database WHERE datname='%s'"
row := db.QueryRow(fmt.Sprintf(sqlStatement, databaseName))
@@ -70,9 +107,8 @@ func createDatabase(db *sql.DB, databaseName string) error {
return err
}
func getDbConnection(confs []PostgresConf) []*sql.DB {
func getDbConnection(conf PostgresConf) *sql.DB {
_init_db.Do(func() {
for _, conf := range confs {
sqlUrl := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s connect_timeout=30", conf.HostName, conf.Port, conf.User, conf.Password, "postgres", conf.SslMode)
glog.V(3).Infoln("Opening postgres master database")
@@ -145,83 +181,46 @@ func getDbConnection(confs []PostgresConf) []*sql.DB {
_db_connection.SetMaxIdleConns(maxIdleConnections)
_db_connection.SetMaxOpenConns(maxOpenConnections)
_db_connections = append(_db_connections, _db_connection)
}
})
return _db_connections
return _db_connection
}
func NewPostgresStore(confs []PostgresConf, isSharding bool, shardCount int) *PostgresStore {
//func NewPostgresStore(master string, confs []PostgresConf, isSharding bool, shardCount int) *PostgresStore {
func NewPostgresStore(master string, conf PostgresConf) *PostgresStore {
pg := &PostgresStore{
dbs: getDbConnection(confs),
isSharding: isSharding,
shardCount: shardCount,
db: getDbConnection(conf),
}
for _, db := range pg.dbs {
if !isSharding {
pg.shardCount = 1
} else {
if pg.shardCount == 0 {
pg.shardCount = default_maxTableNums
}
}
for i := 0; i < pg.shardCount; i++ {
if err := pg.createTables(db, tableName, i); err != nil {
pg.createDirectoriesTable()
if err := pg.createFilesTable(); err != nil {
fmt.Printf("create table failed %v", err)
}
}
}
return pg
}
func (s *PostgresStore) hash(fullFileName string) (instance_offset, table_postfix int) {
hash_value := crc32.ChecksumIEEE([]byte(fullFileName))
instance_offset = int(hash_value) % len(s.dbs)
table_postfix = int(hash_value) % s.shardCount
return
}
func (s *PostgresStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) {
instance_offset, table_postfix := s.hash(path)
instanceId = instance_offset
if s.isSharding {
tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix)
} else {
tableFullName = tableName
}
return
}
func (s *PostgresStore) Get(fullFilePath string) (fid string, err error) {
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return "", fmt.Errorf("PostgresStore Get operation can not parse file path %s: err is %v", fullFilePath, err)
}
fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName)
fid, err = s.query(fullFilePath)
return fid, err
}
func (s *PostgresStore) Put(fullFilePath string, fid string) (err error) {
var tableFullName string
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return fmt.Errorf("PostgresStore Put operation can not parse file path %s: err is %v", fullFilePath, err)
}
var old_fid string
if old_fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil && err != sql.ErrNoRows {
if old_fid, err = s.query(fullFilePath); err != nil && err != sql.ErrNoRows {
return fmt.Errorf("PostgresStore Put operation failed when querying path %s: err is %v", fullFilePath, err)
} else {
if len(old_fid) == 0 {
err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
err = s.insert(fullFilePath, fid)
if err != nil {
return fmt.Errorf("PostgresStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err)
}
} else {
err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
err = s.update(fullFilePath, fid)
if err != nil {
return fmt.Errorf("PostgresStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err)
}
@@ -232,16 +231,15 @@ func (s *PostgresStore) Put(fullFilePath string, fid string) (err error) {
func (s *PostgresStore) Delete(fullFilePath string) (err error) {
var fid string
instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
if err != nil {
return fmt.Errorf("PostgresStore Delete operation can not parse file path %s: err is %v", fullFilePath, err)
}
if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
if fid, err = s.query(fullFilePath); err != nil {
return fmt.Errorf("PostgresStore Delete operation failed when querying path %s: err is %v", fullFilePath, err)
} else if fid == "" {
return nil
}
if err = s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
if err = s.delete(fullFilePath); err != nil {
return fmt.Errorf("PostgresStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err)
} else {
return nil
@@ -249,38 +247,85 @@ func (s *PostgresStore) Delete(fullFilePath string) (err error) {
}
func (s *PostgresStore) Close() {
for _, db := range s.dbs {
db.Close()
s.db.Close()
}
func (s *PostgresStore) FindDir(dirPath string) (dirId filer.DirectoryId, err error) {
dirId, _, err = s.lookupDirectory(dirPath)
return dirId, err
}
func (s *PostgresStore) ListDirs(dirPath string) (dirs []filer.DirectoryEntry, err error) {
dirs, err = s.findDirectories(dirPath, 20)
glog.V(3).Infof("Postgres ListDirs = found %d directories under %s", len(dirs), dirPath)
return dirs, err
}
func (s *PostgresStore) DeleteDir(dirPath string, recursive bool) (err error) {
err = s.deleteDirectory(dirPath, recursive)
if err != nil {
glog.V(0).Infof("Error in Postgres DeleteDir '%s' (recursive = '%t'): %s", err)
}
return err
}
func (s *PostgresStore) FindFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
files, err = s.findFiles(dirPath, lastFileName, limit)
return files, err
}
var createTable = `
var createDirectoryTable = `
CREATE TABLE IF NOT EXISTS %s (
id BIGSERIAL NOT NULL,
uriPath VARCHAR(1024) NOT NULL DEFAULT '',
directoryRoot VARCHAR(1024) NOT NULL DEFAULT '',
directoryName VARCHAR(1024) NOT NULL DEFAULT '',
CONSTRAINT unique_directory UNIQUE (directoryRoot, directoryName)
);
`
var createFileTable = `
CREATE TABLE IF NOT EXISTS %s (
id BIGSERIAL NOT NULL,
directoryPart VARCHAR(1024) NOT NULL DEFAULT '',
filePart VARCHAR(1024) NOT NULL DEFAULT '',
fid VARCHAR(36) NOT NULL DEFAULT '',
createTime BIGINT NOT NULL DEFAULT 0,
updateTime BIGINT NOT NULL DEFAULT 0,
remark VARCHAR(20) NOT NULL DEFAULT '',
status SMALLINT NOT NULL DEFAULT '1',
PRIMARY KEY (id),
CONSTRAINT %s_index_uriPath UNIQUE (uriPath)
CONSTRAINT %s_unique_file UNIQUE (directoryPart, filePart)
);
`
func (s *PostgresStore) createTables(db *sql.DB, tableName string, postfix int) error {
var realTableName string
if s.isSharding {
realTableName = fmt.Sprintf("%s_%04d", tableName, postfix)
} else {
realTableName = tableName
func (s *PostgresStore) createDirectoriesTable() error {
glog.V(3).Infoln("Creating postgres table if it doesn't exist: ", directoriesTableName)
sqlCreate := fmt.Sprintf(createDirectoryTable, directoriesTableName)
stmt, err := s.db.Prepare(sqlCreate)
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec()
if err != nil {
return err
}
return nil
}
glog.V(3).Infoln("Creating postgres table if it doesn't exist: ", realTableName)
func (s *PostgresStore) createFilesTable() error {
sqlCreate := fmt.Sprintf(createTable, realTableName, realTableName)
glog.V(3).Infoln("Creating postgres table if it doesn't exist: ", filesTableName)
stmt, err := db.Prepare(sqlCreate)
sqlCreate := fmt.Sprintf(createFileTable, filesTableName, filesTableName)
stmt, err := s.db.Prepare(sqlCreate)
if err != nil {
return err
}
@@ -293,10 +338,11 @@ func (s *PostgresStore) createTables(db *sql.DB, tableName string, postfix int)
return nil
}
func (s *PostgresStore) query(uriPath string, db *sql.DB, tableName string) (string, error) {
sqlStatement := fmt.Sprintf("SELECT fid FROM %s WHERE uriPath=$1", tableName)
func (s *PostgresStore) query(uriPath string) (string, error) {
directoryPart, filePart := filepath.Split(uriPath)
sqlStatement := fmt.Sprintf("SELECT fid FROM %s WHERE directoryPart=$1 AND filePart=$2", filesTableName)
row := db.QueryRow(sqlStatement, uriPath)
row := s.db.QueryRow(sqlStatement, directoryPart, filePart)
var fid string
err := row.Scan(&fid)
@@ -308,12 +354,13 @@ func (s *PostgresStore) query(uriPath string, db *sql.DB, tableName string) (str
return fid, nil
}
func (s *PostgresStore) update(uriPath string, fid string, db *sql.DB, tableName string) error {
sqlStatement := fmt.Sprintf("UPDATE %s SET fid=$1, updateTime=$2 WHERE uriPath=$3", tableName)
func (s *PostgresStore) update(uriPath string, fid string) error {
directoryPart, filePart := filepath.Split(uriPath)
sqlStatement := fmt.Sprintf("UPDATE %s SET fid=$1, updateTime=$2 WHERE directoryPart=$3 AND filePart=$4", filesTableName)
glog.V(3).Infof("Postgres query -- updating path '%s' with id '%s'", uriPath, fid)
res, err := db.Exec(sqlStatement, fid, time.Now().Unix(), uriPath)
res, err := s.db.Exec(sqlStatement, fid, time.Now().Unix(), directoryPart, filePart)
if err != nil {
return err
}
@@ -325,12 +372,18 @@ func (s *PostgresStore) update(uriPath string, fid string, db *sql.DB, tableName
return nil
}
func (s *PostgresStore) insert(uriPath string, fid string, db *sql.DB, tableName string) error {
sqlStatement := fmt.Sprintf("INSERT INTO %s (uriPath,fid,createTime) VALUES($1, $2, $3)", tableName)
func (s *PostgresStore) insert(uriPath string, fid string) error {
directoryPart, filePart := filepath.Split(uriPath)
existingId, _, _ := s.lookupDirectory(directoryPart)
if existingId == 0 {
s.recursiveInsertDirectory(directoryPart)
}
sqlStatement := fmt.Sprintf("INSERT INTO %s (directoryPart,filePart,fid,createTime) VALUES($1, $2, $3, $4)", filesTableName)
glog.V(3).Infof("Postgres query -- inserting path '%s' with id '%s'", uriPath, fid)
res, err := db.Exec(sqlStatement, uriPath, fid, time.Now().Unix())
res, err := s.db.Exec(sqlStatement, directoryPart, filePart, fid, time.Now().Unix())
if err != nil {
return err
@@ -343,15 +396,59 @@ func (s *PostgresStore) insert(uriPath string, fid string, db *sql.DB, tableName
if err != nil {
return err
}
return nil
}
func (s *PostgresStore) delete(uriPath string, db *sql.DB, tableName string) error {
sqlStatement := fmt.Sprintf("DELETE FROM %s WHERE uriPath=$1", tableName)
func (s *PostgresStore) recursiveInsertDirectory(dirPath string) {
pathParts := strings.Split(dirPath, "/")
var workingPath string = "/"
for _, part := range pathParts {
if part == "" {
continue
}
workingPath += (part + "/")
existingId, _, _ := s.lookupDirectory(workingPath)
if existingId == 0 {
s.insertDirectory(workingPath)
}
}
}
func (s *PostgresStore) insertDirectory(dirPath string) {
pathParts := strings.Split(dirPath, "/")
directoryRoot := "/"
directoryName := ""
if len(pathParts) > 1 {
directoryRoot = strings.Join(pathParts[0:len(pathParts)-2], "/") + "/"
directoryName = strings.Join(pathParts[len(pathParts)-2:], "/")
} else if len(pathParts) == 1 {
directoryRoot = "/"
directoryName = pathParts[0] + "/"
}
sqlInsertDirectoryStatement := fmt.Sprintf("INSERT INTO %s (directoryroot, directoryname) "+
"SELECT $1, $2 WHERE NOT EXISTS ( SELECT id FROM %s WHERE directoryroot=$3 AND directoryname=$4 )",
directoriesTableName, directoriesTableName)
glog.V(4).Infof("Postgres query -- Inserting directory (if it doesn't exist) - root = %s, name = %s",
directoryRoot, directoryName)
_, err := s.db.Exec(sqlInsertDirectoryStatement, directoryRoot, directoryName, directoryRoot, directoryName)
if err != nil {
glog.V(0).Infof("Postgres query -- Error inserting directory - root = %s, name = %s: %s",
directoryRoot, directoryName, err)
}
}
func (s *PostgresStore) delete(uriPath string) error {
directoryPart, filePart := filepath.Split(uriPath)
sqlStatement := fmt.Sprintf("DELETE FROM %s WHERE directoryPart=$1 AND filePart=$2", filesTableName)
glog.V(3).Infof("Postgres query -- deleting path '%s'", uriPath)
res, err := db.Exec(sqlStatement, uriPath)
res, err := s.db.Exec(sqlStatement, directoryPart, filePart)
if err != nil {
return err
}
@@ -362,3 +459,163 @@ func (s *PostgresStore) delete(uriPath string, db *sql.DB, tableName string) err
}
return nil
}
func (s *PostgresStore) lookupDirectory(dirPath string) (filer.DirectoryId, string, error) {
directoryRoot, directoryName := s.mySplitPath(dirPath)
sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName)
row := s.db.QueryRow(sqlStatement, directoryRoot, directoryName)
var id filer.DirectoryId
var dirRoot string
var dirName string
err := row.Scan(&id, &dirRoot, &dirName)
glog.V(3).Infof("Postgres lookupDirectory -- looking up directory '%s' and found id '%d', root '%s', name '%s' ", dirPath, id, dirRoot, dirName)
if err != nil {
return 0, "", err
}
return id, filepath.Join(dirRoot, dirName), err
}
func (s *PostgresStore) findDirectories(dirPath string, limit int) (dirs []filer.DirectoryEntry, err error) {
sqlStatement := fmt.Sprintf("SELECT id, directoryroot, directoryname FROM %s WHERE directoryRoot=$1 AND directoryName != '' ORDER BY id LIMIT $2", directoriesTableName)
rows, err := s.db.Query(sqlStatement, dirPath, limit)
if err != nil {
glog.V(0).Infof("Postgres findDirectories error: %s", err)
}
if rows != nil {
defer rows.Close()
for rows.Next() {
var id filer.DirectoryId
var directoryRoot string
var directoryName string
scanErr := rows.Scan(&id, &directoryRoot, &directoryName)
if scanErr != nil {
err = scanErr
}
dirs = append(dirs, filer.DirectoryEntry{Name: (directoryName), Id: id})
}
}
return
}
func (s *PostgresStore) safeToDeleteDirectory(dirPath string, recursive bool) bool {
if recursive {
return true
}
sqlStatement := fmt.Sprintf("SELECT id FROM %s WHERE directoryRoot LIKE $1 LIMIT 1", directoriesTableName)
row := s.db.QueryRow(sqlStatement, dirPath+"%")
var id filer.DirectoryId
err := row.Scan(&id)
if err != nil {
if err == sql.ErrNoRows {
return true
}
}
return false
}
func (s *PostgresStore) mySplitPath(dirPath string) (directoryRoot string, directoryName string) {
pathParts := strings.Split(dirPath, "/")
directoryRoot = "/"
directoryName = ""
if len(pathParts) > 1 {
directoryRoot = strings.Join(pathParts[0:len(pathParts)-2], "/") + "/"
directoryName = strings.Join(pathParts[len(pathParts)-2:], "/")
} else if len(pathParts) == 1 {
directoryRoot = "/"
directoryName = pathParts[0] + "/"
}
return directoryRoot, directoryName
}
func (s *PostgresStore) deleteDirectory(dirPath string, recursive bool) (err error) {
directoryRoot, directoryName := s.mySplitPath(dirPath)
// delete files
sqlStatement := fmt.Sprintf("DELETE FROM %s WHERE directorypart=$1", filesTableName)
_, err = s.db.Exec(sqlStatement, dirPath)
if err != nil {
return err
}
// delete specific directory if it is empty or recursive delete was requested
safeToDelete := s.safeToDeleteDirectory(dirPath, recursive)
if safeToDelete {
sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directoryRoot=$1 AND directoryName=$2", directoriesTableName)
_, err = s.db.Exec(sqlStatement, directoryRoot, directoryName)
if err != nil {
return err
}
}
if recursive {
// delete descendant files
sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directorypart LIKE $1", filesTableName)
_, err = s.db.Exec(sqlStatement, dirPath+"%")
if err != nil {
return err
}
// delete descendant directories
sqlStatement = fmt.Sprintf("DELETE FROM %s WHERE directoryRoot LIKE $1", directoriesTableName)
_, err = s.db.Exec(sqlStatement, dirPath+"%")
if err != nil {
return err
}
}
return err
}
func (s *PostgresStore) findFiles(dirPath string, lastFileName string, limit int) (files []filer.FileEntry, err error) {
var rows *sql.Rows = nil
if lastFileName == "" {
sqlStatement :=
fmt.Sprintf("SELECT fid, directorypart, filepart FROM %s WHERE directorypart=$1 ORDER BY id LIMIT $2", filesTableName)
rows, err = s.db.Query(sqlStatement, dirPath, limit)
} else {
sqlStatement :=
fmt.Sprintf("SELECT fid, directorypart, filepart FROM %s WHERE directorypart=$1 "+
"AND id > (SELECT id FROM %s WHERE directoryPart=$2 AND filepart=$3) ORDER BY id LIMIT $4",
filesTableName, filesTableName)
_, lastFileNameName := filepath.Split(lastFileName)
rows, err = s.db.Query(sqlStatement, dirPath, dirPath, lastFileNameName, limit)
}
if err != nil {
glog.V(0).Infof("Postgres find files error: %s", err)
}
if rows != nil {
defer rows.Close()
for rows.Next() {
var fid filer.FileId
var directoryPart string
var filePart string
scanErr := rows.Scan(&fid, &directoryPart, &filePart)
if scanErr != nil {
err = scanErr
}
files = append(files, filer.FileEntry{Name: filepath.Join(directoryPart, filePart), Id: fid})
if len(files) >= limit {
break
}
}
}
glog.V(3).Infof("Postgres findFiles -- looking up files under '%s' and found %d files. Limit=%d, lastFileName=%s",
dirPath, len(files), limit, lastFileName)
return files, err
}
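The rewritten store drops the sharded `filer_mapping` table for a `files` table keyed by `(directoryPart, filePart)` and a `directories` table keyed by `(directoryRoot, directoryName)`. A standalone sketch of the two path splits the code above relies on (reimplemented here for illustration):

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// splitDir mirrors mySplitPath above: the parent chain becomes directoryRoot
// and the last path element (with trailing slash) becomes directoryName.
func splitDir(dirPath string) (root, name string) {
	parts := strings.Split(dirPath, "/")
	if len(parts) > 1 {
		root = strings.Join(parts[:len(parts)-2], "/") + "/"
		name = strings.Join(parts[len(parts)-2:], "/")
		return
	}
	return "/", parts[0] + "/"
}

func main() {
	// Files are keyed via filepath.Split.
	dir, file := filepath.Split("/photos/2016/cat.jpg")
	fmt.Printf("files row:       directoryPart=%q filePart=%q\n", dir, file)
	// files row:       directoryPart="/photos/2016/" filePart="cat.jpg"

	root, name := splitDir("/photos/2016/")
	fmt.Printf("directories row: directoryRoot=%q directoryName=%q\n", root, name)
	// directories row: directoryRoot="/photos/" directoryName="2016/"
}
```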

weed/images/orientation_test.go (3 lines changed)

@@ -2,6 +2,7 @@ package images
import (
"io/ioutil"
"os"
"testing"
)
@@ -14,4 +15,6 @@ func TestXYZ(t *testing.T) {
ioutil.WriteFile("fixed1.jpg", fixed_data, 0644)
os.Remove("fixed1.jpg")
}

weed/operation/submit.go (78 lines changed)

@@ -23,6 +23,7 @@ type FilePart struct {
ModTime int64 //in seconds
Replication string
Collection string
DataCenter string
Ttl string
Server string //this comes from assign result
Fid string //this comes from assign result, but customizable
@@ -37,7 +38,7 @@ }
}
func SubmitFiles(master string, files []FilePart,
replication string, collection string, ttl string, maxMB int,
replication string, collection string, dataCenter string, ttl string, maxMB int,
secret security.Secret,
) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
@@ -48,6 +49,7 @@ func SubmitFiles(master string, files []FilePart,
Count: uint64(len(files)),
Replication: replication,
Collection: collection,
DataCenter: dataCenter,
Ttl: ttl,
}
ret, err := Assign(master, ar)
@@ -65,6 +67,7 @@ func SubmitFiles(master string, files []FilePart,
file.Server = ret.Url
file.Replication = replication
file.Collection = collection
file.DataCenter = dataCenter
results[index].Size, err = file.Upload(maxMB, master, secret)
if err != nil {
results[index].Error = err.Error()
@@ -92,18 +95,15 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
}
ret.Reader = fh
if fi, fiErr := fh.Stat(); fiErr != nil {
fi, fiErr := fh.Stat()
if fiErr != nil {
glog.V(0).Info("Failed to stat file:", fullPathFilename)
return ret, fiErr
} else {
}
ret.ModTime = fi.ModTime().UTC().Unix()
ret.FileSize = fi.Size()
}
ext := strings.ToLower(path.Ext(fullPathFilename))
ret.IsGzipped = ext == ".gz"
if ret.IsGzipped {
ret.FileName = fullPathFilename[0 : len(fullPathFilename)-3]
}
ret.FileName = fullPathFilename
if ext != "" {
ret.MimeType = mime.TypeByExtension(ext)
@@ -132,11 +132,46 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
Chunks: make([]*ChunkInfo, 0, chunks),
}
var ret *AssignResult
var id string
if fi.DataCenter != "" {
ar := &VolumeAssignRequest{
Count: uint64(chunks),
Replication: fi.Replication,
Collection: fi.Collection,
Ttl: fi.Ttl,
}
ret, err = Assign(master, ar)
if err != nil {
return
}
}
for i := int64(0); i < chunks; i++ {
id, count, e := upload_one_chunk(
if fi.DataCenter == "" {
ar := &VolumeAssignRequest{
Count: 1,
Replication: fi.Replication,
Collection: fi.Collection,
Ttl: fi.Ttl,
}
ret, err = Assign(master, ar)
if err != nil {
// delete all uploaded chunks
cm.DeleteChunks(master)
return
}
id = ret.Fid
} else {
id = ret.Fid
if i > 0 {
id += "_" + strconv.FormatInt(i, 10)
}
}
fileUrl := "http://" + ret.Url + "/" + id
count, e := upload_one_chunk(
baseName+"-"+strconv.FormatInt(i+1, 10),
io.LimitReader(fi.Reader, chunkSize),
master, fi.Replication, fi.Collection, fi.Ttl,
master, fileUrl,
jwt)
if e != nil {
// delete all uploaded chunks
@@ -158,7 +193,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
cm.DeleteChunks(master)
}
} else {
ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt)
ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, nil, jwt)
if e != nil {
return 0, e
}
@@ -168,26 +203,15 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
}
func upload_one_chunk(filename string, reader io.Reader, master,
replication string, collection string, ttl string, jwt security.EncodedJwt,
) (fid string, size uint32, e error) {
ar := &VolumeAssignRequest{
Count: 1,
Replication: replication,
Collection: collection,
Ttl: ttl,
}
ret, err := Assign(master, ar)
if err != nil {
return "", 0, err
}
fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
fileUrl string, jwt security.EncodedJwt,
) (size uint32, e error) {
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
"application/octet-stream", jwt)
"application/octet-stream", nil, jwt)
if uploadError != nil {
return fid, 0, uploadError
return 0, uploadError
}
return fid, uploadResult.Size, nil
return uploadResult.Size, nil
}
func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
@@ -201,6 +225,6 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s
q := u.Query()
q.Set("cm", "true")
u.RawQuery = q.Encode()
_, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", jwt)
_, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt)
return e
}
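With this change, `SubmitFiles` accepts a data-center hint, and when one is given, a single Assign call reserves fids for every chunk up front (chunk ids become `fid`, `fid_1`, `fid_2`, ...). A hedged call-site sketch; master address, collection, and flag values are examples:

```go
package sketch

import (
	"os"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/security"
)

// submitToDC uploads one local file, pinning the assigned volume to "dc1"
// through the dataCenter parameter added in this commit.
func submitToDC(master string, secret security.Secret) ([]operation.SubmitResult, error) {
	f, err := os.Open("cat.jpg")
	if err != nil {
		return nil, err
	}
	defer f.Close()
	fi, err := f.Stat()
	if err != nil {
		return nil, err
	}
	parts := []operation.FilePart{{
		Reader:   f,
		FileName: "cat.jpg",
		FileSize: fi.Size(),
		MimeType: "image/jpeg",
	}}
	return operation.SubmitFiles(master, parts,
		"001",  // replication
		"pics", // collection
		"dc1",  // dataCenter (new in this commit)
		"",     // ttl
		32,     // maxMB: chunk files larger than 32 MB
		secret)
}
```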

weed/operation/system_message.pb.go (203 lines changed, file deleted)

@@ -1,203 +0,0 @@
// Code generated by protoc-gen-go.
// source: system_message.proto
// DO NOT EDIT!
/*
Package operation is a generated protocol buffer package.
It is generated from these files:
system_message.proto
It has these top-level messages:
VolumeInformationMessage
JoinMessage
*/
package operation
import proto "github.com/golang/protobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = math.Inf
type VolumeInformationMessage struct {
Id *uint32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"`
Size *uint64 `protobuf:"varint,2,req,name=size" json:"size,omitempty"`
Collection *string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"`
FileCount *uint64 `protobuf:"varint,4,req,name=file_count" json:"file_count,omitempty"`
DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"`
DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"`
ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"`
ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"`
Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"`
Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} }
func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) }
func (*VolumeInformationMessage) ProtoMessage() {}
const Default_VolumeInformationMessage_Version uint32 = 2
func (m *VolumeInformationMessage) GetId() uint32 {
if m != nil && m.Id != nil {
return *m.Id
}
return 0
}
func (m *VolumeInformationMessage) GetSize() uint64 {
if m != nil && m.Size != nil {
return *m.Size
}
return 0
}
func (m *VolumeInformationMessage) GetCollection() string {
if m != nil && m.Collection != nil {
return *m.Collection
}
return ""
}
func (m *VolumeInformationMessage) GetFileCount() uint64 {
if m != nil && m.FileCount != nil {
return *m.FileCount
}
return 0
}
func (m *VolumeInformationMessage) GetDeleteCount() uint64 {
if m != nil && m.DeleteCount != nil {
return *m.DeleteCount
}
return 0
}
func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 {
if m != nil && m.DeletedByteCount != nil {
return *m.DeletedByteCount
}
return 0
}
func (m *VolumeInformationMessage) GetReadOnly() bool {
if m != nil && m.ReadOnly != nil {
return *m.ReadOnly
}
return false
}
func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 {
if m != nil && m.ReplicaPlacement != nil {
return *m.ReplicaPlacement
}
return 0
}
func (m *VolumeInformationMessage) GetVersion() uint32 {
if m != nil && m.Version != nil {
return *m.Version
}
return Default_VolumeInformationMessage_Version
}
func (m *VolumeInformationMessage) GetTtl() uint32 {
if m != nil && m.Ttl != nil {
return *m.Ttl
}
return 0
}
type JoinMessage struct {
IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"`
Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"`
Port *uint32 `protobuf:"varint,3,req,name=port" json:"port,omitempty"`
PublicUrl *string `protobuf:"bytes,4,opt,name=public_url" json:"public_url,omitempty"`
MaxVolumeCount *uint32 `protobuf:"varint,5,req,name=max_volume_count" json:"max_volume_count,omitempty"`
MaxFileKey *uint64 `protobuf:"varint,6,req,name=max_file_key" json:"max_file_key,omitempty"`
DataCenter *string `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"`
Rack *string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"`
Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"`
AdminPort *uint32 `protobuf:"varint,10,opt,name=admin_port" json:"admin_port,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *JoinMessage) Reset() { *m = JoinMessage{} }
func (m *JoinMessage) String() string { return proto.CompactTextString(m) }
func (*JoinMessage) ProtoMessage() {}
func (m *JoinMessage) GetIsInit() bool {
if m != nil && m.IsInit != nil {
return *m.IsInit
}
return false
}
func (m *JoinMessage) GetIp() string {
if m != nil && m.Ip != nil {
return *m.Ip
}
return ""
}
func (m *JoinMessage) GetPort() uint32 {
if m != nil && m.Port != nil {
return *m.Port
}
return 0
}
func (m *JoinMessage) GetPublicUrl() string {
if m != nil && m.PublicUrl != nil {
return *m.PublicUrl
}
return ""
}
func (m *JoinMessage) GetMaxVolumeCount() uint32 {
if m != nil && m.MaxVolumeCount != nil {
return *m.MaxVolumeCount
}
return 0
}
func (m *JoinMessage) GetMaxFileKey() uint64 {
if m != nil && m.MaxFileKey != nil {
return *m.MaxFileKey
}
return 0
}
func (m *JoinMessage) GetDataCenter() string {
if m != nil && m.DataCenter != nil {
return *m.DataCenter
}
return ""
}
func (m *JoinMessage) GetRack() string {
if m != nil && m.Rack != nil {
return *m.Rack
}
return ""
}
func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage {
if m != nil {
return m.Volumes
}
return nil
}
func (m *JoinMessage) GetAdminPort() uint32 {
if m != nil && m.AdminPort != nil {
return *m.AdminPort
}
return 0
}
func init() {
}

weed/operation/system_message_test.go (59 lines changed, file deleted)

@@ -1,59 +0,0 @@
package operation
import (
"encoding/json"
"log"
"testing"
"github.com/golang/protobuf/proto"
)
func TestSerialDeserial(t *testing.T) {
volumeMessage := &VolumeInformationMessage{
Id: proto.Uint32(12),
Size: proto.Uint64(2341234),
Collection: proto.String("benchmark"),
FileCount: proto.Uint64(2341234),
DeleteCount: proto.Uint64(234),
DeletedByteCount: proto.Uint64(21234),
ReadOnly: proto.Bool(false),
ReplicaPlacement: proto.Uint32(210),
Version: proto.Uint32(2),
}
var volumeMessages []*VolumeInformationMessage
volumeMessages = append(volumeMessages, volumeMessage)
joinMessage := &JoinMessage{
IsInit: proto.Bool(true),
Ip: proto.String("127.0.3.12"),
Port: proto.Uint32(34546),
PublicUrl: proto.String("localhost:2342"),
MaxVolumeCount: proto.Uint32(210),
MaxFileKey: proto.Uint64(324234423),
DataCenter: proto.String("dc1"),
Rack: proto.String("rack2"),
Volumes: volumeMessages,
}
data, err := proto.Marshal(joinMessage)
if err != nil {
log.Fatal("marshaling error: ", err)
}
newMessage := &JoinMessage{}
err = proto.Unmarshal(data, newMessage)
if err != nil {
log.Fatal("unmarshaling error: ", err)
}
log.Println("The pb data size is", len(data))
jsonData, jsonError := json.Marshal(joinMessage)
if jsonError != nil {
log.Fatal("json marshaling error: ", jsonError)
}
log.Println("The json data size is", len(jsonData), string(jsonData))
// Now test and newTest contain the same data.
if *joinMessage.PublicUrl != *newMessage.PublicUrl {
log.Fatalf("data mismatch %q != %q", *joinMessage.PublicUrl, *newMessage.PublicUrl)
}
}

weed/operation/upload_content.go (21 lines changed)

@@ -36,13 +36,13 @@ func init() {
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
return upload_content(uploadUrl, func(w io.Writer) (err error) {
_, err = io.Copy(w, reader)
return
}, filename, isGzipped, mtype, jwt)
}, filename, isGzipped, mtype, pairMap, jwt)
}
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
body_buf := bytes.NewBufferString("")
body_writer := multipart.NewWriter(body_buf)
h := make(textproto.MIMEHeader)
@@ -59,6 +59,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
if jwt != "" {
h.Set("Authorization", "BEARER "+string(jwt))
}
file_writer, cp_err := body_writer.CreatePart(h)
if cp_err != nil {
glog.V(0).Infoln("error creating form file", cp_err.Error())
@@ -73,7 +74,17 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
glog.V(0).Infoln("error closing body", err)
return nil, err
}
resp, post_err := client.Post(uploadUrl, content_type, body_buf)
req, postErr := http.NewRequest("POST", uploadUrl, body_buf)
if postErr != nil {
glog.V(0).Infoln("failing to upload to", uploadUrl, postErr.Error())
return nil, postErr
}
req.Header.Set("Content-Type", content_type)
for k, v := range pairMap {
req.Header.Set(k, v)
}
resp, post_err := client.Do(req)
if post_err != nil {
glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error())
return nil, post_err
@@ -86,7 +97,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
var ret UploadResult
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
glog.V(0).Infoln("failing to read upload resonse", uploadUrl, string(resp_body))
glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body))
return nil, unmarshal_err
}
if ret.Error != "" {
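The new `pairMap` argument is forwarded verbatim as HTTP request headers on the upload POST (the `req.Header.Set(k, v)` loop above). A hedged usage sketch; the header name is invented for illustration:

```go
package sketch

import (
	"io"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/security"
)

// uploadWithHeaders sends one blob to a volume server, forwarding the
// pairMap entries verbatim as HTTP request headers.
func uploadWithHeaders(r io.Reader, jwt security.EncodedJwt) (*operation.UploadResult, error) {
	pairMap := map[string]string{
		"Seaweed-X-Example": "demo", // hypothetical header name, for illustration only
	}
	return operation.Upload(
		"http://localhost:8080/3,01637037d6", // example volume-server URL + fid
		"cat.jpg", r,
		false,        // isGzipped
		"image/jpeg", // mtype
		pairMap,
		jwt,
	)
}
```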

weed/pb/Makefile (6 lines changed, new file)

@@ -0,0 +1,6 @@
all: gen
.PHONY : gen
gen:
protoc seaweed.proto --go_out=plugins=grpc:.

weed/pb/seaweed.pb.go (375 lines changed, new file)

@@ -0,0 +1,375 @@
// Code generated by protoc-gen-go.
// source: seaweed.proto
// DO NOT EDIT!
/*
Package pb is a generated protocol buffer package.
It is generated from these files:
seaweed.proto
It has these top-level messages:
Heartbeat
HeartbeatResponse
VolumeInformationMessage
*/
package pb
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Heartbeat struct {
Ip string `protobuf:"bytes,1,opt,name=ip" json:"ip,omitempty"`
Port uint32 `protobuf:"varint,2,opt,name=port" json:"port,omitempty"`
PublicUrl string `protobuf:"bytes,3,opt,name=public_url,json=publicUrl" json:"public_url,omitempty"`
MaxVolumeCount uint32 `protobuf:"varint,4,opt,name=max_volume_count,json=maxVolumeCount" json:"max_volume_count,omitempty"`
MaxFileKey uint64 `protobuf:"varint,5,opt,name=max_file_key,json=maxFileKey" json:"max_file_key,omitempty"`
DataCenter string `protobuf:"bytes,6,opt,name=data_center,json=dataCenter" json:"data_center,omitempty"`
Rack string `protobuf:"bytes,7,opt,name=rack" json:"rack,omitempty"`
AdminPort uint32 `protobuf:"varint,8,opt,name=admin_port,json=adminPort" json:"admin_port,omitempty"`
Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"`
}
func (m *Heartbeat) Reset() { *m = Heartbeat{} }
func (m *Heartbeat) String() string { return proto.CompactTextString(m) }
func (*Heartbeat) ProtoMessage() {}
func (*Heartbeat) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *Heartbeat) GetIp() string {
if m != nil {
return m.Ip
}
return ""
}
func (m *Heartbeat) GetPort() uint32 {
if m != nil {
return m.Port
}
return 0
}
func (m *Heartbeat) GetPublicUrl() string {
if m != nil {
return m.PublicUrl
}
return ""
}
func (m *Heartbeat) GetMaxVolumeCount() uint32 {
if m != nil {
return m.MaxVolumeCount
}
return 0
}
func (m *Heartbeat) GetMaxFileKey() uint64 {
if m != nil {
return m.MaxFileKey
}
return 0
}
func (m *Heartbeat) GetDataCenter() string {
if m != nil {
return m.DataCenter
}
return ""
}
func (m *Heartbeat) GetRack() string {
if m != nil {
return m.Rack
}
return ""
}
func (m *Heartbeat) GetAdminPort() uint32 {
if m != nil {
return m.AdminPort
}
return 0
}
func (m *Heartbeat) GetVolumes() []*VolumeInformationMessage {
if m != nil {
return m.Volumes
}
return nil
}
type HeartbeatResponse struct {
VolumeSizeLimit uint64 `protobuf:"varint,1,opt,name=volumeSizeLimit" json:"volumeSizeLimit,omitempty"`
SecretKey string `protobuf:"bytes,2,opt,name=secretKey" json:"secretKey,omitempty"`
}
func (m *HeartbeatResponse) Reset() { *m = HeartbeatResponse{} }
func (m *HeartbeatResponse) String() string { return proto.CompactTextString(m) }
func (*HeartbeatResponse) ProtoMessage() {}
func (*HeartbeatResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *HeartbeatResponse) GetVolumeSizeLimit() uint64 {
if m != nil {
return m.VolumeSizeLimit
}
return 0
}
func (m *HeartbeatResponse) GetSecretKey() string {
if m != nil {
return m.SecretKey
}
return ""
}
type VolumeInformationMessage struct {
Id uint32 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"`
Size uint64 `protobuf:"varint,2,opt,name=size" json:"size,omitempty"`
Collection string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"`
FileCount uint64 `protobuf:"varint,4,opt,name=file_count,json=fileCount" json:"file_count,omitempty"`
DeleteCount uint64 `protobuf:"varint,5,opt,name=delete_count,json=deleteCount" json:"delete_count,omitempty"`
DeletedByteCount uint64 `protobuf:"varint,6,opt,name=deleted_byte_count,json=deletedByteCount" json:"deleted_byte_count,omitempty"`
ReadOnly bool `protobuf:"varint,7,opt,name=read_only,json=readOnly" json:"read_only,omitempty"`
ReplicaPlacement uint32 `protobuf:"varint,8,opt,name=replica_placement,json=replicaPlacement" json:"replica_placement,omitempty"`
Version uint32 `protobuf:"varint,9,opt,name=version" json:"version,omitempty"`
Ttl uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"`
}
func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} }
func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) }
func (*VolumeInformationMessage) ProtoMessage() {}
func (*VolumeInformationMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *VolumeInformationMessage) GetId() uint32 {
if m != nil {
return m.Id
}
return 0
}
func (m *VolumeInformationMessage) GetSize() uint64 {
if m != nil {
return m.Size
}
return 0
}
func (m *VolumeInformationMessage) GetCollection() string {
if m != nil {
return m.Collection
}
return ""
}
func (m *VolumeInformationMessage) GetFileCount() uint64 {
if m != nil {
return m.FileCount
}
return 0
}
func (m *VolumeInformationMessage) GetDeleteCount() uint64 {
if m != nil {
return m.DeleteCount
}
return 0
}
func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 {
if m != nil {
return m.DeletedByteCount
}
return 0
}
func (m *VolumeInformationMessage) GetReadOnly() bool {
if m != nil {
return m.ReadOnly
}
return false
}
func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 {
if m != nil {
return m.ReplicaPlacement
}
return 0
}
func (m *VolumeInformationMessage) GetVersion() uint32 {
if m != nil {
return m.Version
}
return 0
}
func (m *VolumeInformationMessage) GetTtl() uint32 {
if m != nil {
return m.Ttl
}
return 0
}
func init() {
proto.RegisterType((*Heartbeat)(nil), "pb.Heartbeat")
proto.RegisterType((*HeartbeatResponse)(nil), "pb.HeartbeatResponse")
proto.RegisterType((*VolumeInformationMessage)(nil), "pb.VolumeInformationMessage")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for Seaweed service
type SeaweedClient interface {
SendHeartbeat(ctx context.Context, opts ...grpc.CallOption) (Seaweed_SendHeartbeatClient, error)
}
type seaweedClient struct {
cc *grpc.ClientConn
}
func NewSeaweedClient(cc *grpc.ClientConn) SeaweedClient {
return &seaweedClient{cc}
}
func (c *seaweedClient) SendHeartbeat(ctx context.Context, opts ...grpc.CallOption) (Seaweed_SendHeartbeatClient, error) {
stream, err := grpc.NewClientStream(ctx, &_Seaweed_serviceDesc.Streams[0], c.cc, "/pb.Seaweed/SendHeartbeat", opts...)
if err != nil {
return nil, err
}
x := &seaweedSendHeartbeatClient{stream}
return x, nil
}
type Seaweed_SendHeartbeatClient interface {
Send(*Heartbeat) error
Recv() (*HeartbeatResponse, error)
grpc.ClientStream
}
type seaweedSendHeartbeatClient struct {
grpc.ClientStream
}
func (x *seaweedSendHeartbeatClient) Send(m *Heartbeat) error {
return x.ClientStream.SendMsg(m)
}
func (x *seaweedSendHeartbeatClient) Recv() (*HeartbeatResponse, error) {
m := new(HeartbeatResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// Server API for Seaweed service
type SeaweedServer interface {
SendHeartbeat(Seaweed_SendHeartbeatServer) error
}
func RegisterSeaweedServer(s *grpc.Server, srv SeaweedServer) {
s.RegisterService(&_Seaweed_serviceDesc, srv)
}
func _Seaweed_SendHeartbeat_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(SeaweedServer).SendHeartbeat(&seaweedSendHeartbeatServer{stream})
}
type Seaweed_SendHeartbeatServer interface {
Send(*HeartbeatResponse) error
Recv() (*Heartbeat, error)
grpc.ServerStream
}
type seaweedSendHeartbeatServer struct {
grpc.ServerStream
}
func (x *seaweedSendHeartbeatServer) Send(m *HeartbeatResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *seaweedSendHeartbeatServer) Recv() (*Heartbeat, error) {
m := new(Heartbeat)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _Seaweed_serviceDesc = grpc.ServiceDesc{
ServiceName: "pb.Seaweed",
HandlerType: (*SeaweedServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "SendHeartbeat",
Handler: _Seaweed_SendHeartbeat_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "seaweed.proto",
}
func init() { proto.RegisterFile("seaweed.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 489 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x93, 0xdf, 0x8a, 0xd3, 0x40,
0x14, 0xc6, 0x4d, 0x1a, 0xdb, 0xe6, 0x74, 0xbb, 0x76, 0x07, 0x84, 0x41, 0x57, 0x8d, 0xbd, 0x0a,
0x28, 0x45, 0x56, 0xf0, 0xc6, 0x3b, 0x17, 0x16, 0x65, 0x15, 0x97, 0x29, 0x7a, 0xe3, 0x45, 0x98,
0x24, 0x67, 0x65, 0xd8, 0xc9, 0x1f, 0x26, 0xd3, 0xb5, 0xd9, 0x07, 0xf2, 0x49, 0x7c, 0x30, 0x99,
0x33, 0x4d, 0xab, 0x82, 0x77, 0xe7, 0xfc, 0xce, 0x97, 0xe4, 0x9b, 0xf3, 0x4d, 0x60, 0xde, 0xa1,
0xfc, 0x81, 0x58, 0xae, 0x5a, 0xd3, 0xd8, 0x86, 0x85, 0x6d, 0xbe, 0xfc, 0x19, 0x42, 0xfc, 0x1e,
0xa5, 0xb1, 0x39, 0x4a, 0xcb, 0x8e, 0x21, 0x54, 0x2d, 0x0f, 0x92, 0x20, 0x8d, 0x45, 0xa8, 0x5a,
0xc6, 0x20, 0x6a, 0x1b, 0x63, 0x79, 0x98, 0x04, 0xe9, 0x5c, 0x50, 0xcd, 0x9e, 0x00, 0xb4, 0x9b,
0x5c, 0xab, 0x22, 0xdb, 0x18, 0xcd, 0x47, 0xa4, 0x8d, 0x3d, 0xf9, 0x62, 0x34, 0x4b, 0x61, 0x51,
0xc9, 0x6d, 0x76, 0xdb, 0xe8, 0x4d, 0x85, 0x59, 0xd1, 0x6c, 0x6a, 0xcb, 0x23, 0x7a, 0xfc, 0xb8,
0x92, 0xdb, 0xaf, 0x84, 0xcf, 0x1d, 0x65, 0x09, 0x1c, 0x39, 0xe5, 0xb5, 0xd2, 0x98, 0xdd, 0x60,
0xcf, 0xef, 0x27, 0x41, 0x1a, 0x09, 0xa8, 0xe4, 0xf6, 0x42, 0x69, 0xbc, 0xc4, 0x9e, 0x3d, 0x83,
0x59, 0x29, 0xad, 0xcc, 0x0a, 0xac, 0x2d, 0x1a, 0x3e, 0xa6, 0x6f, 0x81, 0x43, 0xe7, 0x44, 0x9c,
0x3f, 0x23, 0x8b, 0x1b, 0x3e, 0xa1, 0x09, 0xd5, 0xce, 0x9f, 0x2c, 0x2b, 0x55, 0x67, 0xe4, 0x7c,
0x4a, 0x9f, 0x8e, 0x89, 0x5c, 0x39, 0xfb, 0x6f, 0x60, 0xe2, 0xbd, 0x75, 0x3c, 0x4e, 0x46, 0xe9,
0xec, 0xec, 0x74, 0xd5, 0xe6, 0x2b, 0xef, 0xeb, 0x43, 0x7d, 0xdd, 0x98, 0x4a, 0x5a, 0xd5, 0xd4,
0x9f, 0xb0, 0xeb, 0xe4, 0x77, 0x14, 0x83, 0x78, 0xf9, 0x0d, 0x4e, 0xf6, 0x7b, 0x12, 0xd8, 0xb5,
0x4d, 0xdd, 0x21, 0x4b, 0xe1, 0x81, 0x9f, 0xaf, 0xd5, 0x1d, 0x7e, 0x54, 0x95, 0xb2, 0xb4, 0xbc,
0x48, 0xfc, 0x8b, 0xd9, 0x29, 0xc4, 0x1d, 0x16, 0x06, 0xed, 0x25, 0xf6, 0xb4, 0xce, 0x58, 0x1c,
0xc0, 0xf2, 0x57, 0x08, 0xfc, 0x7f, 0x16, 0x28, 0x94, 0x92, 0xde, 0x3b, 0x17, 0xa1, 0x2a, 0xdd,
0xa1, 0x3b, 0x75, 0x87, 0xf4, 0x96, 0x48, 0x50, 0xcd, 0x9e, 0x02, 0x14, 0x8d, 0xd6, 0x58, 0xb8,
0x07, 0x77, 0xa1, 0xfc, 0x41, 0xdc, 0x52, 0x68, 0xcf, 0x87, 0x3c, 0x22, 0x11, 0x3b, 0xe2, 0xa3,
0x78, 0x0e, 0x47, 0x25, 0x6a, 0xb4, 0x83, 0xc0, 0x47, 0x31, 0xf3, 0xcc, 0x4b, 0x5e, 0x02, 0xf3,
0x6d, 0x99, 0xe5, 0xfd, 0x5e, 0x38, 0x26, 0xe1, 0x62, 0x37, 0x79, 0xd7, 0x0f, 0xea, 0xc7, 0x10,
0x1b, 0x94, 0x65, 0xd6, 0xd4, 0xba, 0xa7, 0x74, 0xa6, 0x62, 0xea, 0xc0, 0xe7, 0x5a, 0xf7, 0xec,
0x05, 0x9c, 0x18, 0x6c, 0xb5, 0x2a, 0x64, 0xd6, 0x6a, 0x59, 0x60, 0x85, 0xf5, 0x10, 0xd4, 0x62,
0x37, 0xb8, 0x1a, 0x38, 0xe3, 0x30, 0xb9, 0x45, 0xd3, 0xb9, 0x63, 0xc5, 0x24, 0x19, 0x5a, 0xb6,
0x80, 0x91, 0xb5, 0x9a, 0x03, 0x51, 0x57, 0x9e, 0x5d, 0xc0, 0x64, 0xed, 0x6f, 0x38, 0x7b, 0x0b,
0xf3, 0x35, 0xd6, 0xe5, 0xe1, 0x6a, 0xcf, 0x5d, 0xcc, 0xfb, 0xf6, 0xd1, 0xc3, 0xbf, 0xda, 0x21,
0xd0, 0xe5, 0xbd, 0x34, 0x78, 0x15, 0xe4, 0x63, 0xfa, 0x3f, 0x5e, 0xff, 0x0e, 0x00, 0x00, 0xff,
0xff, 0xd5, 0x08, 0xa6, 0xf2, 0x30, 0x03, 0x00, 0x00,
}

40
weed/pb/seaweed.proto

@ -0,0 +1,40 @@
syntax = "proto3";
package pb;
//////////////////////////////////////////////////
service Seaweed {
rpc SendHeartbeat(stream Heartbeat) returns (stream HeartbeatResponse) {}
}
//////////////////////////////////////////////////
message Heartbeat {
string ip = 1;
uint32 port = 2;
string public_url = 3;
uint32 max_volume_count = 4;
uint64 max_file_key = 5;
string data_center = 6;
string rack = 7;
uint32 admin_port = 8;
repeated VolumeInformationMessage volumes = 9;
}
message HeartbeatResponse {
uint64 volumeSizeLimit = 1;
string secretKey = 2;
}
message VolumeInformationMessage {
uint32 id = 1;
uint64 size = 2;
string collection = 3;
uint64 file_count = 4;
uint64 delete_count = 5;
uint64 deleted_byte_count = 6;
bool read_only = 7;
uint32 replica_placement = 8;
uint32 version = 9;
uint32 ttl = 10;
}

4
weed/proto/Makefile

@ -1,4 +0,0 @@
TARG=../operation
all:
protoc --go_out=$(TARG) system_message.proto

27
weed/proto/system_message.proto

@ -1,27 +0,0 @@
package operation;
message VolumeInformationMessage {
required uint32 id = 1;
required uint64 size = 2;
optional string collection = 3;
required uint64 file_count = 4;
required uint64 delete_count = 5;
required uint64 deleted_byte_count = 6;
optional bool read_only = 7;
required uint32 replica_placement = 8;
optional uint32 version = 9 [default=2];
optional uint32 ttl = 10;
}
message JoinMessage {
optional bool is_init = 1;
required string ip = 2;
required uint32 port = 3;
optional string public_url = 4;
required uint32 max_volume_count = 5;
required uint64 max_file_key = 6;
optional string data_center = 7;
optional string rack = 8;
repeated VolumeInformationMessage volumes = 9;
optional uint32 admin_port = 10;
}

4
weed/server/common.go

@ -86,7 +86,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("parsing upload file...")
fname, data, mimeType, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r)
fname, data, mimeType, pairMap, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r)
if pe != nil {
writeJsonError(w, r, http.StatusBadRequest, pe)
return
@ -112,7 +112,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("upload file to store", url)
uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, jwt)
uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, jwt)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return

7
weed/server/filer_server.go

@ -25,7 +25,7 @@ import (
type filerConf struct {
MysqlConf []mysql_store.MySqlConf `json:"mysql"`
mysql_store.ShardingConf
PostgresConf []postgres_store.PostgresConf `json:"postgres"`
PostgresConf *postgres_store.PostgresConf `json:"postgres"`
}
func parseConfFile(confPath string) (*filerConf, error) {
@ -88,9 +88,8 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 {
mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardCount)
fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store)
} else if setting.PostgresConf != nil && len(setting.PostgresConf) != 0 {
postgres_store := postgres_store.NewPostgresStore(setting.PostgresConf, setting.IsSharding, setting.ShardCount)
fs.filer = flat_namespace.NewFlatNamespaceFiler(master, postgres_store)
} else if setting.PostgresConf != nil {
fs.filer = postgres_store.NewPostgresStore(master, *setting.PostgresConf)
} else if cassandra_server != "" {
cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
if err != nil {

2
weed/server/filer_server_handlers_read.go

@ -83,7 +83,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
shouldDisplayLoadMore,
}
if strings.ToLower(r.Header.Get("Content-Type")) == "application/json" {
if r.Header.Get("Accept") == "application/json" {
writeJsonQuiet(w, r, http.StatusOK, args)
} else {
ui.StatusTpl.Execute(w, args)

8
weed/server/filer_server_handlers_write.go

@ -13,6 +13,8 @@ import (
"net/http"
"net/textproto"
"net/url"
"path"
"strconv"
"strings"
"github.com/chrislusf/seaweedfs/weed/filer"
@ -20,8 +22,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
"path"
"strconv"
)
type FilerPostResult struct {
@ -112,7 +112,7 @@ func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Re
if r.Method == "PUT" {
buf, _ := ioutil.ReadAll(r.Body)
r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r)
fileName, _, _, _, _, _, _, _, pe := storage.ParseUpload(r)
if pe != nil {
glog.V(0).Infoln("failing to parse post body", pe.Error())
writeJsonError(w, r, http.StatusInternalServerError, pe)
@ -521,7 +521,7 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
err = nil
ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf))
uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId))
uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, fs.jwt(fileId))
if uploadResult != nil {
glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size)
}

69
weed/server/master_grpc_server.go

@ -0,0 +1,69 @@
package weed_server
import (
"net"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/topology"
"google.golang.org/grpc/peer"
)
func (ms MasterServer) SendHeartbeat(stream pb.Seaweed_SendHeartbeatServer) error {
var dn *topology.DataNode
t := ms.Topo
for {
heartbeat, err := stream.Recv()
if err == nil {
if dn == nil {
t.Sequence.SetMax(heartbeat.MaxFileKey)
if heartbeat.Ip == "" {
if pr, ok := peer.FromContext(stream.Context()); ok {
if pr.Addr != net.Addr(nil) {
heartbeat.Ip = pr.Addr.String()[0:strings.LastIndex(pr.Addr.String(), ":")]
glog.V(0).Infof("remote IP address is detected as %v", heartbeat.Ip)
}
}
}
dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn = rack.GetOrCreateDataNode(heartbeat.Ip,
int(heartbeat.Port), heartbeat.PublicUrl,
int(heartbeat.MaxVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
SecretKey: string(ms.guard.SecretKey),
}); err != nil {
return err
}
}
var volumeInfos []storage.VolumeInfo
for _, v := range heartbeat.Volumes {
if vi, err := storage.NewVolumeInfo(v); err == nil {
volumeInfos = append(volumeInfos, vi)
} else {
glog.V(0).Infof("Fail to convert joined volume information: %v", err)
}
}
deletedVolumes := dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)
}
for _, v := range deletedVolumes {
t.UnRegisterVolumeLayout(v, dn)
}
		} else {
			// dn is nil until the first heartbeat is processed, so guard
			// before dereferencing it for the log message
			if dn != nil {
				glog.V(0).Infof("lost volume server %s:%d", dn.Ip, dn.Port)
				t.UnRegisterDataNode(dn)
			}
			return err
		}
	}
}
}
}
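The wiring that actually exposes this handler is not in the diff; a minimal sketch, assuming a *MasterServer named ms and a placeholder gRPC port:

	import (
		"net"

		"google.golang.org/grpc"

		"github.com/chrislusf/seaweedfs/weed/glog"
		"github.com/chrislusf/seaweedfs/weed/pb"
	)

	func serveGrpc(ms *MasterServer) {
		lis, err := net.Listen("tcp", ":19333") // hypothetical gRPC port
		if err != nil {
			glog.Fatalf("master gRPC listen error: %v", err)
		}
		grpcServer := grpc.NewServer()
		// ms satisfies pb.SeaweedServer via the SendHeartbeat method above
		pb.RegisterSeaweedServer(grpcServer, ms)
		if err := grpcServer.Serve(lis); err != nil {
			glog.Fatalf("master gRPC serve error: %v", err)
		}
	}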

9
weed/server/master_server.go

@ -20,6 +20,7 @@ type MasterServer struct {
port int
metaFolder string
volumeSizeLimitMB uint
preallocate int64
pulseSeconds int
defaultReplicaPlacement string
garbageThreshold string
@ -34,6 +35,7 @@ type MasterServer struct {
func NewMasterServer(r *mux.Router, port int, metaFolder string,
volumeSizeLimitMB uint,
preallocate bool,
pulseSeconds int,
confFile string,
defaultReplicaPlacement string,
@ -41,9 +43,15 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
whiteList []string,
secureKey string,
) *MasterServer {
var preallocateSize int64
if preallocate {
preallocateSize = int64(volumeSizeLimitMB) * (1 << 20)
}
ms := &MasterServer{
port: port,
volumeSizeLimitMB: volumeSizeLimitMB,
preallocate: preallocateSize,
pulseSeconds: pulseSeconds,
defaultReplicaPlacement: defaultReplicaPlacement,
garbageThreshold: garbageThreshold,
@ -64,7 +72,6 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
r.HandleFunc("/dir/join", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoinHandler)))
r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeLookupHandler)))

44
weed/server/master_server_handlers_admin.go

@ -1,21 +1,16 @@
package weed_server
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"strconv"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
)
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
@ -34,37 +29,6 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
ms.Topo.DeleteCollection(r.FormValue("collection"))
}
func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
joinMessage := &operation.JoinMessage{}
if err = proto.Unmarshal(body, joinMessage); err != nil {
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
if *joinMessage.Ip == "" {
*joinMessage.Ip = r.RemoteAddr[0:strings.LastIndex(r.RemoteAddr, ":")]
}
if glog.V(4) {
if jsonData, jsonError := json.Marshal(joinMessage); jsonError != nil {
glog.V(0).Infoln("json marshaling error: ", jsonError)
writeJsonError(w, r, http.StatusBadRequest, jsonError)
return
} else {
glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData))
}
}
ms.Topo.ProcessJoinMessage(joinMessage)
writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{
VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
SecretKey: string(ms.guard.SecretKey),
})
}
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION
@ -181,10 +145,18 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if err != nil {
return nil, err
}
preallocate := ms.preallocate
if r.FormValue("preallocate") != "" {
preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64)
if err != nil {
return nil, fmt.Errorf("Failed to parse int64 preallocate = %s: %v", r.FormValue("preallocate"), err)
}
}
volumeGrowOption := &topology.VolumeGrowOption{
Collection: r.FormValue("collection"),
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
Prealloacte: preallocate,
DataCenter: r.FormValue("dataCenter"),
Rack: r.FormValue("rack"),
DataNode: r.FormValue("dataNode"),

88
weed/server/volume_grpc_client.go

@ -0,0 +1,88 @@
package weed_server
import (
"fmt"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
func (vs *VolumeServer) heartbeat() {
glog.V(0).Infof("Volume server bootstraps with master %s", vs.GetMasterNode())
vs.masterNodes = storage.NewMasterNodes(vs.masterNode)
vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack)
for {
err := vs.doHeartbeat(time.Duration(vs.pulseSeconds) * time.Second)
if err != nil {
glog.V(0).Infof("heartbeat error: %v", err)
time.Sleep(time.Duration(3*vs.pulseSeconds) * time.Second)
}
}
}
func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error {
masterNode, err := vs.masterNodes.FindMaster()
if err != nil {
return fmt.Errorf("No master found: %v", err)
}
	grpcConnection, err := grpc.Dial(masterNode, grpc.WithInsecure())
	if err != nil {
		return fmt.Errorf("fail to dial: %v", err)
	}
	defer grpcConnection.Close()
	client := pb.NewSeaweedClient(grpcConnection)
stream, err := client.SendHeartbeat(context.Background())
if err != nil {
glog.V(0).Infof("%v.SendHeartbeat(_) = _, %v", client, err)
return err
}
vs.SetMasterNode(masterNode)
glog.V(0).Infof("Heartbeat to %s", masterNode)
vs.store.Client = stream
defer func() { vs.store.Client = nil }()
doneChan := make(chan error, 1)
go func() {
for {
in, err := stream.Recv()
if err != nil {
doneChan <- err
return
}
vs.store.VolumeSizeLimit = in.GetVolumeSizeLimit()
vs.guard.SecretKey = security.Secret(in.GetSecretKey())
}
}()
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
return err
}
	// time.NewTimer fires only once, which would stop heartbeats after the
	// first tick; a Ticker delivers one every sleepInterval
	ticker := time.NewTicker(sleepInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
return err
}
case err := <-doneChan:
return err
}
}
}

34
weed/server/volume_server.go

@ -1,10 +1,8 @@
package weed_server
import (
"math/rand"
"net/http"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
@ -19,6 +17,7 @@ type VolumeServer struct {
rack string
store *storage.Store
guard *security.Guard
masterNodes *storage.MasterNodes
needleMapKind storage.NeedleMapType
FixJpgOrientation bool
@ -70,36 +69,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
publicMux.HandleFunc("/", vs.publicReadOnlyHandler)
}
go func() {
connected := true
glog.V(0).Infof("Volume server bootstraps with master %s", vs.GetMasterNode())
vs.store.SetBootstrapMaster(vs.GetMasterNode())
vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack)
for {
glog.V(4).Infof("Volume server sending to master %s", vs.GetMasterNode())
master, secretKey, err := vs.store.SendHeartbeatToMaster()
if err == nil {
if !connected {
connected = true
vs.SetMasterNode(master)
vs.guard.SecretKey = secretKey
glog.V(0).Infoln("Volume Server Connected with master at", master)
}
} else {
glog.V(1).Infof("Volume Server Failed to talk with master %s: %v", vs.masterNode, err)
if connected {
connected = false
}
}
if connected {
time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond)
} else {
time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*0.25) * time.Millisecond)
}
}
}()
go vs.heartbeat()
return vs
}

23
weed/server/volume_server_handlers_admin.go

@ -3,6 +3,7 @@ package weed_server
import (
"net/http"
"path/filepath"
"strconv"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/stats"
@ -17,13 +18,29 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
}
func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("replication"), r.FormValue("ttl"))
var err error
preallocate := int64(0)
if r.FormValue("preallocate") != "" {
preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64)
if err != nil {
glog.V(0).Infoln("ignoring invalid int64 value for preallocate = %v", r.FormValue("preallocate"))
}
}
err = vs.store.AddVolume(
r.FormValue("volume"),
r.FormValue("collection"),
vs.needleMapKind,
r.FormValue("replication"),
r.FormValue("ttl"),
preallocate,
)
if err == nil {
writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
} else {
writeJsonError(w, r, http.StatusNotAcceptable, err)
}
glog.V(2).Infoln("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replication =", r.FormValue("replication"), ", error =", err)
glog.V(2).Infoln("assign volume = %s, collection = %s , replication = %s, error = %v",
r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), err)
}
func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.Request) {
@ -33,7 +50,7 @@ func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.R
} else {
writeJsonError(w, r, http.StatusInternalServerError, err)
}
glog.V(2).Infoln("deleting collection =", r.FormValue("collection"), ", error =", err)
glog.V(2).Infof("deleting collection = %s, error = %v", r.FormValue("collection"), err)
}
func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) {

13
weed/server/volume_server_handlers_read.go

@ -16,6 +16,8 @@ import (
"strings"
"time"
"encoding/json"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/operation"
@ -105,6 +107,17 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
w.Header().Set("Etag", etag)
if n.HasPairs() {
pairMap := make(map[string]string)
err = json.Unmarshal(n.Pairs, &pairMap)
if err != nil {
glog.V(0).Infoln("Unmarshal pairs error:", err)
}
for k, v := range pairMap {
w.Header().Set(k, v)
}
}
if vs.tryHandleChunkedFile(n, filename, w, r) {
return
}
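A minimal read-side sketch, assuming a pair was stored at upload time via a Seaweed- prefixed header; the URL and fid are placeholders, and the prefix is trimmed before storage (see needle.go below), so the pair comes back as a bare response header:

	package main

	import (
		"log"
		"net/http"
	)

	func main() {
		resp, err := http.Get("http://localhost:8080/3,01637037d6") // hypothetical volume server + fid
		if err != nil {
			log.Fatal(err)
		}
		defer resp.Body.Close()
		// stored at upload time as "Seaweed-Owner"
		log.Println("owner:", resp.Header.Get("Owner"))
	}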

9
weed/storage/disk_location.go

@ -36,11 +36,16 @@ func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleM
_, found := l.volumes[vid]
mutex.RUnlock()
if !found {
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil {
if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0); e == nil {
mutex.Lock()
l.volumes[vid] = v
mutex.Unlock()
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
if v.Size() != v.dataFileSize {
glog.V(0).Infof("data file %s, size=%d expected=%d",
l.Directory+"/"+name, v.Size(), v.dataFileSize)
}
} else {
glog.V(0).Infof("new volume %s error %s", name, e)
}

47
weed/storage/needle.go

@ -1,8 +1,10 @@
package storage
import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"mime"
"net/http"
"path"
@ -20,6 +22,8 @@ const (
NeedlePaddingSize = 8
NeedleChecksumSize = 4
MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8
TombstoneFileSize = math.MaxUint32
PairNamePrefix = "Seaweed-"
)
/*
@ -38,6 +42,8 @@ type Needle struct {
Name []byte `comment:"maximum 256 characters"` //version2
MimeSize uint8 //version2
Mime []byte `comment:"maximum 256 characters"` //version2
PairsSize uint16 //version2
Pairs []byte `comment:"additional name value pairs, json format, maximum 64kB"`
LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
Ttl *TTL
@ -53,8 +59,15 @@ func (n *Needle) String() (str string) {
}
func ParseUpload(r *http.Request) (
fileName string, data []byte, mimeType string, isGzipped bool,
fileName string, data []byte, mimeType string, pairMap map[string]string, isGzipped bool,
modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) {
pairMap = make(map[string]string)
for k, v := range r.Header {
if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) {
pairMap[k] = v[0]
}
}
form, fe := r.MultipartReader()
if fe != nil {
glog.V(0).Infoln("MultipartReader [ERROR]", fe)
@ -106,6 +119,10 @@ func ParseUpload(r *http.Request) (
}
}
isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm"))
if !isChunkedFile {
dotIndex := strings.LastIndex(fileName, ".")
ext, mtype := "", ""
if dotIndex > 0 {
@ -117,6 +134,7 @@ func ParseUpload(r *http.Request) (
mimeType = contentType //only return mime type if not deductable
mtype = contentType
}
if part.Header.Get("Content-Encoding") == "gzip" {
isGzipped = true
} else if operation.IsGzippable(ext, mtype) {
@ -126,21 +144,25 @@ func ParseUpload(r *http.Request) (
isGzipped = true
}
if ext == ".gz" {
if strings.HasSuffix(fileName, ".css.gz") ||
strings.HasSuffix(fileName, ".html.gz") ||
strings.HasSuffix(fileName, ".txt.gz") ||
strings.HasSuffix(fileName, ".js.gz") {
fileName = fileName[:len(fileName)-3]
isGzipped = true
}
if strings.HasSuffix(fileName, ".gz") &&
!strings.HasSuffix(fileName, ".tar.gz") {
fileName = fileName[:len(fileName)-3]
}
}
modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
ttl, _ = ReadTTL(r.FormValue("ttl"))
isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm"))
return
}
func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
var pairMap map[string]string
fname, mimeType, isGzipped, isChunkedFile := "", "", false, false
n = new(Needle)
fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r)
fname, n.Data, mimeType, pairMap, isGzipped, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r)
if e != nil {
return
}
@ -152,6 +174,19 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
n.Mime = []byte(mimeType)
n.SetHasMime()
}
if len(pairMap) != 0 {
trimmedPairMap := make(map[string]string)
for k, v := range pairMap {
trimmedPairMap[k[len(PairNamePrefix):]] = v
}
pairs, _ := json.Marshal(trimmedPairMap)
if len(pairs) < 65536 {
n.Pairs = pairs
n.PairsSize = uint16(len(pairs))
n.SetHasPairs()
}
}
if isGzipped {
n.SetGzipped()
}

2
weed/storage/needle_map.go

@ -24,7 +24,7 @@ const (
type NeedleMapper interface {
Put(key uint64, offset uint32, size uint32) error
Get(key uint64) (element *NeedleValue, ok bool)
Delete(key uint64) error
Delete(key uint64, offset uint32) error
Close()
Destroy() error
ContentSize() uint64

6
weed/storage/needle_map_boltdb.go

@ -63,7 +63,7 @@ func generateBoltDbFile(dbFileName string, indexFile *os.File) error {
}
defer db.Close()
return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
if offset > 0 {
if offset > 0 && size != TombstoneFileSize {
boltDbWrite(db, key, offset, size)
} else {
boltDbDelete(db, key)
@ -143,12 +143,12 @@ func boltDbDelete(db *bolt.DB, key uint64) error {
})
}
func (m *BoltDbNeedleMap) Delete(key uint64) error {
func (m *BoltDbNeedleMap) Delete(key uint64, offset uint32) error {
if oldNeedle, ok := m.Get(key); ok {
m.logDelete(oldNeedle.Size)
}
// write to index file first
if err := m.appendToIndexFile(key, 0, 0); err != nil {
if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
return err
}
return boltDbDelete(m.db, key)

6
weed/storage/needle_map_leveldb.go

@ -61,7 +61,7 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
}
defer db.Close()
return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error {
if offset > 0 {
if offset > 0 && size != TombstoneFileSize {
levelDbWrite(db, key, offset, size)
} else {
levelDbDelete(db, key)
@ -112,12 +112,12 @@ func levelDbDelete(db *leveldb.DB, key uint64) error {
return db.Delete(bytes, nil)
}
func (m *LevelDbNeedleMap) Delete(key uint64) error {
func (m *LevelDbNeedleMap) Delete(key uint64, offset uint32) error {
if oldNeedle, ok := m.Get(key); ok {
m.logDelete(oldNeedle.Size)
}
// write to index file first
if err := m.appendToIndexFile(key, 0, 0); err != nil {
if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
return err
}
return levelDbDelete(m.db, key)

6
weed/storage/needle_map_memory.go

@ -33,7 +33,7 @@ func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
}
nm.FileCounter++
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
if offset > 0 {
if offset > 0 && size != TombstoneFileSize {
oldSize := nm.m.Set(Key(key), offset, size)
glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
if oldSize > 0 {
@ -92,10 +92,10 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
element, ok = nm.m.Get(Key(key))
return
}
func (nm *NeedleMap) Delete(key uint64) error {
func (nm *NeedleMap) Delete(key uint64, offset uint32) error {
deletedBytes := nm.m.Delete(Key(key))
nm.logDelete(deletedBytes)
return nm.appendToIndexFile(key, 0, 0)
return nm.appendToIndexFile(key, offset, TombstoneFileSize)
}
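// Note (not in this commit): a deleted key is now recorded as a tombstone
// index entry with size == TombstoneFileSize (math.MaxUint32) and an offset
// that points at the delete marker appended to the .dat file, instead of the
// old (0, 0) entry. Index replay can then skip deletions (see the
// size != TombstoneFileSize guards above) while still preserving file offsets.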
func (nm *NeedleMap) Close() {
_ = nm.indexFile.Close()

44
weed/storage/needle_read_write.go

@ -16,16 +16,17 @@ const (
FlagHasMime = 0x04
FlagHasLastModifiedDate = 0x08
FlagHasTtl = 0x10
FlagHasPairs = 0x20
FlagIsChunkManifest = 0x80
LastModifiedBytesLength = 5
TtlBytesLength = 2
)
func (n *Needle) DiskSize() int64 {
padding := NeedlePaddingSize - ((NeedleHeaderSize + int64(n.Size) + NeedleChecksumSize) % NeedlePaddingSize)
return NeedleHeaderSize + int64(n.Size) + padding + NeedleChecksumSize
return getActualSize(n.Size)
}
func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize int64, err error) {
if s, ok := w.(io.Seeker); ok {
if end, e := s.Seek(0, 1); e == nil {
defer func(s io.Seeker, off int64) {
@ -54,6 +55,7 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
if _, err = w.Write(n.Data); err != nil {
return
}
actualSize = NeedleHeaderSize + int64(n.Size)
padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
_, err = w.Write(header[0 : NeedleChecksumSize+padding])
@ -77,6 +79,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
if n.HasTtl() {
n.Size = n.Size + TtlBytesLength
}
if n.HasPairs() {
n.Size += 2 + uint32(n.PairsSize)
}
} else {
n.Size = 0
}
@ -127,19 +132,27 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
return
}
}
if n.HasPairs() {
util.Uint16toBytes(header[0:2], n.PairsSize)
if _, err = w.Write(header[0:2]); err != nil {
return
}
if _, err = w.Write(n.Pairs); err != nil {
return
}
}
}
padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
_, err = w.Write(header[0 : NeedleChecksumSize+padding])
return n.DataSize, err
return n.DataSize, getActualSize(n.Size), err
}
return 0, fmt.Errorf("Unsupported Version! (%d)", version)
return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
}
func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) {
padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
readSize := NeedleHeaderSize + size + NeedleChecksumSize + padding
return getBytesForFileBlock(r, offset, int(readSize))
return getBytesForFileBlock(r, offset, int(getActualSize(size)))
}
func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {
@ -209,6 +222,13 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) {
n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
index = index + TtlBytesLength
}
if index < lenBytes && n.HasPairs() {
n.PairsSize = util.BytesToUint16(bytes[index : index+2])
index += 2
end := index + int(n.PairsSize)
n.Pairs = bytes[index:end]
index = end
}
}
func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
@ -292,3 +312,11 @@ func (n *Needle) IsChunkedManifest() bool {
func (n *Needle) SetIsChunkManifest() {
n.Flags = n.Flags | FlagIsChunkManifest
}
func (n *Needle) HasPairs() bool {
return n.Flags&FlagHasPairs != 0
}
func (n *Needle) SetHasPairs() {
n.Flags = n.Flags | FlagHasPairs
}
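// Note (not in this commit): the flag bits compose, so a needle carrying both
// a mime type and pairs has Flags == FlagHasMime|FlagHasPairs (0x04|0x20 == 0x24).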

111
weed/storage/store.go

@ -1,7 +1,6 @@
package storage
import (
"encoding/json"
"errors"
"fmt"
"math/rand"
@ -10,9 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/pb"
)
const (
@ -76,12 +73,12 @@ type Store struct {
dataCenter string //optional information, overwriting master setting if exists
rack string //optional information, overwriting master setting if exists
connected bool
volumeSizeLimit uint64 //read from the master
masterNodes *MasterNodes
VolumeSizeLimit uint64 //read from the master
Client pb.Seaweed_SendHeartbeatClient
}
func (s *Store) String() (str string) {
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes)
str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.VolumeSizeLimit)
return
}
@ -95,7 +92,7 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
}
return
}
func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error {
func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
@ -111,7 +108,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl)
e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl, preallocate)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
@ -123,7 +120,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil {
if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl, preallocate); err != nil {
e = err
}
}
@ -160,14 +157,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL, preallocate int64) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate); err == nil {
location.SetVolume(vid, volume)
return nil
} else {
@ -208,15 +205,8 @@ func (s *Store) SetRack(rack string) {
s.rack = rack
}
func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
s.masterNodes = NewMasterNodes(bootstrapMaster)
}
func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
masterNode, e = s.masterNodes.FindMaster()
if e != nil {
return
}
var volumeMessages []*operation.VolumeInformationMessage
func (s *Store) CollectHeartbeat() *pb.Heartbeat {
var volumeMessages []*pb.VolumeInformationMessage
maxVolumeCount := 0
var maxFileKey uint64
for _, location := range s.Locations {
@ -226,18 +216,18 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
if !v.expired(s.volumeSizeLimit) {
volumeMessage := &operation.VolumeInformationMessage{
Id: proto.Uint32(uint32(k)),
Size: proto.Uint64(uint64(v.Size())),
Collection: proto.String(v.Collection),
FileCount: proto.Uint64(uint64(v.nm.FileCount())),
DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
ReadOnly: proto.Bool(v.readOnly),
ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
Version: proto.Uint32(uint32(v.Version())),
Ttl: proto.Uint32(v.Ttl.ToUint32()),
if !v.expired(s.VolumeSizeLimit) {
volumeMessage := &pb.VolumeInformationMessage{
Id: uint32(k),
Size: uint64(v.Size()),
Collection: v.Collection,
FileCount: uint64(v.nm.FileCount()),
DeleteCount: uint64(v.nm.DeletedCount()),
DeletedByteCount: v.nm.DeletedSize(),
ReadOnly: v.readOnly,
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
}
volumeMessages = append(volumeMessages, volumeMessage)
} else {
@ -252,45 +242,17 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
location.Unlock()
}
joinMessage := &operation.JoinMessage{
IsInit: proto.Bool(!s.connected),
Ip: proto.String(s.Ip),
Port: proto.Uint32(uint32(s.Port)),
PublicUrl: proto.String(s.PublicUrl),
MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)),
MaxFileKey: proto.Uint64(maxFileKey),
DataCenter: proto.String(s.dataCenter),
Rack: proto.String(s.rack),
return &pb.Heartbeat{
Ip: s.Ip,
Port: uint32(s.Port),
PublicUrl: s.PublicUrl,
MaxVolumeCount: uint32(maxVolumeCount),
MaxFileKey: maxFileKey,
DataCenter: s.dataCenter,
Rack: s.rack,
Volumes: volumeMessages,
}
data, err := proto.Marshal(joinMessage)
if err != nil {
return "", "", err
}
joinUrl := "http://" + masterNode + "/dir/join"
glog.V(4).Infof("Connecting to %s ...", joinUrl)
jsonBlob, err := util.PostBytes(joinUrl, data)
if err != nil {
s.masterNodes.Reset()
return "", "", err
}
var ret operation.JoinResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
s.masterNodes.Reset()
return masterNode, "", err
}
if ret.Error != "" {
s.masterNodes.Reset()
return masterNode, "", errors.New(ret.Error)
}
s.volumeSizeLimit = ret.VolumeSizeLimit
secretKey = security.Secret(ret.SecretKey)
s.connected = true
return
}
func (s *Store) Close() {
for _, location := range s.Locations {
@ -303,17 +265,20 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
err = fmt.Errorf("Volume %d is read only", i)
return
}
// TODO: count needle size ahead
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
size, err = v.writeNeedle(n)
} else {
err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.VolumeSizeLimit, v.ContentSize())
}
if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
if _, _, e := s.SendHeartbeatToMaster(); e != nil {
if s.VolumeSizeLimit < v.ContentSize()+3*uint64(size) {
glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.VolumeSizeLimit)
if s.Client != nil {
if e := s.Client.Send(s.CollectHeartbeat()); e != nil {
glog.V(0).Infoln("error when reporting size:", e)
}
}
}
return
}
glog.V(0).Infoln("volume", i, "not found!")

5
weed/storage/volume.go

@ -15,6 +15,7 @@ type Volume struct {
dir string
Collection string
dataFile *os.File
dataFileSize int64
nm NeedleMapper
needleMapKind NeedleMapType
readOnly bool
@ -28,11 +29,11 @@ type Volume struct {
lastCompactRevision uint16
}
func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL, preallocate int64) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.needleMapKind = needleMapKind
e = v.load(true, true, needleMapKind)
e = v.load(true, true, needleMapKind, preallocate)
return
}
func (v *Volume) String() string {

22
weed/storage/volume_checking.go

@ -7,27 +7,33 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error {
func getActualSize(size uint32) int64 {
padding := NeedlePaddingSize - ((NeedleHeaderSize + size + NeedleChecksumSize) % NeedlePaddingSize)
return NeedleHeaderSize + int64(size) + NeedleChecksumSize + int64(padding)
}
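// Worked example (assuming NeedleHeaderSize == 16): for size == 5,
// header+data+checksum = 16+5+4 = 25 bytes, padding = 8 - (25 % 8) = 7,
// so getActualSize(5) == 32 and every needle stays 8-byte aligned on disk.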
func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (int64, error) {
var indexSize int64
var e error
if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil {
return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e)
return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e)
}
if indexSize == 0 {
return nil
return int64(SuperBlockSize), nil
}
var lastIdxEntry []byte
if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleIndexSize); e != nil {
return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
}
key, offset, size := idxFileEntry(lastIdxEntry)
if offset == 0 {
return nil
if offset == 0 || size == TombstoneFileSize {
return 0, nil
}
if e = verifyNeedleIntegrity(v.dataFile, v.Version(), int64(offset)*NeedlePaddingSize, key, size); e != nil {
return fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
return 0, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
}
return nil
return int64(offset)*int64(NeedlePaddingSize) + getActualSize(size), nil
}
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {

17
weed/storage/volume_create.go

@ -0,0 +1,17 @@
// +build !linux
package storage
import (
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
)
func createVolumeFile(fileName string, preallocate int64) (file *os.File, e error) {
file, e = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644)
if preallocate > 0 {
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
}
return file, e
}

19
weed/storage/volume_create_linux.go

@ -0,0 +1,19 @@
// +build linux
package storage
import (
"os"
"syscall"
"github.com/chrislusf/seaweedfs/weed/glog"
)
func createVolumeFile(fileName string, preallocate int64) (file *os.File, e error) {
	file, e = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644)
	if e != nil {
		// guard before file.Fd(): a nil *os.File would panic below
		return nil, e
	}
	if preallocate != 0 {
		// mode 1 == FALLOC_FL_KEEP_SIZE: reserve the blocks without
		// changing the reported file size
		if err := syscall.Fallocate(int(file.Fd()), 1, 0, preallocate); err != nil {
			glog.V(0).Infof("Failed to preallocate %d bytes for %s: %v", preallocate, fileName, err)
		} else {
			glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName)
		}
	}
	return file, e
}

25
weed/storage/volume_info.go

@ -2,8 +2,9 @@ package storage
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/operation"
"sort"
"github.com/chrislusf/seaweedfs/weed/pb"
)
type VolumeInfo struct {
@ -19,23 +20,23 @@ type VolumeInfo struct {
ReadOnly bool
}
func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err error) {
func NewVolumeInfo(m *pb.VolumeInformationMessage) (vi VolumeInfo, err error) {
vi = VolumeInfo{
Id: VolumeId(*m.Id),
Size: *m.Size,
Collection: *m.Collection,
FileCount: int(*m.FileCount),
DeleteCount: int(*m.DeleteCount),
DeletedByteCount: *m.DeletedByteCount,
ReadOnly: *m.ReadOnly,
Version: Version(*m.Version),
Id: VolumeId(m.Id),
Size: m.Size,
Collection: m.Collection,
FileCount: int(m.FileCount),
DeleteCount: int(m.DeleteCount),
DeletedByteCount: m.DeletedByteCount,
ReadOnly: m.ReadOnly,
Version: Version(m.Version),
}
rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement))
rp, e := NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil {
return vi, e
}
vi.ReplicaPlacement = rp
vi.Ttl = LoadTTLFromUint32(*m.Ttl)
vi.Ttl = LoadTTLFromUint32(m.Ttl)
return vi, nil
}

9
weed/storage/volume_loading.go

@ -12,11 +12,11 @@ func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, need
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{}
v.needleMapKind = needleMapKind
e = v.load(false, false, needleMapKind)
e = v.load(false, false, needleMapKind, 0)
return
}
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error {
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) error {
var e error
fileName := v.FileName()
@ -34,7 +34,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
}
} else {
if createDatIfMissing {
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
v.dataFile, e = createVolumeFile(fileName+".dat", preallocate)
} else {
return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
}
@ -64,7 +64,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
}
}
if e = CheckVolumeDataIntegrity(v, indexFile); e != nil {
if v.dataFileSize, e = CheckVolumeDataIntegrity(v, indexFile); e != nil {
v.readOnly = true
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e)
}
@ -86,6 +86,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
}
}
}
return e
}

30
weed/storage/volume_read_write.go

@ -60,6 +60,8 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
if offset, err = v.dataFile.Seek(0, 2); err != nil {
glog.V(0).Infof("failed to seek the end of file: %v", err)
return
} else if offset != int64(v.dataFileSize) {
glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset)
}
//ensure file writing starting from aligned positions
if offset%NeedlePaddingSize != 0 {
@ -67,9 +69,12 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
if offset, err = v.dataFile.Seek(offset, 0); err != nil {
glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
return
} else if offset != int64(v.dataFileSize) {
glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset)
}
}
v.dataFile.Write(b)
_, err = v.dataFile.Write(b)
v.dataFileSize += int64(len(b))
return
}
@ -86,10 +91,12 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) {
glog.V(4).Infof("needle is unchanged!")
return
}
var offset int64
var offset, actualSize int64
if offset, err = v.dataFile.Seek(0, 2); err != nil {
glog.V(0).Infof("failed to seek the end of file: %v", err)
return
} else if offset != int64(v.dataFileSize) {
glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset)
}
//ensure file writing starting from aligned positions
@ -101,12 +108,14 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) {
}
}
if size, err = n.Append(v.dataFile, v.Version()); err != nil {
if size, actualSize, err = n.Append(v.dataFile, v.Version()); err != nil {
if e := v.dataFile.Truncate(offset); e != nil {
err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
}
return
}
v.dataFileSize += actualSize
nv, ok := v.nm.Get(n.Id)
if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
@ -128,16 +137,20 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) {
defer v.dataFileAccessLock.Unlock()
nv, ok := v.nm.Get(n.Id)
//fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
if ok {
if ok && nv.Size != TombstoneFileSize {
size := nv.Size
if err := v.nm.Delete(n.Id); err != nil {
// println("adding tombstone", n.Id, "at offset", v.dataFileSize)
if err := v.nm.Delete(n.Id, uint32(v.dataFileSize/NeedlePaddingSize)); err != nil {
return size, err
}
if _, err := v.dataFile.Seek(0, 2); err != nil {
if offset, err := v.dataFile.Seek(0, 2); err != nil {
return size, err
} else if offset != int64(v.dataFileSize) {
glog.V(0).Infof("dataFileSize %d != actual data file size: %d, deleteMarker: %d", v.dataFileSize, offset, getActualSize(0))
}
n.Data = nil
_, err := n.Append(v.dataFile, v.Version())
_, actualSize, err := n.Append(v.dataFile, v.Version())
v.dataFileSize += actualSize
return size, err
}
return 0, nil
@ -149,6 +162,9 @@ func (v *Volume) readNeedle(n *Needle) (int, error) {
if !ok || nv.Offset == 0 {
return -1, errors.New("Not Found")
}
if nv.Size == TombstoneFileSize {
return -1, errors.New("Already Deleted")
}
err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
if err != nil {
return 0, err

1
weed/storage/volume_super_block.go

@ -56,6 +56,7 @@ func (v *Volume) maybeWriteSuperBlock() error {
}
}
}
v.dataFileSize = SuperBlockSize
}
return e
}

2
weed/storage/volume_sync.go

@ -148,7 +148,7 @@ func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m CompactMap, la
total := 0
err = operation.GetVolumeIdxEntries(volumeServer, vid.String(), func(key uint64, offset, size uint32) {
// println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
if offset != 0 && size != 0 {
if offset > 0 && size != TombstoneFileSize {
m.Set(Key(key), offset, size)
} else {
m.Delete(Key(key))

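The one-line change above is subtle: previously size == 0 meant "deleted", which also discarded legitimately empty files. With a tombstone sentinel, only an explicit deletion marker removes an entry. A sketch of the rule, assuming TombstoneFileSize is the maximum uint32 value (which is what the needle-map changes elsewhere in this commit suggest):

package main

import (
	"fmt"
	"math"
)

const tombstoneFileSize = uint32(math.MaxUint32) // assumed sentinel value

type entry struct {
	key          uint64
	offset, size uint32
}

// applyIndexEntry mirrors the updated condition: keep the entry only when it
// was actually written (offset > 0) and is not a deletion marker.
func applyIndexEntry(m map[uint64]entry, e entry) {
	if e.offset > 0 && e.size != tombstoneFileSize {
		m[e.key] = e
	} else {
		delete(m, e.key)
	}
}

func main() {
	m := map[uint64]entry{}
	applyIndexEntry(m, entry{1, 8, 100})                // live needle
	applyIndexEntry(m, entry{2, 16, 0})                 // zero-length needle: now kept
	applyIndexEntry(m, entry{1, 24, tombstoneFileSize}) // tombstone for key 1
	fmt.Println(len(m)) // 1
}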
18
weed/storage/volume_vacuum.go

@@ -24,7 +24,7 @@ func (v *Volume) Compact() error {
v.lastCompactIndexOffset = v.nm.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactRevision
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", v.dataFileSize)
}
func (v *Volume) Compact2() error {
@@ -35,7 +35,7 @@ func (v *Volume) Compact2() error {
}
func (v *Volume) commitCompact() error {
glog.V(3).Infof("Committing vacuuming...")
glog.V(0).Infof("Committing vacuuming...")
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
glog.V(3).Infof("Got Committing lock...")
@@ -66,7 +66,7 @@ func (v *Volume) commitCompact() error {
//glog.V(3).Infof("Pretending to be vacuuming...")
//time.Sleep(20 * time.Second)
glog.V(3).Infof("Loading Commit file...")
if e = v.load(true, false, v.needleMapKind); e != nil {
if e = v.load(true, false, v.needleMapKind, 0); e != nil {
return e
}
return nil
@@ -189,7 +189,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
fakeDelNeedle := new(Needle)
fakeDelNeedle.Id = key
fakeDelNeedle.Cookie = 0x12345678
_, err = fakeDelNeedle.Append(dst, v.Version())
_, _, err = fakeDelNeedle.Append(dst, v.Version())
if err != nil {
return
}
@@ -207,11 +207,11 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return nil
}
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64) (err error) {
var (
dst, idx *os.File
)
if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
if dst, err = createVolumeFile(dstName, preallocate); err != nil {
return
}
defer dst.Close()
@@ -241,7 +241,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
if _, err = n.Append(dst, v.Version()); err != nil {
if _, _, err := n.Append(dst, v.Version()); err != nil {
return fmt.Errorf("cannot append needle: %s", err)
}
new_offset += n.DiskSize()
@@ -280,7 +280,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
new_offset := int64(SuperBlockSize)
WalkIndexFile(oldIndexFile, func(key uint64, offset, size uint32) error {
if size <= 0 {
if offset == 0 || size == TombstoneFileSize {
return nil
}
@@ -302,7 +302,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
if _, err = n.Append(dst, v.Version()); err != nil {
if _, _, err = n.Append(dst, v.Version()); err != nil {
return fmt.Errorf("cannot append needle: %s", err)
}
new_offset += n.DiskSize()

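Compaction now writes the .cpd copy through createVolumeFile(dstName, preallocate) rather than a plain os.OpenFile, so the replacement data file can reserve its disk blocks up front and compaction cannot run a nearly full disk out of space halfway through. A hedged sketch of what such a helper might look like on Linux (this commit adds a volume_create_linux.go; the code below is illustrative, not that file):

//go:build linux

package main

import (
	"log"
	"os"
	"syscall"
)

// createVolumeFile opens (or truncates) a data file and, when preallocate is
// positive, reserves that many bytes. Mode 1 is FALLOC_FL_KEEP_SIZE: blocks
// are reserved but the reported file size stays 0, so seeks to the end still
// reflect only the bytes actually written.
func createVolumeFile(name string, preallocate int64) (*os.File, error) {
	f, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return nil, err
	}
	if preallocate > 0 {
		if err := syscall.Fallocate(int(f.Fd()), 1, 0, preallocate); err != nil {
			f.Close()
			return nil, err
		}
	}
	return f, nil
}

func main() {
	f, err := createVolumeFile("/tmp/vol_3.cpd", 1<<20) // reserve 1 MiB
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	log.Println("created", f.Name())
}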
1
weed/topology/allocate_volume.go

@@ -20,6 +20,7 @@ func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption
values.Add("collection", option.Collection)
values.Add("replication", option.ReplicaPlacement.String())
values.Add("ttl", option.Ttl.String())
values.Add("preallocate", fmt.Sprintf("%d", option.Prealloacte))
jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
if err != nil {
return err

3
weed/topology/data_node.go

@@ -15,7 +15,6 @@ type DataNode struct {
Port int
PublicUrl string
LastSeen int64 // unix time in seconds
Dead bool
}
func NewDataNode(id string) *DataNode {
@@ -30,7 +29,7 @@ func NewDataNode(id string) *DataNode {
func (dn *DataNode) String() string {
dn.RLock()
defer dn.RUnlock()
return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl)
}
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {

8
weed/topology/node.go

@@ -234,7 +234,7 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
glog.V(0).Infoln(n, "removes", node, "volumeCount =", n.activeVolumeCount)
glog.V(0).Infoln(n, "removes", node.Id())
}
}
@@ -242,12 +242,6 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
if n.IsRack() {
for _, c := range n.Children() {
dn := c.(*DataNode) //can not cast n to DataNode
if dn.LastSeen < freshThreshHold {
if !dn.Dead {
dn.Dead = true
n.GetTopology().chanDeadDataNodes <- dn
}
}
for _, v := range dn.GetVolumes() {
if uint64(v.Size) >= volumeSizeLimit {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)

5
weed/topology/rack.go

@@ -32,11 +32,6 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
dn.LastSeen = time.Now().Unix()
if dn.Dead {
dn.Dead = false
r.GetTopology().chanRecoveredDataNodes <- dn
dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
}
return dn
}
}

15
weed/topology/store_replicate.go

@@ -2,14 +2,14 @@ package topology
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"net/url"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -55,9 +55,18 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
q.Set("cm", "true")
}
u.RawQuery = q.Encode()
pairMap := make(map[string]string)
if needle.HasPairs() {
err := json.Unmarshal(needle.Pairs, &pairMap)
if err != nil {
glog.V(0).Infoln("Unmarshal pairs error:", err)
}
}
_, err := operation.Upload(u.String(),
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
jwt)
pairMap, jwt)
return err
}); err != nil {
ret = 0

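ReplicatedWrite now decodes the needle's JSON-encoded Pairs into a map and passes it to operation.Upload, so replicas receive the same extended key/value metadata as the node that accepted the write. A rough sketch of the decode-and-forward step, assuming the pairs travel as HTTP headers (the Seaweed- prefix below is illustrative, not the actual wire format):

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// pairsToHeaders decodes needle.Pairs-style JSON and copies each entry onto an
// outgoing replication request, one header per pair.
func pairsToHeaders(pairs []byte, req *http.Request) error {
	if len(pairs) == 0 {
		return nil
	}
	m := make(map[string]string)
	if err := json.Unmarshal(pairs, &m); err != nil {
		return err
	}
	for k, v := range m {
		req.Header.Set("Seaweed-"+k, v) // hypothetical header prefix
	}
	return nil
}

func main() {
	req, _ := http.NewRequest("POST", "http://replica:8080/3,01637037d6", nil)
	if err := pairsToHeaders([]byte(`{"owner":"alice"}`), req); err != nil {
		fmt.Println("bad pairs:", err)
		return
	}
	fmt.Println(req.Header.Get("Seaweed-Owner")) // alice
}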
38
weed/topology/topology.go

@@ -7,7 +7,6 @@ import (
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -24,11 +23,9 @@ type Topology struct {
Sequence sequence.Sequencer
chanDeadDataNodes chan *DataNode
chanRecoveredDataNodes chan *DataNode
chanFullVolumes chan storage.VolumeInfo
configuration *Configuration
Configuration *Configuration
RaftServer raft.Server
}
@@ -45,8 +42,6 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
t.Sequence = seq
t.chanDeadDataNodes = make(chan *DataNode)
t.chanRecoveredDataNodes = make(chan *DataNode)
t.chanFullVolumes = make(chan storage.VolumeInfo)
err := t.loadConfiguration(confFile)
@@ -80,7 +75,7 @@ func (t *Topology) Leader() (string, error) {
func (t *Topology) loadConfiguration(configurationFile string) error {
b, e := ioutil.ReadFile(configurationFile)
if e == nil {
t.configuration, e = NewConfiguration(b)
t.Configuration, e = NewConfiguration(b)
return e
}
glog.V(0).Infoln("Using default configurations.")
@@ -147,35 +142,6 @@ func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
}
func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
t.Sequence.SetMax(*joinMessage.MaxFileKey)
dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port))
if *joinMessage.IsInit && dn != nil {
t.UnRegisterDataNode(dn)
}
dn = rack.GetOrCreateDataNode(*joinMessage.Ip,
int(*joinMessage.Port), *joinMessage.PublicUrl,
int(*joinMessage.MaxVolumeCount))
var volumeInfos []storage.VolumeInfo
for _, v := range joinMessage.Volumes {
if vi, err := storage.NewVolumeInfo(v); err == nil {
volumeInfos = append(volumeInfos, vi)
} else {
glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
}
}
deletedVolumes := dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)
}
for _, v := range deletedVolumes {
t.UnRegisterVolumeLayout(v, dn)
}
}
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
for _, c := range t.Children() {
dc := c.(*DataCenter)

16
weed/topology/topology_event_handling.go

@@ -31,12 +31,6 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
select {
case v := <-t.chanFullVolumes:
t.SetVolumeCapacityFull(v)
case dn := <-t.chanRecoveredDataNodes:
t.RegisterRecoveredDataNode(dn)
glog.V(0).Infoln("Recovered DataNode: %v", dn)
case dn := <-t.chanDeadDataNodes:
t.UnRegisterDataNode(dn)
glog.V(0).Infof("Dead DataNode: %v", dn)
}
}
}()
@@ -55,7 +49,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
}
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.GetVolumes() {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn.Id())
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
vl.SetVolumeUnavailable(dn, v.Id)
}
@@ -64,11 +58,3 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
dn.Parent().UnlinkChildNode(dn.Id())
}
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.GetVolumes() {
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id)
}
}
}

1
weed/topology/volume_growth.go

@@ -21,6 +21,7 @@ type VolumeGrowOption struct {
Collection string
ReplicaPlacement *storage.ReplicaPlacement
Ttl *storage.TTL
Preallocate int64
DataCenter string
Rack string
DataNode string

2
weed/util/constants.go

@@ -1,5 +1,5 @@
package util
const (
VERSION = "0.71 beta"
VERSION = "0.72"
)

5
weed/util/http_util.go

@@ -148,8 +148,9 @@ func DownloadUrl(fileUrl string) (filename string, rc io.ReadCloser, e error) {
}
contentDisposition := response.Header["Content-Disposition"]
if len(contentDisposition) > 0 {
if strings.HasPrefix(contentDisposition[0], "filename=") {
filename = contentDisposition[0][len("filename="):]
idx := strings.Index(contentDisposition[0], "filename=")
if idx != -1 {
filename = contentDisposition[0][idx+len("filename="):]
filename = strings.Trim(filename, "\"")
}
}

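The old code only matched a Content-Disposition value that began with "filename=", so the common form attachment; filename="x.txt" fell through; searching for the substring fixes that. For comparison, a sketch using the standard library's mime.ParseMediaType, which handles the disposition type, quoting, and extra parameters directly:

package main

import (
	"fmt"
	"mime"
)

// filenameFromContentDisposition parses a Content-Disposition header value and
// returns its filename parameter, or "" when absent or malformed.
func filenameFromContentDisposition(cd string) string {
	if _, params, err := mime.ParseMediaType(cd); err == nil {
		return params["filename"]
	}
	return ""
}

func main() {
	fmt.Println(filenameFromContentDisposition(`attachment; filename="report.pdf"`)) // report.pdf
	fmt.Println(filenameFromContentDisposition(`inline; filename=data.csv`))         // data.csv
}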
4
weed/util/net_timeout.go

@@ -38,10 +38,12 @@ type Conn struct {
}
func (c *Conn) Read(b []byte) (count int, e error) {
if c.ReadTimeout != 0 {
err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout))
if err != nil {
return 0, err
}
}
count, e = c.Conn.Read(b)
if e == nil {
stats.BytesIn(int64(count))
@@ -50,10 +52,12 @@ func (c *Conn) Write(b []byte) (count int, e error) {
}
func (c *Conn) Write(b []byte) (count int, e error) {
if c.WriteTimeout != 0 {
err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
if err != nil {
return 0, err
}
}
count, e = c.Conn.Write(b)
if e == nil {
stats.BytesOut(int64(count))

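Read and Write previously ignored the error from arming the deadline; now a failed SetReadDeadline or SetWriteDeadline (for example, on an already-closed connection) aborts the call instead of silently proceeding without a timeout. A small self-contained demonstration of the same per-call idle-timeout pattern on a plain net.Conn:

package main

import (
	"log"
	"net"
	"time"
)

// readWithIdleTimeout arms a fresh deadline before every read, so the timeout
// bounds inactivity on the connection rather than its total lifetime.
func readWithIdleTimeout(c net.Conn, timeout time.Duration, buf []byte) (int, error) {
	if timeout != 0 {
		if err := c.SetReadDeadline(time.Now().Add(timeout)); err != nil {
			return 0, err
		}
	}
	return c.Read(buf)
}

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	defer ln.Close()
	go func() {
		conn, err := ln.Accept()
		if err != nil {
			return
		}
		defer conn.Close()
		time.Sleep(time.Second) // never writes, so the client read must time out
	}()
	c, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()
	buf := make([]byte, 16)
	if _, err := readWithIdleTimeout(c, 100*time.Millisecond, buf); err != nil {
		log.Println("read error:", err) // expect an i/o timeout
	}
}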