Merge pull request #5 from chrislusf/master

sync
bingoohuang 4 years ago
committed by GitHub
commit eb65cbf002
  1. docker/compose/local-clusters-compose.yml (3 changes)
  2. k8s/README.md (8 changes)
  3. k8s/seaweedfs/Chart.yaml (4 changes)
  4. k8s/seaweedfs/templates/_helpers.tpl (23 changes)
  5. k8s/seaweedfs/templates/cronjob.yaml (2 changes)
  6. k8s/seaweedfs/templates/filer-service-client.yaml (1 change)
  7. k8s/seaweedfs/templates/volume-statefulset.yaml (78 changes)
  8. k8s/seaweedfs/values.yaml (20 changes)
  9. other/java/client/pom.xml (2 changes)
  10. other/java/client/pom.xml.deploy (2 changes)
  11. other/java/client/pom_debug.xml (2 changes)
  12. other/java/client/src/main/java/seaweedfs/client/FilerClient.java (33 changes)
  13. other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java (3 changes)
  14. other/java/examples/pom.xml (4 changes)
  15. other/java/hdfs2/dependency-reduced-pom.xml (2 changes)
  16. other/java/hdfs2/pom.xml (2 changes)
  17. other/java/hdfs3/dependency-reduced-pom.xml (2 changes)
  18. other/java/hdfs3/pom.xml (2 changes)
  19. weed/Makefile (4 changes)
  20. weed/command/benchmark.go (2 changes)
  21. weed/command/command.go (1 change)
  22. weed/command/filer.go (2 changes)
  23. weed/command/filer_backup.go (158 changes)
  24. weed/command/filer_copy.go (2 changes)
  25. weed/command/filer_meta_tail.go (4 changes)
  26. weed/command/filer_replication.go (29 changes)
  27. weed/command/filer_sync.go (207 changes)
  28. weed/command/mount.go (2 changes)
  29. weed/command/scaffold.go (8 changes)
  30. weed/command/server.go (4 changes)
  31. weed/command/upload.go (2 changes)
  32. weed/command/volume.go (2 changes)
  33. weed/command/webdav.go (2 changes)
  34. weed/filer/stream.go (2 changes)
  35. weed/operation/upload_content.go (33 changes)
  36. weed/replication/repl_util/replication_util.go (2 changes)
  37. weed/replication/replicator.go (2 changes)
  38. weed/replication/sink/azuresink/azure_sink.go (14 changes)
  39. weed/replication/sink/b2sink/b2_sink.go (6 changes)
  40. weed/replication/sink/filersink/filer_sink.go (6 changes)
  41. weed/replication/sink/gcssink/gcs_sink.go (6 changes)
  42. weed/replication/sink/localsink/local_sink.go (8 changes)
  43. weed/replication/sink/replication_sink.go (1 change)
  44. weed/replication/sink/s3sink/s3_sink.go (29 changes)
  45. weed/replication/sink/s3sink/s3_write.go (7 changes)
  46. weed/replication/source/filer_source.go (8 changes)
  47. weed/server/filer_server_handlers_read.go (9 changes)
  48. weed/server/master_grpc_server_volume.go (7 changes)
  49. weed/server/volume_grpc_copy.go (2 changes)
  50. weed/shell/command_ec_common.go (11 changes)
  51. weed/shell/command_ec_decode.go (6 changes)
  52. weed/shell/command_ec_encode.go (11 changes)
  53. weed/shell/command_fs_configure.go (2 changes)
  54. weed/shell/command_volume_balance.go (20 changes)
  55. weed/shell/command_volume_configure_replication.go (9 changes)
  56. weed/shell/command_volume_fix_replication.go (21 changes)
  57. weed/shell/command_volume_fsck.go (13 changes)
  58. weed/shell/command_volume_list.go (10 changes)
  59. weed/shell/command_volume_move.go (6 changes)
  60. weed/shell/command_volume_server_evacuate.go (22 changes)
  61. weed/shell/command_volume_tier_download.go (2 changes)
  62. weed/shell/command_volume_tier_move.go (121 changes)
  63. weed/storage/types/volume_disk_type.go (7 changes)
  64. weed/topology/topology_vacuum.go (25 changes)
  65. weed/topology/volume_layout.go (2 changes)
  66. weed/topology/volume_location_list.go (2 changes)
  67. weed/util/constants.go (2 changes)
  68. weed/util/file_util.go (4 changes)
  69. weed/util/fla9/fla9.go (4 changes)

docker/compose/local-clusters-compose.yml (3 changes)

@@ -20,4 +20,5 @@ services:
 - 18085:18080
 - 8889:8888
 - 18889:18888
-command: "server -ip=server2 -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1"
+- 8334:8333
+command: "server -ip=server2 -filer -s3 -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1"

k8s/README.md (8 changes)

@@ -29,6 +29,14 @@ please set/update the corresponding affinity rule in values.yaml to an empty one
 ```affinity: ""```
+### PVC - storage class ###
+on the volume stateful set added support for K8S PVC, currently example
+with the simple local-path-provisioner from Rancher (comes included with k3d / k3s)
+https://github.com/rancher/local-path-provisioner
+you can use ANY storage class you like, just update the correct storage-class
+for your deployment.
 ### current instances config (AIO):
 1 instance for each type (master/filer+s3/volume)

k8s/seaweedfs/Chart.yaml (4 changes)

@@ -1,5 +1,5 @@
 apiVersion: v1
 description: SeaweedFS
 name: seaweedfs
-appVersion: "2.26"
-version: 2.26
+appVersion: "2.28"
+version: 2.28

k8s/seaweedfs/templates/_helpers.tpl (23 changes)

@@ -126,3 +126,26 @@ Inject extra environment vars in the format key:value, if populated
 {{- printf "%s%s%s:%s" $registryName $repositoryName $name $tag -}}
 {{- end -}}
 {{- end -}}
+{{/* check if any PVC exists */}}
+{{- define "volume.pvc_exists" -}}
+{{- if or (or (eq .Values.volume.data.type "persistentVolumeClaim") (and (eq .Values.volume.idx.type "persistentVolumeClaim") .Values.volume.dir_idx )) (eq .Values.volume.logs.type "persistentVolumeClaim") -}}
+{{- printf "true" -}}
+{{- else -}}
+{{- printf "false" -}}
+{{- end -}}
+{{- end -}}
+{{/* check if any HostPath exists */}}
+{{- define "volume.hostpath_exists" -}}
+{{- if or (or (eq .Values.volume.data.type "hostPath") (and (eq .Values.volume.idx.type "hostPath") .Values.volume.dir_idx )) (eq .Values.volume.logs.type "hostPath") -}}
+{{- printf "true" -}}
+{{- else -}}
+{{- if or .Values.global.enableSecurity .Values.volume.extraVolumes -}}
+{{- printf "true" -}}
+{{- else -}}
+{{- printf "false" -}}
+{{- end -}}
+{{- end -}}
+{{- end -}}

k8s/seaweedfs/templates/cronjob.yaml (2 changes)

@@ -40,7 +40,7 @@ spec:
 {{ if .Values.volume.dataCenter }} -dataCenter {{ .Values.volume.dataCenter }}{{ end }}\
 {{ if .Values.cronjob.collection }} -collection {{ .Values.cronjob.collection }}{{ end }}\n\
 {{- if .Values.cronjob.enableFixReplication }}
-volume.fix.replication {{ if .Values.cronjob.collectionPattern }} -collectionPattern={{ .Values.cronjob.collectionPattern }} {{ end }} \n\
+volume.fix.replication -collectionPattern={{ .Values.cronjob.collectionPattern }} \n\
 {{- end }}
 unlock\n" | \
 /usr/bin/weed shell \

k8s/seaweedfs/templates/filer-service-client.yaml (1 change)

@@ -10,6 +10,7 @@ metadata:
 monitoring: "true"
 {{- end }}
 spec:
+clusterIP: None
 ports:
 - name: "swfs-filer"
 port: {{ .Values.filer.port }}

k8s/seaweedfs/templates/volume-statefulset.yaml (78 changes)

@@ -45,6 +45,19 @@ spec:
 priorityClassName: {{ .Values.volume.priorityClassName | quote }}
 {{- end }}
 enableServiceLinks: false
+{{- if .Values.volume.dir_idx }}
+initContainers:
+- name: seaweedfs-vol-move-idx
+image: {{ template "volume.image" . }}
+imagePullPolicy: {{ .Values.global.pullPolicy | default "IfNotPresent" }}
+command: [ '/bin/sh', '-c' ]
+args: ['if ls {{ .Values.volume.dir }}/*.idx >/dev/null 2>&1; then mv {{ .Values.volume.dir }}/*.idx {{ .Values.volume.dir_idx }}/; fi;']
+volumeMounts:
+- name: idx
+mountPath: {{ .Values.volume.dir_idx }}
+- name: data
+mountPath: {{ .Values.volume.dir }}
+{{- end }}
 containers:
 - name: seaweedfs
 image: {{ template "volume.image" . }}

@@ -118,9 +131,13 @@ spec:
 -compactionMBps={{ .Values.volume.compactionMBps }} \
 -mserver={{ range $index := until (.Values.master.replicas | int) }}${SEAWEEDFS_FULLNAME}-master-{{ $index }}.${SEAWEEDFS_FULLNAME}-master:{{ $.Values.master.port }}{{ if lt $index (sub ($.Values.master.replicas | int) 1) }},{{ end }}{{ end }}
 volumeMounts:
-- name: seaweedfs-volume-storage
-mountPath: "/data/"
-- name: seaweedfs-volume-log-volume
+- name: data
+mountPath: "{{ .Values.volume.dir }}/"
+{{- if .Values.volume.dir_idx }}
+- name: idx
+mountPath: "{{ .Values.volume.dir_idx }}/"
+{{- end }}
+- name: logs
 mountPath: "/logs/"
 {{- if .Values.global.enableSecurity }}
 - name: security-config

@@ -173,15 +190,27 @@ spec:
 resources:
 {{ tpl .Values.volume.resources . | nindent 12 | trim }}
 {{- end }}
+{{- $hostpath_exists := include "volume.hostpath_exists" . -}}
+{{- if $hostpath_exists }}
 volumes:
-- name: seaweedfs-volume-log-volume
+{{- if eq .Values.volume.data.type "hostPath" }}
+- name: data
 hostPath:
-path: /storage/logs/seaweedfs/volume
+path: /storage/object_store/
 type: DirectoryOrCreate
-- name: seaweedfs-volume-storage
+{{- end }}
+{{- if and (eq .Values.volume.idx.type "hostPath") .Values.volume.dir_idx }}
+- name: idx
 hostPath:
-path: /storage/object_store/
+path: /ssd/seaweedfs-volume-idx/
 type: DirectoryOrCreate
+{{- end }}
+{{- if eq .Values.volume.logs.type "hostPath" }}
+- name: logs
+hostPath:
+path: /storage/logs/seaweedfs/volume
+type: DirectoryOrCreate
+{{- end }}
 {{- if .Values.global.enableSecurity }}
 - name: security-config
 configMap:

@@ -205,8 +234,43 @@ spec:
 {{- if .Values.volume.extraVolumes }}
 {{ tpl .Values.volume.extraVolumes . | indent 8 | trim }}
 {{- end }}
+{{- end }}
 {{- if .Values.volume.nodeSelector }}
 nodeSelector:
 {{ tpl .Values.volume.nodeSelector . | indent 8 | trim }}
 {{- end }}
+{{- $pvc_exists := include "volume.pvc_exists" . -}}
+{{- if $pvc_exists }}
+volumeClaimTemplates:
+{{- if eq .Values.volume.data.type "persistentVolumeClaim"}}
+- metadata:
+name: data
+spec:
+accessModes: [ "ReadWriteOnce" ]
+storageClassName: {{ .Values.volume.data.storageClass }}
+resources:
+requests:
+storage: {{ .Values.volume.data.size }}
+{{- end }}
+{{- if and (eq .Values.volume.idx.type "persistentVolumeClaim") .Values.volume.dir_idx }}
+- metadata:
+name: idx
+spec:
+accessModes: [ "ReadWriteOnce" ]
+storageClassName: {{ .Values.volume.idx.storageClass }}
+resources:
+requests:
+storage: {{ .Values.volume.idx.size }}
+{{- end }}
+{{- if eq .Values.volume.logs.type "persistentVolumeClaim" }}
+- metadata:
+name: logs
+spec:
+accessModes: [ "ReadWriteOnce" ]
+storageClassName: {{ .Values.volume.logs.storageClass }}
+resources:
+requests:
+storage: {{ .Values.volume.logs.size }}
+{{- end }}
+{{- end }}
 {{- end }}

k8s/seaweedfs/values.yaml (20 changes)

@@ -4,7 +4,7 @@ global:
 registry: ""
 repository: ""
 imageName: chrislusf/seaweedfs
-# imageTag: "2.26" - started using {.Chart.appVersion}
+# imageTag: "2.28" - started using {.Chart.appVersion}
 imagePullPolicy: IfNotPresent
 imagePullSecrets: imagepullsecret
 restartPolicy: Always

@@ -138,6 +138,24 @@ volume:
 # minimum free disk space(in percents). If free disk space lower this value - all volumes marks as ReadOnly
 minFreeSpacePercent: 7
+# can use ANY storage-class , example with local-path-provisner
+# data:
+# type: "persistentVolumeClaim"
+# size: "24Ti"
+# storageClass: "local-path-provisioner"
+data:
+type: "hostPath"
+size: ""
+storageClass: ""
+idx:
+type: "hostPath"
+size: ""
+storageClass: ""
+logs:
+type: "hostPath"
+size: ""
+storageClass: ""
 # limit background compaction or copying speed in mega bytes per second
 compactionMBps: "50"

other/java/client/pom.xml (2 changes)

@@ -5,7 +5,7 @@
 <groupId>com.github.chrislusf</groupId>
 <artifactId>seaweedfs-client</artifactId>
-<version>1.6.1</version>
+<version>1.6.2</version>
 <parent>
 <groupId>org.sonatype.oss</groupId>

other/java/client/pom.xml.deploy (2 changes)

@@ -5,7 +5,7 @@
 <groupId>com.github.chrislusf</groupId>
 <artifactId>seaweedfs-client</artifactId>
-<version>1.6.1</version>
+<version>1.6.2</version>
 <parent>
 <groupId>org.sonatype.oss</groupId>

other/java/client/pom_debug.xml (2 changes)

@@ -5,7 +5,7 @@
 <groupId>com.github.chrislusf</groupId>
 <artifactId>seaweedfs-client</artifactId>
-<version>1.6.1</version>
+<version>1.6.2</version>
 <parent>
 <groupId>org.sonatype.oss</groupId>

other/java/client/src/main/java/seaweedfs/client/FilerClient.java (33 changes)

@@ -4,8 +4,7 @@ import com.google.common.base.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;

@@ -94,9 +93,9 @@ public class FilerClient extends FilerGrpcClient {
 if ("/".equals(path)) {
 return true;
 }
-Path pathObject = Paths.get(path);
-String parent = pathObject.getParent().toString();
-String name = pathObject.getFileName().toString();
+File pathFile = new File(path);
+String parent = pathFile.getParent();
+String name = pathFile.getName();
 mkdirs(parent, mode, uid, gid, userName, groupNames);

@@ -115,13 +114,13 @@ public class FilerClient extends FilerGrpcClient {
 public boolean mv(String oldPath, String newPath) {
-Path oldPathObject = Paths.get(oldPath);
-String oldParent = oldPathObject.getParent().toString();
-String oldName = oldPathObject.getFileName().toString();
+File oldPathFile = new File(oldPath);
+String oldParent = oldPathFile.getParent();
+String oldName = oldPathFile.getName();
-Path newPathObject = Paths.get(newPath);
-String newParent = newPathObject.getParent().toString();
-String newName = newPathObject.getFileName().toString();
+File newPathFile = new File(newPath);
+String newParent = newPathFile.getParent();
+String newName = newPathFile.getName();
 return atomicRenameEntry(oldParent, oldName, newParent, newName);

@@ -129,9 +128,9 @@ public class FilerClient extends FilerGrpcClient {
 public boolean rm(String path, boolean isRecursive, boolean ignoreRecusiveError) {
-Path pathObject = Paths.get(path);
-String parent = pathObject.getParent().toString();
-String name = pathObject.getFileName().toString();
+File pathFile = new File(path);
+String parent = pathFile.getParent();
+String name = pathFile.getName();
 return deleteEntry(
 parent,

@@ -148,9 +147,9 @@ public class FilerClient extends FilerGrpcClient {
 public boolean touch(String path, int mode, int uid, int gid, String userName, String[] groupNames) {
-Path pathObject = Paths.get(path);
-String parent = pathObject.getParent().toString();
-String name = pathObject.getFileName().toString();
+File pathFile = new File(path);
+String parent = pathFile.getParent();
+String name = pathFile.getName();
 FilerProto.Entry entry = lookupEntry(parent, name);
 if (entry == null) {

other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java (3 changes)

@@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
+import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.util.concurrent.*;

@@ -217,7 +218,7 @@ public class SeaweedOutputStream extends OutputStream {
 private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException {
-bufferToWrite.flip();
+((Buffer)bufferToWrite).flip();
 int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
 if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {

other/java/examples/pom.xml (4 changes)

@@ -11,13 +11,13 @@
 <dependency>
 <groupId>com.github.chrislusf</groupId>
 <artifactId>seaweedfs-client</artifactId>
-<version>1.6.1</version>
+<version>1.6.2</version>
 <scope>compile</scope>
 </dependency>
 <dependency>
 <groupId>com.github.chrislusf</groupId>
 <artifactId>seaweedfs-hadoop2-client</artifactId>
-<version>1.6.1</version>
+<version>1.6.2</version>
 <scope>compile</scope>
 </dependency>
 <dependency>

other/java/hdfs2/dependency-reduced-pom.xml (2 changes)

@@ -301,7 +301,7 @@
 </snapshotRepository>
 </distributionManagement>
 <properties>
-<seaweedfs.client.version>1.6.1</seaweedfs.client.version>
+<seaweedfs.client.version>1.6.2</seaweedfs.client.version>
 <hadoop.version>2.9.2</hadoop.version>
 </properties>
 </project>

other/java/hdfs2/pom.xml (2 changes)

@@ -5,7 +5,7 @@
 <modelVersion>4.0.0</modelVersion>
 <properties>
-<seaweedfs.client.version>1.6.1</seaweedfs.client.version>
+<seaweedfs.client.version>1.6.2</seaweedfs.client.version>
 <hadoop.version>2.9.2</hadoop.version>
 </properties>

other/java/hdfs3/dependency-reduced-pom.xml (2 changes)

@@ -309,7 +309,7 @@
 </snapshotRepository>
 </distributionManagement>
 <properties>
-<seaweedfs.client.version>1.6.1</seaweedfs.client.version>
+<seaweedfs.client.version>1.6.2</seaweedfs.client.version>
 <hadoop.version>3.1.1</hadoop.version>
 </properties>
 </project>

other/java/hdfs3/pom.xml (2 changes)

@@ -5,7 +5,7 @@
 <modelVersion>4.0.0</modelVersion>
 <properties>
-<seaweedfs.client.version>1.6.1</seaweedfs.client.version>
+<seaweedfs.client.version>1.6.2</seaweedfs.client.version>
 <hadoop.version>3.1.1</hadoop.version>
 </properties>

weed/Makefile (4 changes)

@@ -33,3 +33,7 @@ debug_webdav:
 debug_s3:
 go build -gcflags="all=-N -l"
 dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 s3
+debug_filer_copy:
+go build -gcflags="all=-N -l"
+dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h

weed/command/benchmark.go (2 changes)

@@ -63,7 +63,7 @@ func init() {
 b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
 b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
 b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
-b.diskType = cmdBenchmark.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
+b.diskType = cmdBenchmark.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
 b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
 b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
 b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")

weed/command/command.go (1 change)

@@ -15,6 +15,7 @@ var Commands = []*Command{
 cmdDownload,
 cmdExport,
 cmdFiler,
+cmdFilerBackup,
 cmdFilerCat,
 cmdFilerMetaTail,
 cmdFilerReplicate,

weed/command/filer.go (2 changes)

@@ -84,7 +84,7 @@ func init() {
 filerWebDavOptions.port = cmdFiler.Flag.Int("webdav.port", 7333, "webdav server http listen port")
 filerWebDavOptions.collection = cmdFiler.Flag.String("webdav.collection", "", "collection to create the files")
 filerWebDavOptions.replication = cmdFiler.Flag.String("webdav.replication", "", "replication to create the files")
-filerWebDavOptions.disk = cmdFiler.Flag.String("webdav.disk", "", "[hdd|ssd] hard drive or solid state drive")
+filerWebDavOptions.disk = cmdFiler.Flag.String("webdav.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
 filerWebDavOptions.tlsPrivateKey = cmdFiler.Flag.String("webdav.key.file", "", "path to the TLS private key file")
 filerWebDavOptions.tlsCertificate = cmdFiler.Flag.String("webdav.cert.file", "", "path to the TLS certificate file")
 filerWebDavOptions.cacheDir = cmdFiler.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")

weed/command/filer_backup.go (158 changes, new file)

@@ -0,0 +1,158 @@
(new file; every line below is an addition)
package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"io"
"time"
)
type FilerBackupOptions struct {
isActivePassive *bool
filer *string
path *string
debug *bool
proxyByFiler *bool
timeAgo *time.Duration
}
var (
filerBackupOptions FilerBackupOptions
)
func init() {
cmdFilerBackup.Run = runFilerBackup // break init cycle
filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
}
var cmdFilerBackup = &Command{
UsageLine: "filer.backup -filer=<filerHost>:<filerPort> ",
Short: "resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml",
Long: `resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml
filer.backup listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the destination. This is to replace filer.replicate command since additional message queue is not needed.
If restarted and "-timeAgo" is not set, the synchronization will resume from the previous checkpoints, persisted every minute.
A fresh sync will start from the earliest metadata logs. To reset the checkpoints, just set "-timeAgo" to a high value.
`,
}
func runFilerBackup(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
util.LoadConfiguration("security", false)
util.LoadConfiguration("replication", true)
for {
err := doFilerBackup(grpcDialOption, &filerBackupOptions)
if err != nil {
glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
time.Sleep(1747 * time.Millisecond)
}
}
return true
}
const (
BackupKeyPrefix = "backup."
)
func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions) error {
// find data sink
config := util.GetViper()
dataSink := findSink(config)
if dataSink == nil {
return fmt.Errorf("no data sink configured in replication.toml")
}
sourceFiler := *backupOption.filer
sourcePath := *backupOption.path
timeAgo := *backupOption.timeAgo
targetPath := dataSink.GetSinkToDirectory()
debug := *backupOption.debug
// get start time for the data sink
startFrom := time.Unix(0, 0)
sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory())
if timeAgo.Milliseconds() == 0 {
lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId))
if err != nil {
glog.V(0).Infof("starting from %v", startFrom)
} else {
startFrom = time.Unix(0, lastOffsetTsNs)
glog.V(0).Infof("resuming from %v", startFrom)
}
} else {
startFrom = time.Now().Add(-timeAgo)
glog.V(0).Infof("start time is set to %v", startFrom)
}
// create filer sink
filerSource := &source.FilerSource{}
filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, *backupOption.proxyByFiler)
dataSink.SetSourceFiler(filerSource)
processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "backup_" + dataSink.GetName(),
PathPrefix: sourcePath,
SinceNs: startFrom.UnixNano(),
})
if err != nil {
return fmt.Errorf("listen: %v", err)
}
var counter int64
var lastWriteTime time.Time
for {
resp, listenErr := stream.Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
return listenErr
}
if err := processEventFn(resp); err != nil {
return fmt.Errorf("processEventFn: %v", err)
}
counter++
if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
counter = 0
lastWriteTime = time.Now()
if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
return fmt.Errorf("setOffset: %v", err)
}
}
}
})
}
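
The new filer.backup command reuses the checkpoint helpers that filer.sync already had (renamed to getOffset/setOffset further down in this diff, in weed/command/filer_sync.go): progress is stored on the source filer as a key-value entry whose key is the "backup." prefix followed by the 4-byte sink signature (a hash of the sink name plus its target directory), and whose value is the last processed timestamp in nanoseconds. A minimal standalone sketch of that key/value layout, using encoding/binary's big-endian order as a stand-in for the util.Uint32toBytes/util.Uint64toBytes helpers (the function names here are illustrative, not part of the code above):

package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// checkpointKey mirrors the layout used by getOffset/setOffset: the
// signature prefix (e.g. "backup." or "sync.") followed by four bytes
// holding the sink/filer signature.
func checkpointKey(prefix string, signature int32) []byte {
	key := []byte(prefix + "____")
	binary.BigEndian.PutUint32(key[len(prefix):], uint32(signature))
	return key
}

// checkpointValue is the 8-byte nanosecond timestamp stored as the
// value of that key-value entry on the filer.
func checkpointValue(tsNs int64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(tsNs))
	return buf
}

func main() {
	key := checkpointKey("backup.", 12345) // 12345 stands in for the hashed sink id
	val := checkpointValue(time.Now().UnixNano())
	fmt.Printf("key=%x value=%x\n", key, val)
}

On restart without -timeAgo, the command reads this entry back and resumes the metadata subscription from the stored timestamp, which is what makes the backup resume-able.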

weed/command/filer_copy.go (2 changes)

@@ -55,7 +55,7 @@ func init() {
 copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
 copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
 copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
-copy.diskType = cmdCopy.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
+copy.diskType = cmdCopy.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
 copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
 copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
 copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")

weed/command/filer_meta_tail.go (4 changes)

@@ -24,8 +24,8 @@ func init() {
 var cmdFilerMetaTail = &Command{
 UsageLine: "filer.meta.tail [-filer=localhost:8888] [-target=/]",
-Short: "see recent changes on a filer",
-Long: `See recent changes on a filer.
+Short: "see continuous changes on a filer",
+Long: `See continuous changes on a filer.
 weed filer.meta.tail -timeAgo=30h | grep truncate
 weed filer.meta.tail -timeAgo=30h | jq .

weed/command/filer_replication.go (29 changes)

@@ -74,18 +74,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
 }
 }
-var dataSink sink.ReplicationSink
-for _, sk := range sink.Sinks {
-if config.GetBool("sink." + sk.GetName() + ".enabled") {
-if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
-glog.Fatalf("Failed to initialize sink for %s: %+v",
-sk.GetName(), err)
-}
-glog.V(0).Infof("Configure sink to %s", sk.GetName())
-dataSink = sk
-break
-}
-}
+dataSink := findSink(config)
 if dataSink == nil {
 println("no data sink configured in replication.toml:")

@@ -135,6 +124,22 @@ func runFilerReplicate(cmd *Command, args []string) bool {
 }
+func findSink(config *util.ViperProxy) sink.ReplicationSink {
+var dataSink sink.ReplicationSink
+for _, sk := range sink.Sinks {
+if config.GetBool("sink." + sk.GetName() + ".enabled") {
+if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
+glog.Fatalf("Failed to initialize sink for %s: %+v",
+sk.GetName(), err)
+}
+glog.V(0).Infof("Configure sink to %s", sk.GetName())
+dataSink = sk
+break
+}
+}
+return dataSink
+}
 func validateOneEnabledInput(config *util.ViperProxy) {
 enabledInput := ""
 for _, input := range sub.NotificationInputs {

weed/command/filer_sync.go (207 changes)

@@ -8,6 +8,7 @@ import (
 "github.com/chrislusf/seaweedfs/weed/pb"
 "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 "github.com/chrislusf/seaweedfs/weed/replication"
+"github.com/chrislusf/seaweedfs/weed/replication/sink"
 "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
 "github.com/chrislusf/seaweedfs/weed/replication/source"
 "github.com/chrislusf/seaweedfs/weed/security"

@@ -58,8 +59,8 @@ func init() {
 syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
 syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
 syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
-syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd] hard drive or solid state drive on filer A")
-syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd] hard drive or solid state drive on filer B")
+syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A")
+syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B")
 syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
 syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
 syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")

@@ -137,7 +138,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
 // if first time, start from now
 // if has previously synced, resume from that point of time
-sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature)
+sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature)
 if err != nil {
 return err
 }

@@ -151,93 +152,17 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
 filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
 filerSink.SetSourceFiler(filerSource)
+persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)
 processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
 message := resp.EventNotification
-var sourceOldKey, sourceNewKey util.FullPath
-if message.OldEntry != nil {
-sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
-}
-if message.NewEntry != nil {
-sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
-}
 for _, sig := range message.Signatures {
 if sig == targetFilerSignature && targetFilerSignature != 0 {
 fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
 return nil
 }
 }
-if debug {
-fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature)
-}
-if !strings.HasPrefix(resp.Directory, sourcePath) {
-return nil
-}
-// handle deletions
-if message.OldEntry != nil && message.NewEntry == nil {
-if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
-return nil
-}
-key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
-return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
-}
-// handle new entries
-if message.OldEntry == nil && message.NewEntry != nil {
-if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
-return nil
-}
-key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
-return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
-}
-// this is something special?
-if message.OldEntry == nil && message.NewEntry == nil {
-return nil
-}
-// handle updates
-if strings.HasPrefix(string(sourceOldKey), sourcePath) {
-// old key is in the watched directory
-if strings.HasPrefix(string(sourceNewKey), sourcePath) {
-// new key is also in the watched directory
-oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
-message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
-foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
-if foundExisting {
-return err
-}
-// not able to find old entry
-if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
-return fmt.Errorf("delete old entry %v: %v", oldKey, err)
-}
-// create the new entry
-newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
-return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
-} else {
-// new key is outside of the watched directory
-key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
-return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
-}
-} else {
-// old key is outside of the watched directory
-if strings.HasPrefix(string(sourceNewKey), sourcePath) {
-// new key is in the watched directory
-key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
-return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
-} else {
-// new key is also outside of the watched directory
-// skip
-}
-}
-return nil
+return persistEventFn(resp)
 }
 return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {

@@ -275,7 +200,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
 glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
 counter = 0
 lastWriteTime = time.Now()
-if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil {
+if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil {
 return err
 }
 }

@@ -290,11 +215,11 @@
 SyncKeyPrefix = "sync."
 )
-func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) {
+func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
 readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-syncKey := []byte(SyncKeyPrefix + "____")
-util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
+syncKey := []byte(signaturePrefix + "____")
+util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
 resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
 if err != nil {

@@ -317,11 +242,11 @@ func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature
 }
-func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error {
+func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error {
 return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-syncKey := []byte(SyncKeyPrefix + "____")
-util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
+syncKey := []byte(signaturePrefix + "____")
+util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
 valueBuf := make([]byte, 8)
 util.Uint64toBytes(valueBuf, uint64(offsetTsNs))

@@ -343,3 +268,107 @@ func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignatur
 })
 }
(the genProcessFunction and buildKey code below is entirely newly added)
func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
// process function
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
var sourceOldKey, sourceNewKey util.FullPath
if message.OldEntry != nil {
sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
}
if message.NewEntry != nil {
sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
}
if debug {
glog.V(0).Infof("received %v", resp)
}
if !strings.HasPrefix(resp.Directory, sourcePath) {
return nil
}
// handle deletions
if message.OldEntry != nil && message.NewEntry == nil {
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
return nil
}
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
}
// handle new entries
if message.OldEntry == nil && message.NewEntry != nil {
if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
return nil
}
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
}
// this is something special?
if message.OldEntry == nil && message.NewEntry == nil {
return nil
}
// handle updates
if strings.HasPrefix(string(sourceOldKey), sourcePath) {
// old key is in the watched directory
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
// new key is also in the watched directory
if !dataSink.IsIncremental() {
oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
if foundExisting {
return err
}
// not able to find old entry
if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
return fmt.Errorf("delete old entry %v: %v", oldKey, err)
}
}
// create the new entry
newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
} else {
// new key is outside of the watched directory
if !dataSink.IsIncremental() {
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
}
}
} else {
// old key is outside of the watched directory
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
// new key is in the watched directory
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
} else {
// new key is also outside of the watched directory
// skip
}
}
return nil
}
return processEventFn
}
func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string {
if !dataSink.IsIncremental() {
return util.Join(targetPath, string(sourceKey)[len(sourcePath):])
}
var mTime int64
if message.NewEntry != nil {
mTime = message.NewEntry.Attributes.Mtime
} else if message.OldEntry != nil {
mTime = message.OldEntry.Attributes.Mtime
}
dateKey := time.Unix(mTime, 0).Format("2006-01-02")
return util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
}
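
When a sink reports IsIncremental(), buildKey above prefixes every replicated path with a yyyy-mm-dd directory derived from the entry's modification time, and updates are written as plain creates instead of in-place updates. A small self-contained sketch of that path construction (the helper name and example paths are made up for illustration):

package main

import (
	"fmt"
	"path/filepath"
	"strings"
	"time"
)

// incrementalKey mimics the date-partitioned layout buildKey produces for
// incremental sinks: targetPath / yyyy-mm-dd / (source path relative to sourcePath).
func incrementalKey(targetPath, sourcePath, sourceKey string, mtime int64) string {
	dateKey := time.Unix(mtime, 0).Format("2006-01-02")
	rel := strings.TrimPrefix(sourceKey, sourcePath)
	return filepath.ToSlash(filepath.Join(targetPath, dateKey, rel))
}

func main() {
	// hypothetical example: a file under a watched /buckets tree,
	// modified on 2021-03-01, replicated into a /backup target directory
	mtime := time.Date(2021, 3, 1, 10, 0, 0, 0, time.UTC).Unix()
	fmt.Println(incrementalKey("/backup", "/buckets", "/buckets/photos/cat.jpg", mtime))
	// prints something like: /backup/2021-03-01/photos/cat.jpg
}

So successive versions of the same source file land in different date directories, matching the description added to scaffold.go below: each date directory contains all new and updated files.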

weed/command/mount.go (2 changes)

@@ -42,7 +42,7 @@ func init() {
 mountOptions.dirAutoCreate = cmdMount.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to")
 mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
 mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
-mountOptions.diskType = cmdMount.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
+mountOptions.diskType = cmdMount.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
 mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
 mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
 mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 128, "limit concurrent goroutine writers if not 0")

weed/command/scaffold.go (8 changes)

@@ -356,6 +356,9 @@ directory = "/buckets"
 [sink.local]
 enabled = false
 directory = "/data"
+# all replicated files are under modified time as yyyy-mm-dd directories
+# so each date directory contains all new and updated files.
+is_incremental = false
 [sink.local_incremental]
 # all replicated files are under modified time as yyyy-mm-dd directories

@@ -373,6 +376,7 @@ directory = "/backup"
 replication = ""
 collection = ""
 ttlSec = 0
+is_incremental = false
 [sink.s3]
 # read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html

@@ -384,6 +388,7 @@ region = "us-east-2"
 bucket = "your_bucket_name" # an existing bucket
 directory = "/" # destination directory
 endpoint = ""
+is_incremental = false
 [sink.google_cloud_storage]
 # read credentials doc at https://cloud.google.com/docs/authentication/getting-started

@@ -391,6 +396,7 @@ enabled = false
 google_application_credentials = "/path/to/x.json" # path to json credential file
 bucket = "your_bucket_seaweedfs" # an existing bucket
 directory = "/" # destination directory
+is_incremental = false
 [sink.azure]
 # experimental, let me know if it works

@@ -399,6 +405,7 @@ account_name = ""
 account_key = ""
 container = "mycontainer" # an existing container
 directory = "/" # destination directory
+is_incremental = false
 [sink.backblaze]
 enabled = false

@@ -406,6 +413,7 @@ b2_account_id = ""
 b2_master_application_key = ""
 bucket = "mybucket" # an existing bucket
 directory = "/" # destination directory
+is_incremental = false
 `

weed/command/server.go (4 changes)

@@ -102,7 +102,7 @@ func init() {
 serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
 serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
 serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
-serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd] hard drive or solid state drive")
+serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
 serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
 serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
 serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")

@@ -122,7 +122,7 @@ func init() {
 webdavOptions.port = cmdServer.Flag.Int("webdav.port", 7333, "webdav server http listen port")
 webdavOptions.collection = cmdServer.Flag.String("webdav.collection", "", "collection to create the files")
 webdavOptions.replication = cmdServer.Flag.String("webdav.replication", "", "replication to create the files")
-webdavOptions.disk = cmdServer.Flag.String("webdav.disk", "", "[hdd|ssd] hard drive or solid state drive")
+webdavOptions.disk = cmdServer.Flag.String("webdav.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
 webdavOptions.tlsPrivateKey = cmdServer.Flag.String("webdav.key.file", "", "path to the TLS private key file")
 webdavOptions.tlsCertificate = cmdServer.Flag.String("webdav.cert.file", "", "path to the TLS certificate file")
 webdavOptions.cacheDir = cmdServer.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")

weed/command/upload.go (2 changes)

@@ -41,7 +41,7 @@ func init() {
 upload.replication = cmdUpload.Flag.String("replication", "", "replication type")
 upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name")
 upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name")
-upload.diskType = cmdUpload.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
+upload.diskType = cmdUpload.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
 upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
 upload.maxMB = cmdUpload.Flag.Int("maxMB", 32, "split files larger than the limit")
 upload.usePublicUrl = cmdUpload.Flag.Bool("usePublicUrl", false, "upload to public url from volume server")

weed/command/volume.go (2 changes)

@@ -78,7 +78,7 @@ func init() {
 v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
 v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
 v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
-v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
+v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
 v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.")
 v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
 v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")

2
weed/command/webdav.go

@@ -39,7 +39,7 @@ func init() {
webDavStandaloneOptions.port = cmdWebDav.Flag.Int("port", 7333, "webdav server http listen port")
webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files")
webDavStandaloneOptions.replication = cmdWebDav.Flag.String("replication", "", "replication to create the files")
webDavStandaloneOptions.disk = cmdWebDav.Flag.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
webDavStandaloneOptions.disk = cmdWebDav.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file")
webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file")
webDavStandaloneOptions.cacheDir = cmdWebDav.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks")

2
weed/filer/stream.go

@@ -181,7 +181,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {

33
weed/operation/upload_content.go

@@ -130,7 +130,8 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
// gzip if possible
// this could be double copying
clearDataLen = len(data)
if shouldGzipNow {
clearData := data
if shouldGzipNow && !cipher {
compressed, compressErr := util.GzipData(data)
// fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed))
if compressErr == nil {
@@ -139,7 +140,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
}
} else if isInputCompressed {
// just to get the clear data length
clearData, err := util.DecompressData(data)
clearData, err = util.DecompressData(data)
if err == nil {
clearDataLen = len(clearData)
}
@@ -150,7 +151,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
// encrypt
cipherKey := util.GenCipherKey()
encryptedData, encryptionErr := util.Encrypt(data, cipherKey)
encryptedData, encryptionErr := util.Encrypt(clearData, cipherKey)
if encryptionErr != nil {
err = fmt.Errorf("encrypt input: %v", encryptionErr)
return
@@ -161,26 +162,26 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
_, err = w.Write(encryptedData)
return
}, "", false, len(encryptedData), "", nil, jwt)
if uploadResult != nil {
uploadResult.Name = filename
uploadResult.Mime = mtype
uploadResult.CipherKey = cipherKey
if uploadResult == nil {
return
}
uploadResult.Name = filename
uploadResult.Mime = mtype
uploadResult.CipherKey = cipherKey
uploadResult.Size = uint32(clearDataLen)
} else {
// upload data
uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
_, err = w.Write(data)
return
}, filename, contentIsGzipped, len(data), mtype, pairMap, jwt)
}
if uploadResult == nil {
return
}
uploadResult.Size = uint32(clearDataLen)
if contentIsGzipped {
uploadResult.Gzip = 1
if uploadResult == nil {
return
}
uploadResult.Size = uint32(clearDataLen)
if contentIsGzipped {
uploadResult.Gzip = 1
}
}
return uploadResult, err
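A note on the upload_content.go hunks: with the new `clearData` variable and the `shouldGzipNow && !cipher` guard, the cipher path skips the gzip step and always encrypts the clear bytes, and the nil-check on `uploadResult` now happens before the result fields are filled in. A minimal sketch of that decision flow using only the standard library; `preparePayload` and the `encrypt` callback are illustrative stand-ins, not the SeaweedFS API:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// preparePayload sketches the corrected branch in doUploadData: gzip is only
// applied when cipher is off, and when cipher is on the clear bytes are what
// get encrypted, so the reported size can stay the clear data length.
func preparePayload(data []byte, shouldGzipNow, cipher bool, encrypt func([]byte) []byte) (payload []byte, clearDataLen int, err error) {
	clearData := data
	clearDataLen = len(data)
	if shouldGzipNow && !cipher {
		var buf bytes.Buffer
		zw := gzip.NewWriter(&buf)
		if _, err = zw.Write(data); err != nil {
			return nil, 0, err
		}
		if err = zw.Close(); err != nil {
			return nil, 0, err
		}
		return buf.Bytes(), clearDataLen, nil
	}
	if cipher {
		return encrypt(clearData), clearDataLen, nil
	}
	return data, clearDataLen, nil
}

func main() {
	// identity "encryption" just to exercise the cipher branch
	payload, n, err := preparePayload([]byte("hello world"), true, true, func(b []byte) []byte { return b })
	fmt.Println(len(payload), n, err) // 11 11 <nil>
}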

2
weed/replication/repl_util/replication_utli.go → weed/replication/repl_util/replication_util.go

@@ -20,7 +20,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var shouldRetry bool
for _, fileUrl := range fileUrls {
shouldRetry, err = util.FastReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
shouldRetry, err = util.FastReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {

2
weed/replication/replicator.go

@@ -42,7 +42,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
return nil
}
var dateKey string
if r.sink.GetName() == "local_incremental" {
if r.sink.IsIncremental() {
var mTime int64
if message.NewEntry != nil {
mTime = message.NewEntry.Attributes.Mtime

14
weed/replication/sink/azuresink/azure_sink.go

@@ -18,10 +18,11 @@ import (
)
type AzureSink struct {
containerURL azblob.ContainerURL
container string
dir string
filerSource *source.FilerSource
containerURL azblob.ContainerURL
container string
dir string
filerSource *source.FilerSource
isIncremental bool
}
func init() {
@@ -36,7 +37,12 @@ func (g *AzureSink) GetSinkToDirectory() string {
return g.dir
}
func (g *AzureSink) IsIncremental() bool {
return g.isIncremental
}
func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error {
g.isIncremental = configuration.GetBool(prefix+"is_incremental")
return g.initialize(
configuration.GetString(prefix+"account_name"),
configuration.GetString(prefix+"account_key"),

6
weed/replication/sink/b2sink/b2_sink.go

@@ -18,6 +18,7 @@ type B2Sink struct {
bucket string
dir string
filerSource *source.FilerSource
isIncremental bool
}
func init() {
@@ -32,7 +33,12 @@ func (g *B2Sink) GetSinkToDirectory() string {
return g.dir
}
func (g *B2Sink) IsIncremental() bool {
return g.isIncremental
}
func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error {
g.isIncremental = configuration.GetBool(prefix+"is_incremental")
return g.initialize(
configuration.GetString(prefix+"b2_account_id"),
configuration.GetString(prefix+"b2_master_application_key"),

6
weed/replication/sink/filersink/filer_sink.go

@@ -30,6 +30,7 @@ type FilerSink struct {
grpcDialOption grpc.DialOption
address string
writeChunkByFiler bool
isIncremental bool
}
func init() {
@@ -44,7 +45,12 @@ func (fs *FilerSink) GetSinkToDirectory() string {
return fs.dir
}
func (fs *FilerSink) IsIncremental() bool {
return fs.isIncremental
}
func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
fs.isIncremental = configuration.GetBool(prefix+"is_incremental")
return fs.DoInitialize(
"",
configuration.GetString(prefix+"grpcAddress"),

6
weed/replication/sink/gcssink/gcs_sink.go

@@ -22,6 +22,7 @@ type GcsSink struct {
bucket string
dir string
filerSource *source.FilerSource
isIncremental bool
}
func init() {
@@ -36,7 +37,12 @@ func (g *GcsSink) GetSinkToDirectory() string {
return g.dir
}
func (g *GcsSink) IsIncremental() bool {
return g.isIncremental
}
func (g *GcsSink) Initialize(configuration util.Configuration, prefix string) error {
g.isIncremental = configuration.GetBool(prefix+"is_incremental")
return g.initialize(
configuration.GetString(prefix+"google_application_credentials"),
configuration.GetString(prefix+"bucket"),

8
weed/replication/sink/localsink/local_sink.go

@@ -50,6 +50,10 @@ func (localsink *LocalSink) GetSinkToDirectory() string {
return localsink.Dir
}
func (localsink *LocalSink) IsIncremental() bool {
return true
}
func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
if localsink.isMultiPartEntry(key) {
return nil
@@ -74,13 +78,13 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
if _, err := os.Stat(dir); os.IsNotExist(err) {
glog.V(4).Infof("Create Direcotry key: %s", dir)
if err = os.MkdirAll(dir, 0); err != nil {
if err = os.MkdirAll(dir, 0755); err != nil {
return err
}
}
writeFunc := func(data []byte) error {
writeErr := ioutil.WriteFile(key, data, 0)
writeErr := ioutil.WriteFile(key, data, 0755)
return writeErr
}

1
weed/replication/sink/replication_sink.go

@@ -14,6 +14,7 @@ type ReplicationSink interface {
UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error)
GetSinkToDirectory() string
SetSourceFiler(s *source.FilerSource)
IsIncremental() bool
}
var (
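Every replication sink now has to answer IsIncremental(), which replicator.go uses instead of matching the sink name "local_incremental" to decide whether to prefix the destination path with a date key. The cloud sinks read it from a new is_incremental configuration key, while the local incremental sink hard-codes true. A minimal stand-alone sketch of the pattern; the Configuration map and demoSink below are simplified stand-ins, not the real weed/replication/sink types:

package main

import "fmt"

// Configuration stands in for util.Configuration just for this example.
type Configuration map[string]bool

// ReplicationSink is trimmed down to the two methods relevant here.
type ReplicationSink interface {
	GetSinkToDirectory() string
	IsIncremental() bool
}

// demoSink sketches the pattern each concrete sink now follows:
// read "<prefix>is_incremental" at Initialize time and report it later.
type demoSink struct {
	dir           string
	isIncremental bool
}

var _ ReplicationSink = (*demoSink)(nil)

func (s *demoSink) Initialize(conf Configuration, prefix string) error {
	s.isIncremental = conf[prefix+"is_incremental"]
	return nil
}

func (s *demoSink) GetSinkToDirectory() string { return s.dir }
func (s *demoSink) IsIncremental() bool        { return s.isIncremental }

func main() {
	s := &demoSink{dir: "/buckets"}
	_ = s.Initialize(Configuration{"sink.demo.is_incremental": true}, "sink.demo.")
	fmt.Println(s.IsIncremental()) // true
}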

29
weed/replication/sink/s3sink/s3_sink.go

@@ -21,12 +21,13 @@ import (
)
type S3Sink struct {
conn s3iface.S3API
region string
bucket string
dir string
endpoint string
filerSource *source.FilerSource
conn s3iface.S3API
region string
bucket string
dir string
endpoint string
filerSource *source.FilerSource
isIncremental bool
}
func init() {
@@ -41,11 +42,17 @@ func (s3sink *S3Sink) GetSinkToDirectory() string {
return s3sink.dir
}
func (s3sink *S3Sink) IsIncremental() bool {
return s3sink.isIncremental
}
func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
glog.V(0).Infof("sink.s3.is_incremental: %v", configuration.GetString(prefix+"is_incremental"))
s3sink.isIncremental = configuration.GetBool(prefix + "is_incremental")
return s3sink.initialize(
configuration.GetString(prefix+"aws_access_key_id"),
configuration.GetString(prefix+"aws_secret_access_key"),
@@ -67,8 +74,9 @@ func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, buc
s3sink.endpoint = endpoint
config := &aws.Config{
Region: aws.String(s3sink.region),
Endpoint: aws.String(s3sink.endpoint),
Region: aws.String(s3sink.region),
Endpoint: aws.String(s3sink.endpoint),
S3ForcePathStyle: aws.Bool(true),
}
if awsAccessKeyId != "" && awsSecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
@@ -104,7 +112,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
uploadId, err := s3sink.createMultipartUpload(key, entry)
if err != nil {
return err
return fmt.Errorf("createMultipartUpload: %v", err)
}
totalSize := filer.FileSize(entry)
@@ -120,6 +128,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
defer wg.Done()
if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil {
err = uploadErr
glog.Errorf("uploadPart: %v", uploadErr)
} else {
parts[index] = part
}
@@ -129,7 +138,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
if err != nil {
s3sink.abortMultipartUpload(key, uploadId)
return err
return fmt.Errorf("uploadPart: %v", err)
}
return s3sink.completeMultipartUpload(context.Background(), key, uploadId, parts)
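Besides the new is_incremental flag and the wrapped error messages, the S3 sink now sets S3ForcePathStyle, which keeps the bucket name in the URL path rather than in a virtual-hosted subdomain; that is what most self-hosted S3-compatible endpoints expect. A minimal aws-sdk-go sketch of such a configuration; the endpoint and region values are placeholders, not SeaweedFS defaults:

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	// Path-style addressing: requests go to http://host/bucket/key instead of
	// http://bucket.host/key, which plain HTTP endpoints usually require.
	config := &aws.Config{
		Region:           aws.String("us-east-1"),
		Endpoint:         aws.String("http://localhost:9000"),
		S3ForcePathStyle: aws.Bool(true),
	}
	sess, err := session.NewSession(config)
	if err != nil {
		panic(err)
	}
	svc := s3.New(sess)
	_ = svc // ready for CreateMultipartUpload / UploadPart calls as in s3_sink.go
	fmt.Println("path-style S3 client configured")
}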

7
weed/replication/sink/s3sink/s3_write.go

@@ -94,12 +94,13 @@ func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId
result, err := s3sink.conn.CompleteMultipartUpload(input)
if err == nil {
glog.V(0).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
glog.V(1).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
} else {
glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
return fmt.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
}
return err
return nil
}
// To upload a part
@@ -163,7 +164,7 @@ func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, er
}
buf := make([]byte, chunk.Size)
for _, fileUrl := range fileUrls {
_, err = util.ReadUrl(fileUrl+"?readDeleted=true", nil, false, false, chunk.Offset, int(chunk.Size), buf)
_, err = util.ReadUrl(fileUrl, chunk.CipherKey, chunk.IsGzipped, false, chunk.Offset, int(chunk.Size), buf)
if err != nil {
glog.V(1).Infof("read from %s: %v", fileUrl, err)
} else {

8
weed/replication/source/filer_source.go

@@ -83,8 +83,12 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
}
for _, loc := range locations.Locations {
fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s", loc.Url, part))
if !fs.proxyByFiler {
for _, loc := range locations.Locations {
fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s?readDeleted=true", loc.Url, part))
}
} else {
fileUrls = append(fileUrls, fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, part))
}
return
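The LookupFileId change has two effects: direct volume-server reads now carry ?readDeleted=true (so replication can still fetch chunks that were just marked deleted), and when proxyByFiler is set the source returns a single filer URL using the proxyChunkId query instead of the volume-server URLs. A small illustrative sketch of that branch; the helper name buildFileUrls is made up for the example:

package main

import "fmt"

// buildFileUrls sketches the branch added to FilerSource.LookupFileId:
// direct volume-server URLs (with readDeleted=true) when not proxying,
// or a single filer proxy URL otherwise.
func buildFileUrls(proxyByFiler bool, filerAddress, part string, volumeLocations []string) (urls []string) {
	if !proxyByFiler {
		for _, loc := range volumeLocations {
			urls = append(urls, fmt.Sprintf("http://%s/%s?readDeleted=true", loc, part))
		}
		return
	}
	return []string{fmt.Sprintf("http://%s/?proxyChunkId=%s", filerAddress, part)}
}

func main() {
	fmt.Println(buildFileUrls(false, "filer:8888", "3,01637037d6", []string{"volume1:8080"}))
	fmt.Println(buildFileUrls(true, "filer:8888", "3,01637037d6", nil))
}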

9
weed/server/filer_server_handlers_read.go

@@ -61,15 +61,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return
}
if len(entry.Chunks) == 0 && len(entry.Content) == 0 {
glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr)
stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc()
w.WriteHeader(http.StatusNoContent)
return
}
w.Header().Set("Accept-Ranges", "bytes") w.Header().Set("Accept-Ranges", "bytes")
w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat))
// mime type
mimeType := entry.Attr.Mime
@@ -164,6 +156,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
if offset+size <= int64(len(entry.Content)) {
_, err := writer.Write(entry.Content[offset : offset+size])
glog.Errorf("failed to write entry content: %v", err)
return err
}
return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)

7
weed/server/master_grpc_server_volume.go

@@ -77,7 +77,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.AvailableSpaceFor(option) <= 0 {
return nil, fmt.Errorf("no free volumes left for "+option.String())
return nil, fmt.Errorf("no free volumes left for " + option.String())
} }
ms.vgLock.Lock() ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) { if !ms.Topo.HasWritableVolume(option) {
@ -122,11 +122,8 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType)) volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
stats := volumeLayout.Stats() stats := volumeLayout.Stats()
totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
resp := &master_pb.StatisticsResponse{
TotalSize: uint64(totalSize),
TotalSize: stats.TotalSize,
UsedSize: stats.UsedSize,
FileCount: stats.FileCount,
}

2
weed/server/volume_grpc_copy.go

@@ -61,7 +61,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}
location := vs.store.FindFreeLocation(types.ToDiskType(diskType))
if location == nil {
return fmt.Errorf("no space left")
return fmt.Errorf("no space left for disk type %s", types.ToDiskType(diskType).ReadableString())
}
dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))

11
weed/shell/command_ec_common.go

@@ -201,17 +201,14 @@ type EcRack struct {
func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
// list all possible locations
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
return nil, 0, err
return
}
// find out all volume servers with one slot left.
ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(resp.TopologyInfo, selectedDataCenter)
ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
sortEcNodesByFreeslotsDecending(ecNodes)

6
weed/shell/command_ec_decode.go

@@ -51,7 +51,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
vid := needle.VolumeId(*volumeId)
// collect topology information
topologyInfo, err := collectTopologyInfo(commandEnv)
topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
@@ -208,7 +208,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasur
}
func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) {
func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
@@ -219,7 +219,7 @@ func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyIn
return
}
return resp.TopologyInfo, nil
return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
}
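This is the hub of a wider cleanup in the commit: collectTopologyInfo now also returns the master's VolumeSizeLimitMb, so the shell commands below (ec.encode, volume.balance, volume.configure.replication, volume.fix.replication, volume.fsck, volume.list, volume.server.evacuate, volume.tier.move) no longer issue their own VolumeList RPCs. A minimal worked example of the size test those callers run against the returned limit; the helper name isFullEnough is illustrative only:

package main

import "fmt"

// isFullEnough reproduces the threshold check used by ec.encode and
// volume.tier.move: a volume qualifies when its size exceeds
// fullPercentage percent of the master's volume size limit.
func isFullEnough(volumeSizeBytes uint64, volumeSizeLimitMb uint64, fullPercentage float64) bool {
	return float64(volumeSizeBytes) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024
}

func main() {
	// with a 30000 MB limit and fullPercent=95, volumes above 28500 MB qualify
	fmt.Println(isFullEnough(29*1024*1024*1024, 30000, 95)) // true
	fmt.Println(isFullEnough(20*1024*1024*1024, 30000, 95)) // false
}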

11
weed/shell/command_ec_encode.go

@@ -265,11 +265,8 @@ func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
// collect topology information
topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv)
if err != nil {
return
}
@@ -280,11 +277,11 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri
fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds)
vidMap := make(map[uint32]bool)
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 {
if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
vidMap[v.Id] = true
}
}

2
weed/shell/command_fs_configure.go

@@ -52,7 +52,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
collection := fsConfigureCommand.String("collection", "", "assign writes to this collection")
replication := fsConfigureCommand.String("replication", "", "assign writes with this replication")
ttl := fsConfigureCommand.String("ttl", "", "assign writes with this ttl")
diskType := fsConfigureCommand.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
diskType := fsConfigureCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
fsync := fsConfigureCommand.Bool("fsync", false, "fsync for the writes")
volumeGrowthCount := fsConfigureCommand.Int("volumeGrowthCount", 0, "the number of physical volumes to add if no writable volumes")
isDelete := fsConfigureCommand.Bool("delete", false, "delete the configuration by locationPrefix")

20
weed/shell/command_volume_balance.go

@@ -1,7 +1,6 @@
package shell
import (
"context"
"flag" "flag"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/super_block"
@@ -75,18 +74,15 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
return nil
}
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
// collect topology information
topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
volumeServers := collectVolumeServersByDc(resp.TopologyInfo, *dc)
volumeReplicas, _ := collectVolumeReplicaLocations(resp)
diskTypes := collectVolumeDiskTypes(resp.TopologyInfo)
volumeServers := collectVolumeServersByDc(topologyInfo, *dc)
volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
diskTypes := collectVolumeDiskTypes(topologyInfo)
if *collection == "EACH_COLLECTION" {
collections, err := ListCollectionNames(commandEnv, true, false)
@@ -94,16 +90,16 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
return err
}
for _, c := range collections {
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
return err
}
}
} else if *collection == "ALL_COLLECTIONS" {
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
return err
}
} else {
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
return err
}
}

9
weed/shell/command_volume_configure_replication.go

@@ -56,11 +56,8 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
}
replicaPlacementInt32 := uint32(replicaPlacement.Byte())
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
@@ -69,7 +66,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
// find all data nodes with volumes that needs replication change
var allLocations []location
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
loc := newLocation(dc, string(rack), dn)
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {

21
weed/shell/command_volume_fix_replication.go

@@ -64,18 +64,15 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
takeAction := !*skipChange
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
// find all volumes that needs replication
// collect all data nodes
volumeReplicas, allLocations := collectVolumeReplicaLocations(resp)
volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
if len(allLocations) == 0 {
return fmt.Errorf("no data nodes at all")
@@ -107,10 +104,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint32][]*VolumeReplica, []location) {
func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
volumeReplicas := make(map[uint32][]*VolumeReplica)
var allLocations []location
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
loc := newLocation(dc, string(rack), dn)
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
@@ -165,10 +162,10 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
foundNewLocation := false
hasSkippedCollection := false
keepDataNodesSorted(allLocations, replica.info.DiskType)
keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
for _, dst := range allLocations {
// check whether data nodes satisfy the constraints
fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
// check collection name pattern
if *c.collectionPattern != "" {
@@ -219,8 +216,8 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
return nil
}
func keepDataNodesSorted(dataNodes []location, diskType string) {
fn := capacityByFreeVolumeCount(types.ToDiskType(diskType))
func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
fn := capacityByFreeVolumeCount(diskType)
sort.Slice(dataNodes, func(i, j int) bool {
return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode)
})

13
weed/shell/command_volume_fsck.go

@@ -73,7 +73,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
defer os.RemoveAll(tempFolder)
// collect all volume id locations
volumeIdToVInfo, err := c.collectVolumeIds(*verbose, writer)
volumeIdToVInfo, err := c.collectVolumeIds(commandEnv, *verbose, writer)
if err != nil {
return fmt.Errorf("failed to collect all volume locations: %v", err)
}
@@ -268,23 +268,20 @@ type VInfo struct {
isEcVolume bool
}
func (c *commandVolumeFsck) collectVolumeIds(verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) {
func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) {
if verbose {
fmt.Fprintf(writer, "collecting volume id and locations from master ...\n")
}
volumeIdToServer = make(map[uint32]VInfo)
var resp *master_pb.VolumeListResponse
err = c.env.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
return
}
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
eachDataNode(topologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
for _, diskInfo := range t.DiskInfos {
for _, vi := range diskInfo.VolumeInfos {
volumeIdToServer[vi.Id] = VInfo{

10
weed/shell/command_volume_list.go

@@ -2,7 +2,6 @@ package shell
import (
"bytes"
"context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
@ -32,16 +31,13 @@ func (c *commandVolumeList) Help() string {
func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
// collect topology information
topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
writeTopologyInfo(writer, resp.TopologyInfo, resp.VolumeSizeLimitMb)
writeTopologyInfo(writer, topologyInfo, volumeSizeLimitMb)
return nil
}

6
weed/shell/command_volume_move.go

@@ -29,7 +29,7 @@ func (c *commandVolumeMove) Help() string {
return `move a live volume from one volume server to another volume server
volume.move -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id>
volume.move -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id> -disk [hdd|ssd]
volume.move -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id> -disk [hdd|ssd|<tag>]
This command move a live volume from one volume server to another volume server. Here are the steps:
@@ -41,7 +41,7 @@ func (c *commandVolumeMove) Help() string {
Now the master will mark this volume id as writable.
5. This command asks the source volume server to delete the source volume
The option "-disk [hdd|ssd]" can be used to change the volume disk type.
The option "-disk [hdd|ssd|<tag>]" can be used to change the volume disk type.
`
}
@@ -56,7 +56,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
volumeIdInt := volMoveCommand.Int("volumeId", 0, "the volume id")
sourceNodeStr := volMoveCommand.String("source", "", "the source volume server <host>:<port>")
targetNodeStr := volMoveCommand.String("target", "", "the target volume server <host>:<port>")
diskTypeStr := volMoveCommand.String("disk", "", "[hdd|ssd] hard drive or solid state drive")
diskTypeStr := volMoveCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
if err = volMoveCommand.Parse(args); err != nil {
return nil
}

22
weed/shell/command_volume_server_evacuate.go

@@ -1,7 +1,6 @@
package shell
import (
"context"
"flag" "flag"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -71,36 +70,33 @@ func volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMo
// 3. move to any other volume server as long as it satisfy the replication requirements
// list all the volumes
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
if err := evacuateNormalVolumes(commandEnv, resp, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
if err := evacuateNormalVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
return err
}
if err := evacuateEcVolumes(commandEnv, resp, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
if err := evacuateEcVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
return err
}
return nil
}
func evacuateNormalVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListResponse, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
func evacuateNormalVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this volume server
volumeServers := collectVolumeServersByDc(resp.TopologyInfo, "")
volumeServers := collectVolumeServersByDc(topologyInfo, "")
thisNode, otherNodes := nodesOtherThan(volumeServers, volumeServer)
if thisNode == nil {
return fmt.Errorf("%s is not found in this cluster", volumeServer)
}
// move away normal volumes
volumeReplicas, _ := collectVolumeReplicaLocations(resp)
volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
for _, diskInfo := range thisNode.info.DiskInfos {
for _, vol := range diskInfo.VolumeInfos {
hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
@@ -120,9 +116,9 @@ func evacuateNormalVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListRes
return nil
}
func evacuateEcVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListResponse, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
func evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this ec volume server
ecNodes, _ := collectEcVolumeServersByDc(resp.TopologyInfo, "")
ecNodes, _ := collectEcVolumeServersByDc(topologyInfo, "")
thisNode, otherNodes := ecNodesOtherThan(ecNodes, volumeServer)
if thisNode == nil {
return fmt.Errorf("%s is not found in this cluster\n", volumeServer)

2
weed/shell/command_volume_tier_download.go

@@ -56,7 +56,7 @@ func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, wr
vid := needle.VolumeId(*volumeId)
// collect topology information
topologyInfo, err := collectTopologyInfo(commandEnv)
topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}

121
weed/shell/command_volume_tier_move.go

@@ -1,11 +1,11 @@
package shell
import (
"context"
"flag" "flag"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"io" "io"
"time" "time"
@ -24,10 +24,12 @@ func (c *commandVolumeTierMove) Name() string {
} }
func (c *commandVolumeTierMove) Help() string { func (c *commandVolumeTierMove) Help() string {
return `<WIP> change a volume from one disk type to another
return `change a volume from one disk type to another
volume.tier.move -source=hdd -target=ssd [-collection=""] [-fullPercent=95] [-quietFor=1h]
volume.tier.move -target=hdd [-collection=""] -volumeId=<volume_id>
volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collection=""] [-fullPercent=95] [-quietFor=1h]
Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped.
So "volume.fix.replication" and "volume.balance" should be followed.
`
}
@@ -39,60 +41,127 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
}
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := tierCommand.Int("volumeId", 0, "the volume id")
collection := tierCommand.String("collection", "", "the collection name")
fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period")
source := tierCommand.String("fromDiskType", "", "the source disk type")
target := tierCommand.String("toDiskType", "", "the target disk type")
applyChange := tierCommand.Bool("force", false, "actually apply the changes")
if err = tierCommand.Parse(args); err != nil {
return nil
}
if *source == *target {
return fmt.Errorf("source tier %s is the same as target tier %s", *source, *target)
}
fromDiskType := types.ToDiskType(*source)
toDiskType := types.ToDiskType(*target)
vid := needle.VolumeId(*volumeId)
if fromDiskType == toDiskType {
return fmt.Errorf("source tier %s is the same as target tier %s", fromDiskType, toDiskType)
}
// volumeId is provided
if vid != 0 {
// return doVolumeTierMove(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile)
// collect topology information
topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
// apply to all volumes in the collection
// reusing collectVolumeIdsForEcEncode for now
volumeIds, err := collectVolumeIdsForTierChange(commandEnv, *source, *collection, *fullPercentage, *quietPeriod)
// collect all volumes that should change
volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collection, *fullPercentage, *quietPeriod)
if err != nil {
return err
}
fmt.Printf("tier move volumes: %v\n", volumeIds)
_, allLocations := collectVolumeReplicaLocations(topologyInfo)
for _, vid := range volumeIds {
if err = doVolumeTierMove(commandEnv, writer, *collection, vid, toDiskType, allLocations, *applyChange); err != nil {
fmt.Printf("tier move volume %d: %v\n", vid, err)
}
}
return nil
}
func collectVolumeIdsForTierChange(commandEnv *CommandEnv, sourceTier string, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
func isOneOf(server string, locations []wdclient.Location) bool {
for _, loc := range locations {
if server == loc.Url {
return true
}
}
return false
}
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
return
func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
if !found {
return fmt.Errorf("volume %d not found", vid)
}
// find one server with the most empty volume slots with target disk type
hasFoundTarget := false
keepDataNodesSorted(allLocations, toDiskType)
fn := capacityByFreeVolumeCount(toDiskType)
for _, dst := range allLocations {
if fn(dst.dataNode) > 0 && !hasFoundTarget {
// ask the volume server to replicate the volume
if isOneOf(dst.dataNode.Id, locations) {
continue
}
sourceVolumeServer := ""
for _, loc := range locations {
if loc.Url != dst.dataNode.Id {
sourceVolumeServer = loc.Url
}
}
if sourceVolumeServer == "" {
continue
}
fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", vid, sourceVolumeServer, dst.dataNode.Id, toDiskType.ReadableString())
hasFoundTarget = true
if !applyChanges {
break
}
// mark all replicas as read only
if err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.ReadableString()); err != nil {
return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
}
// remove the remaining replicas
for _, loc := range locations {
if loc.Url != sourceVolumeServer {
if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.Url); err != nil {
fmt.Fprintf(writer, "failed to delete volume %d on %s\n", vid, loc.Url)
}
}
}
}
}
if !hasFoundTarget {
fmt.Fprintf(writer, "can not find disk type %s for volume %d\n", toDiskType.ReadableString(), vid)
}
return nil
}
func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
quietSeconds := int64(quietPeriod / time.Second)
nowUnixSeconds := time.Now().Unix()
fmt.Printf("collect %s volumes quiet for: %d seconds\n", sourceTier, quietSeconds)
vidMap := make(map[uint32]bool)
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == types.ToDiskType(sourceTier) {
if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 {
if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == sourceTier {
if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
vidMap[v.Id] = true
}
}
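The rewritten volume.tier.move is no longer marked <WIP>: per the help text and the new doVolumeTierMove, it picks one replica, marks all replicas read-only, live-moves that replica to a server that has free slots of the target disk type, and drops the remaining replicas, which is why volume.fix.replication and volume.balance should be run afterwards. A typical invocation inside weed shell, built only from the flags defined above (the -force flag is what actually applies the moves; without it the command only reports what it would do):

volume.tier.move -fromDiskType=hdd -toDiskType=ssd -collection="" -fullPercent=95 -quietFor=1h -force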

7
weed/storage/types/volume_disk_type.go

@@ -31,3 +31,10 @@ func (diskType DiskType) String() string {
}
return string(diskType)
}
func (diskType DiskType) ReadableString() string {
if diskType == "" {
return "hdd"
}
return string(diskType)
}
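ReadableString gives the empty (default) disk type a human-readable name, which the new error and log messages above rely on. A tiny self-contained illustration; DiskType here is a local copy for the example, not the weed/storage/types import:

package main

import "fmt"

// DiskType mirrors weed/storage/types.DiskType just enough to show the new
// ReadableString behavior: the empty (default) disk type prints as "hdd".
type DiskType string

func (diskType DiskType) ReadableString() string {
	if diskType == "" {
		return "hdd"
	}
	return string(diskType)
}

func main() {
	fmt.Println(DiskType("").ReadableString())    // hdd
	fmt.Println(DiskType("ssd").ReadableString()) // ssd
}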

25
weed/topology/topology_vacuum.go

@@ -14,7 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)
func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid needle.VolumeId,
locationlist *VolumeLocationList, garbageThreshold float64) (*VolumeLocationList, bool) {
ch := make(chan int, locationlist.Length())
errCount := int32(0)
@@ -43,7 +43,7 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi
}
vacuumLocationList := NewVolumeLocationList()
waitTimeout := time.NewTimer(30 * time.Minute)
waitTimeout := time.NewTimer(time.Minute * time.Duration(t.volumeSizeLimit/1024/1024/1000+1))
defer waitTimeout.Stop()
for range locationlist.list {
@@ -58,7 +58,7 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi
}
return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0
}
func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
locationlist *VolumeLocationList, preallocate int64) bool {
vl.accessLock.Lock()
vl.removeFromWritable(vid)
@@ -86,7 +86,7 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout,
}
isVacuumSuccess := true
waitTimeout := time.NewTimer(30 * time.Minute)
waitTimeout := time.NewTimer(3 * time.Minute * time.Duration(t.volumeSizeLimit/1024/1024/1000+1))
defer waitTimeout.Stop()
for range locationlist.list {
@@ -99,7 +99,7 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout,
}
return isVacuumSuccess
}
func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool {
func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true
isReadOnly := false
for _, dn := range locationlist.list {
@@ -127,7 +127,7 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v
}
return isCommitSuccess
}
func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
@@ -161,13 +161,13 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
}
}
}
}
func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
volumeLayout.accessLock.RLock()
tmpMap := make(map[needle.VolumeId]*VolumeLocationList)
@@ -187,12 +187,11 @@ func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeL
}
glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
if vacuumLocationList, needVacuum := batchVacuumVolumeCheck(
grpcDialOption, volumeLayout, vid, locationList, garbageThreshold); needVacuum {
if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) {
batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList)
if vacuumLocationList, needVacuum := t.batchVacuumVolumeCheck(grpcDialOption, vid, locationList, garbageThreshold); needVacuum {
if t.batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) {
t.batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList)
} else { } else {
batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList)
t.batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList)
} }
} }
} }
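
Note on weed/topology/topology_vacuum.go: besides moving the batch vacuum helpers onto *Topology, the compaction wait is no longer a fixed 30 minutes but scales with the master's configured volume size limit (t.volumeSizeLimit is in bytes). A minimal sketch of the arithmetic, using an illustrative 30000 MB limit that is an assumption, not taken from this diff:

    package main

    import (
    	"fmt"
    	"time"
    )

    // vacuumTimeout mirrors the expression used in batchVacuumVolumeCompact above:
    // roughly 3 minutes per ~1 GB of configured volume size limit, plus one extra unit.
    func vacuumTimeout(volumeSizeLimitBytes uint64) time.Duration {
    	return 3 * time.Minute * time.Duration(volumeSizeLimitBytes/1024/1024/1000+1)
    }

    func main() {
    	limit := uint64(30000) * 1024 * 1024 // assumed 30000 MB limit, for illustration only
    	fmt.Println(vacuumTimeout(limit))    // prints 1h33m0s, i.e. 93 minutes instead of a flat 30
    }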

2  weed/topology/volume_layout.go

@@ -432,7 +432,7 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats {
 		if vl.readonlyVolumes.IsTrue(vid) {
 			ret.TotalSize += size
 		} else {
-			ret.TotalSize += vl.volumeSizeLimit
+			ret.TotalSize += vl.volumeSizeLimit * uint64(vll.Length())
 		}
 	}
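
Note on weed/topology/volume_layout.go: Stats() previously counted a writable volume's capacity once, regardless of how many replicas exist; it now multiplies the per-volume limit by the number of locations. Illustrative numbers only, not taken from this diff:

    package main

    import "fmt"

    func main() {
    	volumeSizeLimit := uint64(30000) * 1024 * 1024 // assumed 30000 MB per-volume limit
    	replicas := uint64(3)                          // vll.Length(): locations holding the volume
    	fmt.Println(volumeSizeLimit * replicas)        // capacity is now counted once per replica
    }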

2  weed/topology/volume_location_list.go

@@ -82,7 +82,7 @@ func (dnll *VolumeLocationList) Stats(vid needle.VolumeId, freshThreshHold int64
 		if dnl.LastSeen < freshThreshHold {
 			vinfo, err := dnl.GetVolumesById(vid)
 			if err == nil {
-				return vinfo.Size - vinfo.DeletedByteCount, vinfo.FileCount - vinfo.DeleteCount
+				return (vinfo.Size - vinfo.DeletedByteCount) * uint64(len(dnll.list)), vinfo.FileCount - vinfo.DeleteCount
 			}
 		}
 	}
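
The weed/topology/volume_location_list.go change is the matching fix for used space: the live bytes reported for a volume are now multiplied by the number of servers holding it. A small worked example with assumed numbers:

    package main

    import "fmt"

    func main() {
    	size := uint64(8) << 30    // assumed vinfo.Size: 8 GiB
    	deleted := uint64(2) << 30 // assumed vinfo.DeletedByteCount: 2 GiB
    	locations := uint64(2)     // len(dnll.list)
    	fmt.Println((size - deleted) * locations) // 12 GiB reported rather than 6 GiB
    }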

2  weed/util/constants.go

@@ -5,7 +5,7 @@ import (
 )
 var (
-	VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 26)
+	VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 28)
 	COMMIT = ""
 )

4  weed/util/file_util.go

@@ -69,6 +69,10 @@ func CheckFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti
 func ResolvePath(path string) string {
+	if !strings.Contains(path, "~") {
+		return path
+	}
 	usr, _ := user.Current()
 	dir := usr.HomeDir
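
Note on weed/util/file_util.go: the added guard makes ResolvePath a no-op for paths without a tilde, so user.Current() is only consulted when expansion is actually needed. The hunk only shows the new early return; the sketch below fills in a plausible tilde-expansion tail for illustration (assumed, not shown in this diff):

    package main

    import (
    	"fmt"
    	"os/user"
    	"path/filepath"
    	"strings"
    )

    func ResolvePath(path string) string {
    	if !strings.Contains(path, "~") {
    		return path // new fast path: nothing to expand
    	}
    	usr, _ := user.Current()
    	dir := usr.HomeDir
    	if path == "~" {
    		return dir
    	}
    	if strings.HasPrefix(path, "~/") {
    		return filepath.Join(dir, path[2:]) // assumed expansion, not part of the hunk
    	}
    	return path
    }

    func main() {
    	fmt.Println(ResolvePath("/var/data"))   // returned unchanged
    	fmt.Println(ResolvePath("~/seaweedfs")) // expanded under the home directory
    }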

4  weed/util/fla9/fla9.go

@@ -886,7 +886,7 @@ func (f *FlagSet) parseOne() (bool, error) {
 // The return value will be ErrHelp if -help or -h were set but not defined.
 func (f *FlagSet) Parse(arguments []string) error {
 	if _, ok := f.formal[DefaultConfigFlagName]; !ok {
-		f.String(DefaultConfigFlagName, "", "config file")
+		f.String(DefaultConfigFlagName, "", "file with command line options with each line in optionName=optionValue format")
 	}
 	f.parsed = true
@@ -1078,7 +1078,7 @@ func NewFlagSetWithEnvPrefix(name string, prefix string, errorHandling ErrorHand
 // DefaultConfigFlagName defines the flag name of the optional config file
 // path. Used to lookup and parse the config file when a default is set and
 // available on disk.
-var DefaultConfigFlagName = "config"
+var DefaultConfigFlagName = "options"
 // ParseFile parses flags from the file in path.
 // Same format as commandline arguments, newlines and lines beginning with a
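
Note on weed/util/fla9/fla9.go: the implicit config-file flag is renamed from -config to -options, and its help text now spells out the file format (one optionName=optionValue pair per line). A minimal sketch, assuming fla9 mirrors the standard library flag API as the hunks above suggest; the demo.options file and the dir flag are made up for illustration:

    package main

    import (
    	"fmt"

    	"github.com/chrislusf/seaweedfs/weed/util/fla9"
    )

    func main() {
    	fs := fla9.NewFlagSet("demo", fla9.ContinueOnError)
    	dir := fs.String("dir", "/tmp", "data directory")

    	// Parse registers the implicit -options flag (formerly -config);
    	// demo.options would contain lines such as: dir=/data
    	if err := fs.Parse([]string{"-options=demo.options"}); err != nil {
    		fmt.Println(err)
    		return
    	}
    	fmt.Println(*dir)
    }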
