diff --git a/docker/compose/local-clusters-compose.yml b/docker/compose/local-clusters-compose.yml
index e813ca35f..7bd16aa3f 100644
--- a/docker/compose/local-clusters-compose.yml
+++ b/docker/compose/local-clusters-compose.yml
@@ -20,4 +20,5 @@ services:
- 18085:18080
- 8889:8888
- 18889:18888
- command: "server -ip=server2 -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1"
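+      # host port 8334 -> S3 API port 8333 (enabled by the -s3 flag below)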
+ - 8334:8333
+ command: "server -ip=server2 -filer -s3 -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1"
diff --git a/k8s/README.md b/k8s/README.md
index 36230f7b2..c5615522c 100644
--- a/k8s/README.md
+++ b/k8s/README.md
@@ -29,6 +29,14 @@ please set/update the corresponding affinity rule in values.yaml to an empty one
```affinity: ""```
+### PVC - storage class ###
+
+The volume StatefulSet now supports K8s PVCs. The current example uses the
+simple local-path-provisioner from Rancher (included with k3d / k3s):
+https://github.com/rancher/local-path-provisioner
+
+You can use ANY storage class you like; just set the correct storage class
+for your deployment.
### current instances config (AIO):
1 instance for each type (master/filer+s3/volume)
diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml
index a16bb0133..424307ff6 100644
--- a/k8s/seaweedfs/Chart.yaml
+++ b/k8s/seaweedfs/Chart.yaml
@@ -1,5 +1,5 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
-appVersion: "2.26"
-version: 2.26
+appVersion: "2.28"
+version: 2.28
diff --git a/k8s/seaweedfs/templates/_helpers.tpl b/k8s/seaweedfs/templates/_helpers.tpl
index f6c4fa570..a9ee89f03 100644
--- a/k8s/seaweedfs/templates/_helpers.tpl
+++ b/k8s/seaweedfs/templates/_helpers.tpl
@@ -126,3 +126,26 @@ Inject extra environment vars in the format key:value, if populated
{{- printf "%s%s%s:%s" $registryName $repositoryName $name $tag -}}
{{- end -}}
{{- end -}}
+
+
+{{/* check if any PVC exists */}}
+{{- define "volume.pvc_exists" -}}
+{{- if or (or (eq .Values.volume.data.type "persistentVolumeClaim") (and (eq .Values.volume.idx.type "persistentVolumeClaim") .Values.volume.dir_idx )) (eq .Values.volume.logs.type "persistentVolumeClaim") -}}
+{{- printf "true" -}}
+{{- else -}}
+{{- printf "false" -}}
+{{- end -}}
+{{- end -}}
+
+{{/* check if any HostPath exists */}}
+{{- define "volume.hostpath_exists" -}}
+{{- if or (or (eq .Values.volume.data.type "hostPath") (and (eq .Values.volume.idx.type "hostPath") .Values.volume.dir_idx )) (eq .Values.volume.logs.type "hostPath") -}}
+{{- printf "true" -}}
+{{- else -}}
+{{- if or .Values.global.enableSecurity .Values.volume.extraVolumes -}}
+{{- printf "true" -}}
+{{- else -}}
+{{- printf "false" -}}
+{{- end -}}
+{{- end -}}
+{{- end -}}
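
The two helpers above, restated as a hedged Go sketch (struct and field names are invented to mirror values.yaml): a PVC exists when any of data/idx/logs requests one, with idx counting only when volume.dir_idx is set; the hostPath check is additionally forced true by global.enableSecurity or volume.extraVolumes, since both render extra entries under the volumes: block.

```go
package main

import "fmt"

// Invented structs mirroring values.yaml; a sketch, not chart code.
type StorageValues struct {
	Type string // "hostPath" or "persistentVolumeClaim"
}

type VolumeValues struct {
	Data, Idx, Logs StorageValues
	DirIdx          string // volume.dir_idx; empty means unset
	ExtraVolumes    string // volume.extraVolumes
}

// mirrors {{ define "volume.pvc_exists" }}
func pvcExists(v VolumeValues) bool {
	return v.Data.Type == "persistentVolumeClaim" ||
		(v.Idx.Type == "persistentVolumeClaim" && v.DirIdx != "") ||
		v.Logs.Type == "persistentVolumeClaim"
}

// mirrors {{ define "volume.hostpath_exists" }}
func hostPathExists(v VolumeValues, enableSecurity bool) bool {
	if v.Data.Type == "hostPath" ||
		(v.Idx.Type == "hostPath" && v.DirIdx != "") ||
		v.Logs.Type == "hostPath" {
		return true
	}
	// the security config and extra volumes also need a volumes: block
	return enableSecurity || v.ExtraVolumes != ""
}

func main() {
	v := VolumeValues{
		Data: StorageValues{Type: "persistentVolumeClaim"},
		Idx:  StorageValues{Type: "hostPath"}, // ignored: DirIdx unset
		Logs: StorageValues{Type: "hostPath"},
	}
	fmt.Println(pvcExists(v), hostPathExists(v, false)) // true true
}
```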
diff --git a/k8s/seaweedfs/templates/cronjob.yaml b/k8s/seaweedfs/templates/cronjob.yaml
index c7dcd52b1..4caf4bad1 100644
--- a/k8s/seaweedfs/templates/cronjob.yaml
+++ b/k8s/seaweedfs/templates/cronjob.yaml
@@ -40,7 +40,7 @@ spec:
{{ if .Values.volume.dataCenter }} -dataCenter {{ .Values.volume.dataCenter }}{{ end }}\
{{ if .Values.cronjob.collection }} -collection {{ .Values.cronjob.collection }}{{ end }}\n\
{{- if .Values.cronjob.enableFixReplication }}
- volume.fix.replication {{ if .Values.cronjob.collectionPattern }} -collectionPattern={{ .Values.cronjob.collectionPattern }} {{ end }} \n\
+ volume.fix.replication -collectionPattern={{ .Values.cronjob.collectionPattern }} \n\
{{- end }}
unlock\n" | \
/usr/bin/weed shell \
diff --git a/k8s/seaweedfs/templates/filer-service-client.yaml b/k8s/seaweedfs/templates/filer-service-client.yaml
index f509086e3..929b6f8bc 100644
--- a/k8s/seaweedfs/templates/filer-service-client.yaml
+++ b/k8s/seaweedfs/templates/filer-service-client.yaml
@@ -10,6 +10,7 @@ metadata:
monitoring: "true"
{{- end }}
spec:
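+  # headless service: clients resolve the individual filer pod addresses via DNS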
+ clusterIP: None
ports:
- name: "swfs-filer"
port: {{ .Values.filer.port }}
diff --git a/k8s/seaweedfs/templates/volume-statefulset.yaml b/k8s/seaweedfs/templates/volume-statefulset.yaml
index f9e55e0d3..652fd9ea3 100644
--- a/k8s/seaweedfs/templates/volume-statefulset.yaml
+++ b/k8s/seaweedfs/templates/volume-statefulset.yaml
@@ -45,6 +45,19 @@ spec:
priorityClassName: {{ .Values.volume.priorityClassName | quote }}
{{- end }}
enableServiceLinks: false
+ {{- if .Values.volume.dir_idx }}
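+      # one-time migration: relocate any pre-existing .idx files from the data
+      # dir into the dedicated index dir (e.g. on SSD) before the server starts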
+ initContainers:
+ - name: seaweedfs-vol-move-idx
+ image: {{ template "volume.image" . }}
+ imagePullPolicy: {{ .Values.global.pullPolicy | default "IfNotPresent" }}
+ command: [ '/bin/sh', '-c' ]
+ args: ['if ls {{ .Values.volume.dir }}/*.idx >/dev/null 2>&1; then mv {{ .Values.volume.dir }}/*.idx {{ .Values.volume.dir_idx }}/; fi;']
+ volumeMounts:
+ - name: idx
+ mountPath: {{ .Values.volume.dir_idx }}
+ - name: data
+ mountPath: {{ .Values.volume.dir }}
+ {{- end }}
containers:
- name: seaweedfs
image: {{ template "volume.image" . }}
@@ -118,9 +131,13 @@ spec:
-compactionMBps={{ .Values.volume.compactionMBps }} \
-mserver={{ range $index := until (.Values.master.replicas | int) }}${SEAWEEDFS_FULLNAME}-master-{{ $index }}.${SEAWEEDFS_FULLNAME}-master:{{ $.Values.master.port }}{{ if lt $index (sub ($.Values.master.replicas | int) 1) }},{{ end }}{{ end }}
volumeMounts:
- - name: seaweedfs-volume-storage
- mountPath: "/data/"
- - name: seaweedfs-volume-log-volume
+ - name: data
+ mountPath: "{{ .Values.volume.dir }}/"
+ {{- if .Values.volume.dir_idx }}
+ - name: idx
+ mountPath: "{{ .Values.volume.dir_idx }}/"
+ {{- end }}
+ - name: logs
mountPath: "/logs/"
{{- if .Values.global.enableSecurity }}
- name: security-config
@@ -173,15 +190,27 @@ spec:
resources:
{{ tpl .Values.volume.resources . | nindent 12 | trim }}
{{- end }}
+ {{- $hostpath_exists := include "volume.hostpath_exists" . -}}
+      {{- if eq $hostpath_exists "true" }}
volumes:
- - name: seaweedfs-volume-log-volume
+ {{- if eq .Values.volume.data.type "hostPath" }}
+ - name: data
hostPath:
- path: /storage/logs/seaweedfs/volume
+ path: /storage/object_store/
type: DirectoryOrCreate
- - name: seaweedfs-volume-storage
+ {{- end }}
+ {{- if and (eq .Values.volume.idx.type "hostPath") .Values.volume.dir_idx }}
+ - name: idx
hostPath:
- path: /storage/object_store/
+ path: /ssd/seaweedfs-volume-idx/
type: DirectoryOrCreate
+ {{- end }}
+ {{- if eq .Values.volume.logs.type "hostPath" }}
+ - name: logs
+ hostPath:
+ path: /storage/logs/seaweedfs/volume
+ type: DirectoryOrCreate
+ {{- end }}
{{- if .Values.global.enableSecurity }}
- name: security-config
configMap:
@@ -205,8 +234,43 @@ spec:
{{- if .Values.volume.extraVolumes }}
{{ tpl .Values.volume.extraVolumes . | indent 8 | trim }}
{{- end }}
+ {{- end }}
{{- if .Values.volume.nodeSelector }}
nodeSelector:
{{ tpl .Values.volume.nodeSelector . | indent 8 | trim }}
{{- end }}
+ {{- $pvc_exists := include "volume.pvc_exists" . -}}
+      {{- if eq $pvc_exists "true" }}
+ volumeClaimTemplates:
+ {{- if eq .Values.volume.data.type "persistentVolumeClaim"}}
+ - metadata:
+ name: data
+ spec:
+ accessModes: [ "ReadWriteOnce" ]
+ storageClassName: {{ .Values.volume.data.storageClass }}
+ resources:
+ requests:
+ storage: {{ .Values.volume.data.size }}
+ {{- end }}
+ {{- if and (eq .Values.volume.idx.type "persistentVolumeClaim") .Values.volume.dir_idx }}
+ - metadata:
+ name: idx
+ spec:
+ accessModes: [ "ReadWriteOnce" ]
+ storageClassName: {{ .Values.volume.idx.storageClass }}
+ resources:
+ requests:
+ storage: {{ .Values.volume.idx.size }}
+ {{- end }}
+ {{- if eq .Values.volume.logs.type "persistentVolumeClaim" }}
+ - metadata:
+ name: logs
+ spec:
+ accessModes: [ "ReadWriteOnce" ]
+ storageClassName: {{ .Values.volume.logs.storageClass }}
+ resources:
+ requests:
+ storage: {{ .Values.volume.logs.size }}
+ {{- end }}
+ {{- end }}
{{- end }}
diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml
index 6dc182b29..8768b32f6 100644
--- a/k8s/seaweedfs/values.yaml
+++ b/k8s/seaweedfs/values.yaml
@@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
- # imageTag: "2.26" - started using {.Chart.appVersion}
+ # imageTag: "2.28" - started using {.Chart.appVersion}
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always
@@ -138,6 +138,24 @@ volume:
# minimum free disk space(in percents). If free disk space lower this value - all volumes marks as ReadOnly
minFreeSpacePercent: 7
+# can use ANY storage class, for example local-path-provisioner:
+# data:
+# type: "persistentVolumeClaim"
+# size: "24Ti"
+# storageClass: "local-path-provisioner"
+ data:
+ type: "hostPath"
+ size: ""
+ storageClass: ""
+ idx:
+ type: "hostPath"
+ size: ""
+ storageClass: ""
+
+ logs:
+ type: "hostPath"
+ size: ""
+ storageClass: ""
# limit background compaction or copying speed in mega bytes per second
compactionMBps: "50"
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 056904ebe..a7da16a93 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
     <groupId>com.github.chrislusf</groupId>
     <artifactId>seaweedfs-client</artifactId>
-    <version>1.6.1</version>
+    <version>1.6.2</version>
     <groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
index 69b900017..b45c193ec 100644
--- a/other/java/client/pom.xml.deploy
+++ b/other/java/client/pom.xml.deploy
@@ -5,7 +5,7 @@
     <groupId>com.github.chrislusf</groupId>
     <artifactId>seaweedfs-client</artifactId>
-    <version>1.6.1</version>
+    <version>1.6.2</version>
     <groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
index 1447401b7..217f708e9 100644
--- a/other/java/client/pom_debug.xml
+++ b/other/java/client/pom_debug.xml
@@ -5,7 +5,7 @@
     <groupId>com.github.chrislusf</groupId>
     <artifactId>seaweedfs-client</artifactId>
-    <version>1.6.1</version>
+    <version>1.6.2</version>
     <groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index 58269d41f..c2ffe0ac6 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -4,8 +4,7 @@ import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
@@ -94,9 +93,9 @@ public class FilerClient extends FilerGrpcClient {
if ("/".equals(path)) {
return true;
}
- Path pathObject = Paths.get(path);
- String parent = pathObject.getParent().toString();
- String name = pathObject.getFileName().toString();
+ File pathFile = new File(path);
+ String parent = pathFile.getParent();
+ String name = pathFile.getName();
mkdirs(parent, mode, uid, gid, userName, groupNames);
@@ -115,13 +114,13 @@ public class FilerClient extends FilerGrpcClient {
public boolean mv(String oldPath, String newPath) {
- Path oldPathObject = Paths.get(oldPath);
- String oldParent = oldPathObject.getParent().toString();
- String oldName = oldPathObject.getFileName().toString();
+ File oldPathFile = new File(oldPath);
+ String oldParent = oldPathFile.getParent();
+ String oldName = oldPathFile.getName();
- Path newPathObject = Paths.get(newPath);
- String newParent = newPathObject.getParent().toString();
- String newName = newPathObject.getFileName().toString();
+ File newPathFile = new File(newPath);
+ String newParent = newPathFile.getParent();
+ String newName = newPathFile.getName();
return atomicRenameEntry(oldParent, oldName, newParent, newName);
@@ -129,9 +128,9 @@ public class FilerClient extends FilerGrpcClient {
public boolean rm(String path, boolean isRecursive, boolean ignoreRecusiveError) {
- Path pathObject = Paths.get(path);
- String parent = pathObject.getParent().toString();
- String name = pathObject.getFileName().toString();
+ File pathFile = new File(path);
+ String parent = pathFile.getParent();
+ String name = pathFile.getName();
return deleteEntry(
parent,
@@ -148,9 +147,9 @@ public class FilerClient extends FilerGrpcClient {
public boolean touch(String path, int mode, int uid, int gid, String userName, String[] groupNames) {
- Path pathObject = Paths.get(path);
- String parent = pathObject.getParent().toString();
- String name = pathObject.getFileName().toString();
+ File pathFile = new File(path);
+ String parent = pathFile.getParent();
+ String name = pathFile.getName();
FilerProto.Entry entry = lookupEntry(parent, name);
if (entry == null) {
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index b73e99e69..ba298a713 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
+import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.concurrent.*;
@@ -217,7 +218,7 @@ public class SeaweedOutputStream extends OutputStream {
private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException {
- bufferToWrite.flip();
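+        // cast to java.nio.Buffer so the call binds to Buffer.flip(): compiled
+        // on JDK 9+, ByteBuffer.flip() is a covariant override that throws
+        // NoSuchMethodError when the class is run on JDK 8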
+ ((Buffer)bufferToWrite).flip();
int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
diff --git a/other/java/examples/pom.xml b/other/java/examples/pom.xml
index 2456113d0..d0f22a44d 100644
--- a/other/java/examples/pom.xml
+++ b/other/java/examples/pom.xml
@@ -11,13 +11,13 @@
       <groupId>com.github.chrislusf</groupId>
       <artifactId>seaweedfs-client</artifactId>
-      <version>1.6.1</version>
+      <version>1.6.2</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>com.github.chrislusf</groupId>
       <artifactId>seaweedfs-hadoop2-client</artifactId>
-      <version>1.6.1</version>
+      <version>1.6.2</version>
       <scope>compile</scope>
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
index 0680d86bb..64fb4da75 100644
--- a/other/java/hdfs2/dependency-reduced-pom.xml
+++ b/other/java/hdfs2/dependency-reduced-pom.xml
@@ -301,7 +301,7 @@
-    <seaweedfs.client.version>1.6.1</seaweedfs.client.version>
+    <seaweedfs.client.version>1.6.2</seaweedfs.client.version>
     <hadoop.version>2.9.2</hadoop.version>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
index 897477066..3bc9709a7 100644
--- a/other/java/hdfs2/pom.xml
+++ b/other/java/hdfs2/pom.xml
@@ -5,7 +5,7 @@
     <modelVersion>4.0.0</modelVersion>
     <properties>
-        <seaweedfs.client.version>1.6.1</seaweedfs.client.version>
+        <seaweedfs.client.version>1.6.2</seaweedfs.client.version>
         <hadoop.version>2.9.2</hadoop.version>
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
index 2b4a1a494..ea7c08c17 100644
--- a/other/java/hdfs3/dependency-reduced-pom.xml
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -309,7 +309,7 @@
-    <seaweedfs.client.version>1.6.1</seaweedfs.client.version>
+    <seaweedfs.client.version>1.6.2</seaweedfs.client.version>
     <hadoop.version>3.1.1</hadoop.version>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index 49ff8f926..3e0f373f7 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
     <modelVersion>4.0.0</modelVersion>
     <properties>
-        <seaweedfs.client.version>1.6.1</seaweedfs.client.version>
+        <seaweedfs.client.version>1.6.2</seaweedfs.client.version>
         <hadoop.version>3.1.1</hadoop.version>
diff --git a/weed/Makefile b/weed/Makefile
index fd0843c22..8f1257d09 100644
--- a/weed/Makefile
+++ b/weed/Makefile
@@ -33,3 +33,7 @@ debug_webdav:
debug_s3:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 s3
+
+debug_filer_copy:
+ go build -gcflags="all=-N -l"
+ dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index e1b6d8d6c..c1bc80c42 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -63,7 +63,7 @@ func init() {
b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
- b.diskType = cmdBenchmark.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
+ b.diskType = cmdBenchmark.Flag.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
diff --git a/weed/command/command.go b/weed/command/command.go
index bbc2e0423..5506e6969 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -15,6 +15,7 @@ var Commands = []*Command{
cmdDownload,
cmdExport,
cmdFiler,
+ cmdFilerBackup,
cmdFilerCat,
cmdFilerMetaTail,
cmdFilerReplicate,
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 6660bd694..534bd9e04 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -84,7 +84,7 @@ func init() {
filerWebDavOptions.port = cmdFiler.Flag.Int("webdav.port", 7333, "webdav server http listen port")
filerWebDavOptions.collection = cmdFiler.Flag.String("webdav.collection", "", "collection to create the files")
filerWebDavOptions.replication = cmdFiler.Flag.String("webdav.replication", "", "replication to create the files")
- filerWebDavOptions.disk = cmdFiler.Flag.String("webdav.disk", "", "[hdd|ssd] hard drive or solid state drive")
+ filerWebDavOptions.disk = cmdFiler.Flag.String("webdav.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag")
filerWebDavOptions.tlsPrivateKey = cmdFiler.Flag.String("webdav.key.file", "", "path to the TLS private key file")
filerWebDavOptions.tlsCertificate = cmdFiler.Flag.String("webdav.cert.file", "", "path to the TLS certificate file")
filerWebDavOptions.cacheDir = cmdFiler.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
new file mode 100644
index 000000000..8cb7441f6
--- /dev/null
+++ b/weed/command/filer_backup.go
@@ -0,0 +1,158 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
+ "io"
+ "time"
+)
+
+type FilerBackupOptions struct {
+ isActivePassive *bool
+ filer *string
+ path *string
+ debug *bool
+ proxyByFiler *bool
+ timeAgo *time.Duration
+}
+
+var (
+ filerBackupOptions FilerBackupOptions
+)
+
+func init() {
+ cmdFilerBackup.Run = runFilerBackup // break init cycle
+ filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
+ filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
+ filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
+ filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
+ filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+}
+
+var cmdFilerBackup = &Command{
+	UsageLine: "filer.backup -filer=<filerHost>:<filerPort> ",
+	Short:     "resumable, continuous replication of files from a SeaweedFS cluster to another location defined in replication.toml",
+	Long: `resumable, continuous replication of files from a SeaweedFS cluster to another location defined in replication.toml
+
+	filer.backup listens to filer notifications. When a file is updated, it fetches the updated content
+	and writes it to the destination. It replaces the filer.replicate command, since no additional message queue is needed.
+
+	If restarted with "-timeAgo" unset, the synchronization resumes from the previous checkpoints, persisted every minute.
+	A fresh sync starts from the earliest metadata logs. To reset the checkpoints, set "-timeAgo" to a large value.
+
+`,
+}
+
+func runFilerBackup(cmd *Command, args []string) bool {
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ util.LoadConfiguration("security", false)
+ util.LoadConfiguration("replication", true)
+
+ for {
+ err := doFilerBackup(grpcDialOption, &filerBackupOptions)
+ if err != nil {
+ glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
+ time.Sleep(1747 * time.Millisecond)
+ }
+ }
+
+ return true
+}
+
+const (
+ BackupKeyPrefix = "backup."
+)
+
+func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions) error {
+
+ // find data sink
+ config := util.GetViper()
+ dataSink := findSink(config)
+ if dataSink == nil {
+ return fmt.Errorf("no data sink configured in replication.toml")
+ }
+
+ sourceFiler := *backupOption.filer
+ sourcePath := *backupOption.path
+ timeAgo := *backupOption.timeAgo
+ targetPath := dataSink.GetSinkToDirectory()
+ debug := *backupOption.debug
+
+ // get start time for the data sink
+ startFrom := time.Unix(0, 0)
+ sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory())
+ if timeAgo.Milliseconds() == 0 {
+ lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId))
+ if err != nil {
+ glog.V(0).Infof("starting from %v", startFrom)
+ } else {
+ startFrom = time.Unix(0, lastOffsetTsNs)
+ glog.V(0).Infof("resuming from %v", startFrom)
+ }
+ } else {
+ startFrom = time.Now().Add(-timeAgo)
+ glog.V(0).Infof("start time is set to %v", startFrom)
+ }
+
+ // create filer sink
+ filerSource := &source.FilerSource{}
+ filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, *backupOption.proxyByFiler)
+ dataSink.SetSourceFiler(filerSource)
+
+ processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
+
+ return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "backup_" + dataSink.GetName(),
+ PathPrefix: sourcePath,
+ SinceNs: startFrom.UnixNano(),
+ })
+ if err != nil {
+ return fmt.Errorf("listen: %v", err)
+ }
+
+ var counter int64
+ var lastWriteTime time.Time
+ for {
+ resp, listenErr := stream.Recv()
+
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+
+ if err := processEventFn(resp); err != nil {
+ return fmt.Errorf("processEventFn: %v", err)
+ }
+
+ counter++
+ if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
+ glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
+ counter = 0
+ lastWriteTime = time.Now()
+ if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
+ return fmt.Errorf("setOffset: %v", err)
+ }
+ }
+
+ }
+
+ })
+
+}
+
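A minimal sketch of the checkpoint-key layout shared by getOffset/setOffset (defined in filer_sync.go below): a string prefix is followed by a 4-byte signature, which for filer.backup is hashed from the sink name plus its destination directory, so each sink/directory pair resumes independently. The hash and byte order here are stand-ins, not the exact util implementations.

```go
package main

import (
	"crypto/md5"
	"encoding/binary"
	"fmt"
)

// checkpointKey mimics the layout in getOffset/setOffset: a string prefix
// ("backup." or "sync.") followed by 4 bytes holding the signature.
// Big-endian encoding stands in for util.Uint32toBytes (an assumption).
func checkpointKey(prefix string, signature int32) []byte {
	key := []byte(prefix + "____") // 4 placeholder bytes
	binary.BigEndian.PutUint32(key[len(prefix):], uint32(signature))
	return key
}

func main() {
	// stand-in for util.HashStringToLong(name + directory); only the idea
	// (a stable per-sink signature) matters here, not the exact hash
	sum := md5.Sum([]byte("s3" + "/backup"))
	sinkId := int32(binary.BigEndian.Uint32(sum[:4]))
	fmt.Printf("key=%q\n", checkpointKey("backup.", sinkId))
}
```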
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index bf64e72b3..42f5ec4c3 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -55,7 +55,7 @@ func init() {
copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
- copy.diskType = cmdCopy.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
+ copy.diskType = cmdCopy.Flag.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag")
copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index f055b19a8..189cacefc 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -24,8 +24,8 @@ func init() {
var cmdFilerMetaTail = &Command{
UsageLine: "filer.meta.tail [-filer=localhost:8888] [-target=/]",
- Short: "see recent changes on a filer",
- Long: `See recent changes on a filer.
+ Short: "see continuous changes on a filer",
+ Long: `See continuous changes on a filer.
weed filer.meta.tail -timeAgo=30h | grep truncate
weed filer.meta.tail -timeAgo=30h | jq .
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index e8c06b208..885c95540 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -74,18 +74,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
}
- var dataSink sink.ReplicationSink
- for _, sk := range sink.Sinks {
- if config.GetBool("sink." + sk.GetName() + ".enabled") {
- if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
- glog.Fatalf("Failed to initialize sink for %s: %+v",
- sk.GetName(), err)
- }
- glog.V(0).Infof("Configure sink to %s", sk.GetName())
- dataSink = sk
- break
- }
- }
+ dataSink := findSink(config)
if dataSink == nil {
println("no data sink configured in replication.toml:")
@@ -135,6 +124,22 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
+func findSink(config *util.ViperProxy) sink.ReplicationSink {
+ var dataSink sink.ReplicationSink
+ for _, sk := range sink.Sinks {
+ if config.GetBool("sink." + sk.GetName() + ".enabled") {
+ if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
+ glog.Fatalf("Failed to initialize sink for %s: %+v",
+ sk.GetName(), err)
+ }
+ glog.V(0).Infof("Configure sink to %s", sk.GetName())
+ dataSink = sk
+ break
+ }
+ }
+ return dataSink
+}
+
func validateOneEnabledInput(config *util.ViperProxy) {
enabledInput := ""
for _, input := range sub.NotificationInputs {
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 725f7d485..0f34e5701 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication"
+ "github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -58,8 +59,8 @@ func init() {
syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
- syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd] hard drive or solid state drive on filer A")
- syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd] hard drive or solid state drive on filer B")
+ syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag on filer A")
+ syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag on filer B")
syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
@@ -137,7 +138,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
// if first time, start from now
// if has previously synced, resume from that point of time
- sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature)
+ sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature)
if err != nil {
return err
}
@@ -151,93 +152,17 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
filerSink.SetSourceFiler(filerSource)
+ persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)
+
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
-
- var sourceOldKey, sourceNewKey util.FullPath
- if message.OldEntry != nil {
- sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
- }
- if message.NewEntry != nil {
- sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
- }
-
for _, sig := range message.Signatures {
if sig == targetFilerSignature && targetFilerSignature != 0 {
fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
return nil
}
}
- if debug {
- fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature)
- }
-
- if !strings.HasPrefix(resp.Directory, sourcePath) {
- return nil
- }
-
- // handle deletions
- if message.OldEntry != nil && message.NewEntry == nil {
- if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
- return nil
- }
- key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
- return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
- }
-
- // handle new entries
- if message.OldEntry == nil && message.NewEntry != nil {
- if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
- return nil
- }
- key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
- return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
- }
-
- // this is something special?
- if message.OldEntry == nil && message.NewEntry == nil {
- return nil
- }
-
- // handle updates
- if strings.HasPrefix(string(sourceOldKey), sourcePath) {
- // old key is in the watched directory
- if strings.HasPrefix(string(sourceNewKey), sourcePath) {
- // new key is also in the watched directory
- oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
- message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
- foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
- if foundExisting {
- return err
- }
-
- // not able to find old entry
- if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
- return fmt.Errorf("delete old entry %v: %v", oldKey, err)
- }
-
- // create the new entry
- newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
- return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
-
- } else {
- // new key is outside of the watched directory
- key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
- return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
- }
- } else {
- // old key is outside of the watched directory
- if strings.HasPrefix(string(sourceNewKey), sourcePath) {
- // new key is in the watched directory
- key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
- return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
- } else {
- // new key is also outside of the watched directory
- // skip
- }
- }
-
- return nil
+ return persistEventFn(resp)
}
return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@@ -275,7 +200,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
counter = 0
lastWriteTime = time.Now()
- if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil {
+ if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil {
return err
}
}
@@ -290,11 +215,11 @@ const (
SyncKeyPrefix = "sync."
)
-func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) {
+func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- syncKey := []byte(SyncKeyPrefix + "____")
- util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
+ syncKey := []byte(signaturePrefix + "____")
+ util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
if err != nil {
@@ -317,11 +242,11 @@ func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature
}
-func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error {
+func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error {
return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- syncKey := []byte(SyncKeyPrefix + "____")
- util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
+ syncKey := []byte(signaturePrefix + "____")
+ util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
valueBuf := make([]byte, 8)
util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
@@ -343,3 +268,107 @@ func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignatur
})
}
+
+func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
+ // process function
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+
+ var sourceOldKey, sourceNewKey util.FullPath
+ if message.OldEntry != nil {
+ sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
+ }
+ if message.NewEntry != nil {
+ sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
+ }
+
+ if debug {
+ glog.V(0).Infof("received %v", resp)
+ }
+
+ if !strings.HasPrefix(resp.Directory, sourcePath) {
+ return nil
+ }
+
+ // handle deletions
+ if message.OldEntry != nil && message.NewEntry == nil {
+ if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
+ return nil
+ }
+ key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
+ return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
+ }
+
+ // handle new entries
+ if message.OldEntry == nil && message.NewEntry != nil {
+ if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ return nil
+ }
+ key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
+ return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
+ }
+
+		// neither old nor new entry: nothing to replicate
+ if message.OldEntry == nil && message.NewEntry == nil {
+ return nil
+ }
+
+ // handle updates
+ if strings.HasPrefix(string(sourceOldKey), sourcePath) {
+ // old key is in the watched directory
+ if strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ // new key is also in the watched directory
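+				// incremental sinks only append date-keyed copies, so the
+				// in-place update/delete below is for non-incremental sinks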
+ if !dataSink.IsIncremental() {
+ oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
+ message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
+ foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
+ if foundExisting {
+ return err
+ }
+
+ // not able to find old entry
+ if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
+ return fmt.Errorf("delete old entry %v: %v", oldKey, err)
+ }
+ }
+ // create the new entry
+ newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
+ return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
+
+ } else {
+ // new key is outside of the watched directory
+ if !dataSink.IsIncremental() {
+ key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
+ return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
+ }
+ }
+ } else {
+ // old key is outside of the watched directory
+ if strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ // new key is in the watched directory
+ key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
+ return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
+ } else {
+ // new key is also outside of the watched directory
+ // skip
+ }
+ }
+
+ return nil
+ }
+ return processEventFn
+}
+
+func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string {
+ if !dataSink.IsIncremental() {
+ return util.Join(targetPath, string(sourceKey)[len(sourcePath):])
+ }
+ var mTime int64
+ if message.NewEntry != nil {
+ mTime = message.NewEntry.Attributes.Mtime
+ } else if message.OldEntry != nil {
+ mTime = message.OldEntry.Attributes.Mtime
+ }
+ dateKey := time.Unix(mTime, 0).Format("2006-01-02")
+ return util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
+}
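A worked example of buildKey with hypothetical paths: a non-incremental sink mirrors the source tree under targetPath, while an incremental sink inserts a yyyy-mm-dd directory derived from the entry's Mtime, matching the is_incremental comments added to scaffold.go below (path.Join stands in for util.Join).

```go
package main

import (
	"fmt"
	"path"
	"time"
)

func main() {
	// hypothetical inputs
	sourcePath, targetPath := "/buckets", "/backup"
	sourceKey := "/buckets/images/cat.jpg"
	mTime := time.Date(2021, 2, 22, 10, 30, 0, 0, time.UTC)

	// non-incremental sink: mirror the source tree under targetPath
	mirror := path.Join(targetPath, sourceKey[len(sourcePath):])

	// incremental sink: insert a yyyy-mm-dd directory, so each date
	// directory accumulates that day's new and updated files
	dateKey := mTime.Format("2006-01-02")
	incremental := path.Join(targetPath, dateKey, sourceKey[len(sourcePath):])

	fmt.Println(mirror)      // /backup/images/cat.jpg
	fmt.Println(incremental) // /backup/2021-02-22/images/cat.jpg
}
```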
diff --git a/weed/command/mount.go b/weed/command/mount.go
index aa6d91740..ea439af7c 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -42,7 +42,7 @@ func init() {
mountOptions.dirAutoCreate = cmdMount.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to")
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
- mountOptions.diskType = cmdMount.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
+ mountOptions.diskType = cmdMount.Flag.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 128, "limit concurrent goroutine writers if not 0")
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 993391a42..c2d53e4bd 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -356,6 +356,9 @@ directory = "/buckets"
[sink.local]
enabled = false
directory = "/data"
+# when is_incremental = true, replicated files are placed under yyyy-mm-dd directories
+# (named by modification time), so each date directory holds that day's new and updated files.
+is_incremental = false
[sink.local_incremental]
# all replicated files are under modified time as yyyy-mm-dd directories
@@ -373,6 +376,7 @@ directory = "/backup"
replication = ""
collection = ""
ttlSec = 0
+is_incremental = false
[sink.s3]
# read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
@@ -384,6 +388,7 @@ region = "us-east-2"
bucket = "your_bucket_name" # an existing bucket
directory = "/" # destination directory
endpoint = ""
+is_incremental = false
[sink.google_cloud_storage]
# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
@@ -391,6 +396,7 @@ enabled = false
google_application_credentials = "/path/to/x.json" # path to json credential file
bucket = "your_bucket_seaweedfs" # an existing bucket
directory = "/" # destination directory
+is_incremental = false
[sink.azure]
# experimental, let me know if it works
@@ -399,6 +405,7 @@ account_name = ""
account_key = ""
container = "mycontainer" # an existing container
directory = "/" # destination directory
+is_incremental = false
[sink.backblaze]
enabled = false
@@ -406,6 +413,7 @@ b2_account_id = ""
b2_master_application_key = ""
bucket = "mybucket" # an existing bucket
directory = "/" # destination directory
+is_incremental = false
`
diff --git a/weed/command/server.go b/weed/command/server.go
index d7c41b014..611578953 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -102,7 +102,7 @@ func init() {
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
- serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd] hard drive or solid state drive")
+ serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag")
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
@@ -122,7 +122,7 @@ func init() {
webdavOptions.port = cmdServer.Flag.Int("webdav.port", 7333, "webdav server http listen port")
webdavOptions.collection = cmdServer.Flag.String("webdav.collection", "", "collection to create the files")
webdavOptions.replication = cmdServer.Flag.String("webdav.replication", "", "replication to create the files")
- webdavOptions.disk = cmdServer.Flag.String("webdav.disk", "", "[hdd|ssd] hard drive or solid state drive")
+ webdavOptions.disk = cmdServer.Flag.String("webdav.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag")
webdavOptions.tlsPrivateKey = cmdServer.Flag.String("webdav.key.file", "", "path to the TLS private key file")
webdavOptions.tlsCertificate = cmdServer.Flag.String("webdav.cert.file", "", "path to the TLS certificate file")
webdavOptions.cacheDir = cmdServer.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 149d71241..67fde2185 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -41,7 +41,7 @@ func init() {
upload.replication = cmdUpload.Flag.String("replication", "", "replication type")
upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name")
upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name")
- upload.diskType = cmdUpload.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
+ upload.diskType = cmdUpload.Flag.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag")
upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
upload.maxMB = cmdUpload.Flag.Int("maxMB", 32, "split files larger than the limit")
upload.usePublicUrl = cmdUpload.Flag.Bool("usePublicUrl", false, "upload to public url from volume server")
diff --git a/weed/command/volume.go b/weed/command/volume.go
index ff951afdc..659c93d96 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -78,7 +78,7 @@ func init() {
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
- v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
+ v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.")
v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index 3e4532d6e..2bd4a3c61 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -39,7 +39,7 @@ func init() {
webDavStandaloneOptions.port = cmdWebDav.Flag.Int("port", 7333, "webdav server http listen port")
webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files")
webDavStandaloneOptions.replication = cmdWebDav.Flag.String("replication", "", "replication to create the files")
- webDavStandaloneOptions.disk = cmdWebDav.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
+ webDavStandaloneOptions.disk = cmdWebDav.Flag.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag")
webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file")
webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file")
webDavStandaloneOptions.cacheDir = cmdWebDav.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks")
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 075204b79..6a87a2b7d 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -181,7 +181,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
- shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 934075ff3..70428bb07 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -130,7 +130,8 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
// gzip if possible
// this could be double copying
clearDataLen = len(data)
- if shouldGzipNow {
+ clearData := data
+ if shouldGzipNow && !cipher {
compressed, compressErr := util.GzipData(data)
// fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed))
if compressErr == nil {
@@ -139,7 +140,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
}
} else if isInputCompressed {
// just to get the clear data length
- clearData, err := util.DecompressData(data)
+ clearData, err = util.DecompressData(data)
if err == nil {
clearDataLen = len(clearData)
}
@@ -150,7 +151,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
// encrypt
cipherKey := util.GenCipherKey()
- encryptedData, encryptionErr := util.Encrypt(data, cipherKey)
+ encryptedData, encryptionErr := util.Encrypt(clearData, cipherKey)
if encryptionErr != nil {
err = fmt.Errorf("encrypt input: %v", encryptionErr)
return
@@ -161,26 +162,26 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
_, err = w.Write(encryptedData)
return
}, "", false, len(encryptedData), "", nil, jwt)
- if uploadResult != nil {
- uploadResult.Name = filename
- uploadResult.Mime = mtype
- uploadResult.CipherKey = cipherKey
+ if uploadResult == nil {
+ return
}
+ uploadResult.Name = filename
+ uploadResult.Mime = mtype
+ uploadResult.CipherKey = cipherKey
+ uploadResult.Size = uint32(clearDataLen)
} else {
// upload data
uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
_, err = w.Write(data)
return
}, filename, contentIsGzipped, len(data), mtype, pairMap, jwt)
- }
-
- if uploadResult == nil {
- return
- }
-
- uploadResult.Size = uint32(clearDataLen)
- if contentIsGzipped {
- uploadResult.Gzip = 1
+ if uploadResult == nil {
+ return
+ }
+ uploadResult.Size = uint32(clearDataLen)
+ if contentIsGzipped {
+ uploadResult.Gzip = 1
+ }
}
return uploadResult, err
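
The root cause fixed above is Go's := shadowing: the old `clearData, err := util.DecompressData(data)` declared a new clearData visible only inside the else-if branch, so the later Encrypt call kept operating on the original bytes. A standalone illustration of the shadowing behavior (decompress is a hypothetical stand-in):

```go
package main

import "fmt"

// stand-in for util.DecompressData
func decompress(b []byte) ([]byte, error) {
	return []byte("clear-data"), nil
}

func main() {
	data := []byte("gzipped-bytes")
	clearData := data
	if len(data) > 0 {
		// ':=' declares a NEW clearData scoped to this block, shadowing
		// the outer variable; this is what the patch changes to '='
		clearData, err := decompress(data)
		fmt.Println(string(clearData), err) // clear-data <nil>
	}
	fmt.Println(string(clearData)) // still gzipped-bytes: the inner result is lost
}
```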
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_util.go
similarity index 85%
rename from weed/replication/repl_util/replication_utli.go
rename to weed/replication/repl_util/replication_util.go
index 3514c6977..f642bb801 100644
--- a/weed/replication/repl_util/replication_utli.go
+++ b/weed/replication/repl_util/replication_util.go
@@ -20,7 +20,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var shouldRetry bool
for _, fileUrl := range fileUrls {
- shouldRetry, err = util.FastReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+ shouldRetry, err = util.FastReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index 7688029e6..d7e609c68 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -42,7 +42,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
return nil
}
var dateKey string
- if r.sink.GetName() == "local_incremental" {
+ if r.sink.IsIncremental() {
var mTime int64
if message.NewEntry != nil {
mTime = message.NewEntry.Attributes.Mtime
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index df70be64b..865f1b25c 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -18,10 +18,11 @@ import (
)
type AzureSink struct {
- containerURL azblob.ContainerURL
- container string
- dir string
- filerSource *source.FilerSource
+ containerURL azblob.ContainerURL
+ container string
+ dir string
+ filerSource *source.FilerSource
+ isIncremental bool
}
func init() {
@@ -36,7 +37,12 @@ func (g *AzureSink) GetSinkToDirectory() string {
return g.dir
}
+func (g *AzureSink) IsIncremental() bool {
+ return g.isIncremental
+}
+
func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error {
+ g.isIncremental = configuration.GetBool(prefix+"is_incremental")
return g.initialize(
configuration.GetString(prefix+"account_name"),
configuration.GetString(prefix+"account_key"),
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index 24f0ecbbc..8738231d5 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -18,6 +18,7 @@ type B2Sink struct {
bucket string
dir string
filerSource *source.FilerSource
+ isIncremental bool
}
func init() {
@@ -32,7 +33,12 @@ func (g *B2Sink) GetSinkToDirectory() string {
return g.dir
}
+func (g *B2Sink) IsIncremental() bool {
+ return g.isIncremental
+}
+
func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error {
+ g.isIncremental = configuration.GetBool(prefix+"is_incremental")
return g.initialize(
configuration.GetString(prefix+"b2_account_id"),
configuration.GetString(prefix+"b2_master_application_key"),
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 509f75116..4165e87be 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -30,6 +30,7 @@ type FilerSink struct {
grpcDialOption grpc.DialOption
address string
writeChunkByFiler bool
+ isIncremental bool
}
func init() {
@@ -44,7 +45,12 @@ func (fs *FilerSink) GetSinkToDirectory() string {
return fs.dir
}
+func (fs *FilerSink) IsIncremental() bool {
+ return fs.isIncremental
+}
+
func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
+ fs.isIncremental = configuration.GetBool(prefix+"is_incremental")
return fs.DoInitialize(
"",
configuration.GetString(prefix+"grpcAddress"),
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index badabc32c..02f482862 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -22,6 +22,7 @@ type GcsSink struct {
bucket string
dir string
filerSource *source.FilerSource
+ isIncremental bool
}
func init() {
@@ -36,7 +37,12 @@ func (g *GcsSink) GetSinkToDirectory() string {
return g.dir
}
+func (g *GcsSink) IsIncremental() bool {
+ return g.isIncremental
+}
+
func (g *GcsSink) Initialize(configuration util.Configuration, prefix string) error {
+ g.isIncremental = configuration.GetBool(prefix+"is_incremental")
return g.initialize(
configuration.GetString(prefix+"google_application_credentials"),
configuration.GetString(prefix+"bucket"),
diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go
index 21c625c3f..2b9b3e69a 100644
--- a/weed/replication/sink/localsink/local_sink.go
+++ b/weed/replication/sink/localsink/local_sink.go
@@ -50,6 +50,10 @@ func (localsink *LocalSink) GetSinkToDirectory() string {
return localsink.Dir
}
+func (localsink *LocalSink) IsIncremental() bool {
+ return true
+}
+
func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
if localsink.isMultiPartEntry(key) {
return nil
@@ -74,13 +78,13 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
if _, err := os.Stat(dir); os.IsNotExist(err) {
		glog.V(4).Infof("Create Directory key: %s", dir)
- if err = os.MkdirAll(dir, 0); err != nil {
+ if err = os.MkdirAll(dir, 0755); err != nil {
return err
}
}
writeFunc := func(data []byte) error {
- writeErr := ioutil.WriteFile(key, data, 0)
+ writeErr := ioutil.WriteFile(key, data, 0755)
return writeErr
}
diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go
index cfc6e0a4d..4ffd09462 100644
--- a/weed/replication/sink/replication_sink.go
+++ b/weed/replication/sink/replication_sink.go
@@ -14,6 +14,7 @@ type ReplicationSink interface {
UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error)
GetSinkToDirectory() string
SetSourceFiler(s *source.FilerSource)
+ IsIncremental() bool
}
var (
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index 58432ee6b..9a36573e3 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -21,12 +21,13 @@ import (
)
type S3Sink struct {
- conn s3iface.S3API
- region string
- bucket string
- dir string
- endpoint string
- filerSource *source.FilerSource
+ conn s3iface.S3API
+ region string
+ bucket string
+ dir string
+ endpoint string
+ filerSource *source.FilerSource
+ isIncremental bool
}
func init() {
@@ -41,11 +42,17 @@ func (s3sink *S3Sink) GetSinkToDirectory() string {
return s3sink.dir
}
+func (s3sink *S3Sink) IsIncremental() bool {
+ return s3sink.isIncremental
+}
+
func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
+ glog.V(0).Infof("sink.s3.is_incremental: %v", configuration.GetString(prefix+"is_incremental"))
+ s3sink.isIncremental = configuration.GetBool(prefix + "is_incremental")
return s3sink.initialize(
configuration.GetString(prefix+"aws_access_key_id"),
configuration.GetString(prefix+"aws_secret_access_key"),
@@ -67,8 +74,9 @@ func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, buc
s3sink.endpoint = endpoint
config := &aws.Config{
- Region: aws.String(s3sink.region),
- Endpoint: aws.String(s3sink.endpoint),
+ Region: aws.String(s3sink.region),
+ Endpoint: aws.String(s3sink.endpoint),
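+		// use path-style addressing (endpoint/bucket/key) so non-AWS,
+		// S3-compatible endpoints work without virtual-hosted bucket DNS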
+ S3ForcePathStyle: aws.Bool(true),
}
if awsAccessKeyId != "" && awsSecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
@@ -104,7 +112,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
uploadId, err := s3sink.createMultipartUpload(key, entry)
if err != nil {
- return err
+ return fmt.Errorf("createMultipartUpload: %v", err)
}
totalSize := filer.FileSize(entry)
@@ -120,6 +128,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
defer wg.Done()
if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil {
err = uploadErr
+ glog.Errorf("uploadPart: %v", uploadErr)
} else {
parts[index] = part
}
@@ -129,7 +138,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
if err != nil {
s3sink.abortMultipartUpload(key, uploadId)
- return err
+ return fmt.Errorf("uploadPart: %v", err)
}
return s3sink.completeMultipartUpload(context.Background(), key, uploadId, parts)
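
`S3ForcePathStyle` matters for non-AWS endpoints such as SeaweedFS's own S3 gateway or MinIO, where the bucket cannot be resolved as a DNS subdomain: path-style keeps the bucket in the URL path (`http://host:8333/bucket/key`). A minimal sketch of the resulting client setup with aws-sdk-go; the region, endpoint, and credentials are placeholders:

```go
package main

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func newS3Client() (*s3.S3, error) {
	config := &aws.Config{
		Region:           aws.String("us-east-1"),
		Endpoint:         aws.String("http://localhost:8333"), // e.g. a SeaweedFS S3 gateway
		S3ForcePathStyle: aws.Bool(true),                      // bucket in path, not in hostname
		Credentials:      credentials.NewStaticCredentials("key", "secret", ""),
	}
	sess, err := session.NewSession(config)
	if err != nil {
		return nil, err
	}
	return s3.New(sess), nil
}

func main() {
	client, err := newS3Client()
	if err != nil {
		panic(err)
	}
	_ = client
}
```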
diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go
index b172ea2c3..bf1ad9b76 100644
--- a/weed/replication/sink/s3sink/s3_write.go
+++ b/weed/replication/sink/s3sink/s3_write.go
@@ -94,12 +94,13 @@ func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId
result, err := s3sink.conn.CompleteMultipartUpload(input)
if err == nil {
- glog.V(0).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
+ glog.V(1).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
} else {
glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
+ return fmt.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
}
- return err
+ return nil
}
// To upload a part
@@ -163,7 +164,7 @@ func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, er
}
buf := make([]byte, chunk.Size)
for _, fileUrl := range fileUrls {
- _, err = util.ReadUrl(fileUrl+"?readDeleted=true", nil, false, false, chunk.Offset, int(chunk.Size), buf)
+ _, err = util.ReadUrl(fileUrl, chunk.CipherKey, chunk.IsGzipped, false, chunk.Offset, int(chunk.Size), buf)
if err != nil {
glog.V(1).Infof("read from %s: %v", fileUrl, err)
} else {
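
`buildReadSeeker` now forwards the chunk's cipher key and gzip flag, and still tries each replica URL in turn, keeping the first successful read. A standalone sketch of that try-each-replica pattern; `fetch` is a hypothetical stand-in for `util.ReadUrl`:

```go
package main

import (
	"errors"
	"fmt"
)

// fetch simulates reading a chunk from one replica location.
func fetch(url string) ([]byte, error) {
	if url == "http://volume2/3,abc" {
		return []byte("chunk-bytes"), nil
	}
	return nil, errors.New("connection refused")
}

// readFromAnyReplica attempts each candidate URL in order and returns the
// first successful read, mirroring the loop in buildReadSeeker.
func readFromAnyReplica(urls []string) ([]byte, error) {
	var lastErr error
	for _, u := range urls {
		data, err := fetch(u)
		if err == nil {
			return data, nil // first successful replica wins
		}
		lastErr = err // remember the failure and try the next location
	}
	return nil, fmt.Errorf("all %d replicas failed, last error: %v", len(urls), lastErr)
}

func main() {
	data, err := readFromAnyReplica([]string{"http://volume1/3,abc", "http://volume2/3,abc"})
	fmt.Println(string(data), err)
}
```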
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 3982360b0..e2e3575dc 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -83,8 +83,12 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
}
- for _, loc := range locations.Locations {
- fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s", loc.Url, part))
+ if !fs.proxyByFiler {
+ for _, loc := range locations.Locations {
+ fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s?readDeleted=true", loc.Url, part))
+ }
+ } else {
+ fileUrls = append(fileUrls, fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, part))
}
return
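
The lookup now produces one of two URL shapes: direct volume-server reads with `readDeleted=true` (so content already deleted on the source can still be replicated), or a single filer-proxied URL when `proxyByFiler` is set. A self-contained sketch of the two shapes; hosts and the file id are placeholders:

```go
package main

import "fmt"

// chunkUrls mirrors the branch in LookupFileId: proxy everything through
// the filer, or read each volume replica directly.
func chunkUrls(proxyByFiler bool, filerAddress, fileId string, volumeLocations []string) (urls []string) {
	if proxyByFiler {
		// One URL: the filer fetches the chunk on the client's behalf.
		return []string{fmt.Sprintf("http://%s/?proxyChunkId=%s", filerAddress, fileId)}
	}
	// One URL per replica; readDeleted=true lets replication still read
	// content that was just deleted on the source.
	for _, loc := range volumeLocations {
		urls = append(urls, fmt.Sprintf("http://%s/%s?readDeleted=true", loc, fileId))
	}
	return
}

func main() {
	fmt.Println(chunkUrls(false, "filer:8888", "3,01637037d6", []string{"volume1:8080", "volume2:8080"}))
	fmt.Println(chunkUrls(true, "filer:8888", "3,01637037d6", nil))
}
```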
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index f77462adb..e210f4bdf 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -61,15 +61,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return
}
- if len(entry.Chunks) == 0 && len(entry.Content) == 0 {
- glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr)
- stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc()
- w.WriteHeader(http.StatusNoContent)
- return
- }
-
w.Header().Set("Accept-Ranges", "bytes")
- w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat))
// mime type
mimeType := entry.Attr.Mime
@@ -164,6 +156,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
if offset+size <= int64(len(entry.Content)) {
_, err := writer.Write(entry.Content[offset : offset+size])
+ glog.Errorf("failed to write entry content: %v", err)
return err
}
return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 29aff5c0b..156afd4a1 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -77,7 +77,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.AvailableSpaceFor(option) <= 0 {
- return nil, fmt.Errorf("no free volumes left for "+option.String())
+ return nil, fmt.Errorf("no free volumes left for " + option.String())
}
ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) {
@@ -122,11 +122,8 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
stats := volumeLayout.Stats()
-
- totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
-
resp := &master_pb.StatisticsResponse{
- TotalSize: uint64(totalSize),
+ TotalSize: stats.TotalSize,
UsedSize: stats.UsedSize,
FileCount: stats.FileCount,
}
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 3b6524df8..cf9f9f777 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -61,7 +61,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}
location := vs.store.FindFreeLocation(types.ToDiskType(diskType))
if location == nil {
- return fmt.Errorf("no space left")
+ return fmt.Errorf("no space left for disk type %s", types.ToDiskType(diskType).ReadableString())
}
dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 87a138ab6..fd35bb14b 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -201,17 +201,14 @@ type EcRack struct {
func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
// list all possible locations
- var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
- return err
- })
+ // collect topology information
+ topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
- return nil, 0, err
+ return
}
// find out all volume servers with one slot left.
- ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(resp.TopologyInfo, selectedDataCenter)
+ ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
sortEcNodesByFreeslotsDecending(ecNodes)
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index 3e1499d41..dafdb041a 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -51,7 +51,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
vid := needle.VolumeId(*volumeId)
// collect topology information
- topologyInfo, err := collectTopologyInfo(commandEnv)
+ topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
@@ -208,7 +208,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasur
}
-func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) {
+func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
@@ -219,7 +219,7 @@ func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyIn
return
}
- return resp.TopologyInfo, nil
+ return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
}
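
With the widened signature, every shell command that previously issued its own `VolumeList` RPC can call `collectTopologyInfo` once and also pick up `VolumeSizeLimitMb`. A standalone sketch of the consolidation pattern, using pared-down stand-ins for the `master_pb` types:

```go
package main

import "fmt"

type topologyInfo struct{ DataCenters []string }

type volumeListResponse struct {
	Topology          topologyInfo
	VolumeSizeLimitMb uint64
}

// volumeList is a hypothetical stand-in for the master's VolumeList RPC.
func volumeList() (*volumeListResponse, error) {
	return &volumeListResponse{topologyInfo{[]string{"dc1"}}, 1024}, nil
}

// collectTopology performs the RPC once and returns both the topology and
// the size limit, so callers stop duplicating the request.
func collectTopology() (topo topologyInfo, volumeSizeLimitMb uint64, err error) {
	resp, err := volumeList()
	if err != nil {
		return
	}
	return resp.Topology, resp.VolumeSizeLimitMb, nil
}

func main() {
	topo, limitMb, err := collectTopology()
	fmt.Println(topo, limitMb, err)
}
```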
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index e937f4490..edacf22c6 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -265,11 +265,8 @@ func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
- var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
- return err
- })
+ // collect topology information
+ topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv)
if err != nil {
return
}
@@ -280,11 +277,11 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri
fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds)
vidMap := make(map[uint32]bool)
- eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
- if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 {
+ if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
vidMap[v.Id] = true
}
}
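
The fullness check selects volumes whose size exceeds `fullPercentage` percent of the master's volume size limit. A small worked example with illustrative numbers (95% of a 1024 MB limit is about 972.8 MiB):

```go
package main

import "fmt"

// isFullEnough reproduces the comparison from collectVolumeIdsForEcEncode.
func isFullEnough(sizeBytes uint64, fullPercentage float64, volumeSizeLimitMb uint64) bool {
	return float64(sizeBytes) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024
}

func main() {
	limitMb := uint64(1024) // e.g. -master.volumeSizeLimitMB=1024
	threshold := 0.95 * float64(limitMb) * 1024 * 1024
	fmt.Println(uint64(threshold)) // ≈ 972.8 MiB in bytes

	fmt.Println(isFullEnough(1000*1024*1024, 95, limitMb)) // true:  1000 MiB > 972.8 MiB
	fmt.Println(isFullEnough(900*1024*1024, 95, limitMb))  // false: 900 MiB < 972.8 MiB
}
```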
diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go
index d0db3722a..02cd7ac69 100644
--- a/weed/shell/command_fs_configure.go
+++ b/weed/shell/command_fs_configure.go
@@ -52,7 +52,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
collection := fsConfigureCommand.String("collection", "", "assign writes to this collection")
replication := fsConfigureCommand.String("replication", "", "assign writes with this replication")
ttl := fsConfigureCommand.String("ttl", "", "assign writes with this ttl")
- diskType := fsConfigureCommand.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
+ diskType := fsConfigureCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
fsync := fsConfigureCommand.Bool("fsync", false, "fsync for the writes")
volumeGrowthCount := fsConfigureCommand.Int("volumeGrowthCount", 0, "the number of physical volumes to add if no writable volumes")
isDelete := fsConfigureCommand.Bool("delete", false, "delete the configuration by locationPrefix")
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index a1e3c1ab6..e0c41f310 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
@@ -75,18 +74,15 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
return nil
}
- var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
- return err
- })
+ // collect topology information
+ topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
- volumeServers := collectVolumeServersByDc(resp.TopologyInfo, *dc)
- volumeReplicas, _ := collectVolumeReplicaLocations(resp)
- diskTypes := collectVolumeDiskTypes(resp.TopologyInfo)
+ volumeServers := collectVolumeServersByDc(topologyInfo, *dc)
+ volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
+ diskTypes := collectVolumeDiskTypes(topologyInfo)
if *collection == "EACH_COLLECTION" {
collections, err := ListCollectionNames(commandEnv, true, false)
@@ -94,16 +90,16 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
return err
}
for _, c := range collections {
- if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
+ if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
return err
}
}
} else if *collection == "ALL_COLLECTIONS" {
- if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
+ if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
return err
}
} else {
- if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
+ if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
return err
}
}
diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go
index ecbe402e8..e3f034873 100644
--- a/weed/shell/command_volume_configure_replication.go
+++ b/weed/shell/command_volume_configure_replication.go
@@ -56,11 +56,8 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
}
replicaPlacementInt32 := uint32(replicaPlacement.Byte())
- var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
- return err
- })
+ // collect topology information
+ topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
@@ -69,7 +66,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
// find all data nodes with volumes that need replication change
var allLocations []location
- eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
loc := newLocation(dc, string(rack), dn)
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 7b2eb6769..538351fd0 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -64,18 +64,15 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
takeAction := !*skipChange
- var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
- return err
- })
+ // collect topology information
+ topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
// find all volumes that need replication
// collect all data nodes
- volumeReplicas, allLocations := collectVolumeReplicaLocations(resp)
+ volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
if len(allLocations) == 0 {
return fmt.Errorf("no data nodes at all")
@@ -107,10 +104,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
-func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint32][]*VolumeReplica, []location) {
+func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
volumeReplicas := make(map[uint32][]*VolumeReplica)
var allLocations []location
- eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
loc := newLocation(dc, string(rack), dn)
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
@@ -165,10 +162,10 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
foundNewLocation := false
hasSkippedCollection := false
- keepDataNodesSorted(allLocations, replica.info.DiskType)
+ keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
+ fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
for _, dst := range allLocations {
// check whether data nodes satisfy the constraints
- fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
// check collection name pattern
if *c.collectionPattern != "" {
@@ -219,8 +216,8 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
return nil
}
-func keepDataNodesSorted(dataNodes []location, diskType string) {
- fn := capacityByFreeVolumeCount(types.ToDiskType(diskType))
+func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
+ fn := capacityByFreeVolumeCount(diskType)
sort.Slice(dataNodes, func(i, j int) bool {
return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode)
})
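
The capacity closure is now built once per replica rather than once per candidate node inside the loop, and `keepDataNodesSorted` takes the typed `DiskType` instead of a raw string. A standalone sketch of that shape with simplified stand-in types:

```go
package main

import (
	"fmt"
	"sort"
)

type DiskType string

type dataNode struct {
	name      string
	freeSlots map[DiskType]int
}

// capacityByFreeVolumeCount returns a function reporting a node's free
// volume slots for one disk type, mirroring the helper in the diff.
func capacityByFreeVolumeCount(diskType DiskType) func(*dataNode) int {
	return func(dn *dataNode) int { return dn.freeSlots[diskType] }
}

func keepDataNodesSorted(dataNodes []*dataNode, diskType DiskType) {
	fn := capacityByFreeVolumeCount(diskType) // built once, reused for all comparisons
	sort.Slice(dataNodes, func(i, j int) bool {
		return fn(dataNodes[i]) > fn(dataNodes[j])
	})
}

func main() {
	nodes := []*dataNode{
		{"n1", map[DiskType]int{"hdd": 2}},
		{"n2", map[DiskType]int{"hdd": 7}},
	}
	keepDataNodesSorted(nodes, "hdd")
	fmt.Println(nodes[0].name) // n2: the most free hdd slots sorts first
}
```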
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index f9dcf3b5f..1fbc9ad35 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -73,7 +73,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
defer os.RemoveAll(tempFolder)
// collect all volume id locations
- volumeIdToVInfo, err := c.collectVolumeIds(*verbose, writer)
+ volumeIdToVInfo, err := c.collectVolumeIds(commandEnv, *verbose, writer)
if err != nil {
return fmt.Errorf("failed to collect all volume locations: %v", err)
}
@@ -268,23 +268,20 @@ type VInfo struct {
isEcVolume bool
}
-func (c *commandVolumeFsck) collectVolumeIds(verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) {
+func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) {
if verbose {
fmt.Fprintf(writer, "collecting volume id and locations from master ...\n")
}
volumeIdToServer = make(map[uint32]VInfo)
- var resp *master_pb.VolumeListResponse
- err = c.env.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
- return err
- })
+ // collect topology information
+ topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
return
}
- eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
+ eachDataNode(topologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
for _, diskInfo := range t.DiskInfos {
for _, vi := range diskInfo.VolumeInfos {
volumeIdToServer[vi.Id] = VInfo{
diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go
index dc8783cd1..9856de10b 100644
--- a/weed/shell/command_volume_list.go
+++ b/weed/shell/command_volume_list.go
@@ -2,7 +2,6 @@ package shell
import (
"bytes"
- "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
@@ -32,16 +31,13 @@ func (c *commandVolumeList) Help() string {
func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
- return err
- })
+ // collect topology information
+ topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
- writeTopologyInfo(writer, resp.TopologyInfo, resp.VolumeSizeLimitMb)
+ writeTopologyInfo(writer, topologyInfo, volumeSizeLimitMb)
return nil
}
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index f9462beee..84f33db34 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -29,7 +29,7 @@ func (c *commandVolumeMove) Help() string {
return `move a live volume from one volume server to another volume server
volume.move -source