committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
130 changed files with 4431 additions and 1430 deletions
-
14README.md
-
35docker/Dockerfile.go_build_large
-
11go.mod
-
26go.sum
-
2k8s/seaweedfs/Chart.yaml
-
22k8s/seaweedfs/templates/filer-statefulset.yaml
-
30k8s/seaweedfs/templates/master-statefulset.yaml
-
2k8s/seaweedfs/templates/s3-deployment.yaml
-
2k8s/seaweedfs/templates/seaweefs-grafana-dashboard.yaml
-
23k8s/seaweedfs/templates/volume-statefulset.yaml
-
58k8s/seaweedfs/values.yaml
-
2other/java/client/pom.xml
-
2other/java/client/pom.xml.deploy
-
2other/java/client/pom_debug.xml
-
23other/java/client/src/main/proto/filer.proto
-
2other/java/hdfs2/dependency-reduced-pom.xml
-
2other/java/hdfs2/pom.xml
-
29other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
-
2other/java/hdfs3/dependency-reduced-pom.xml
-
2other/java/hdfs3/pom.xml
-
29other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
-
175test/s3/multipart/aws_upload.go
-
4unmaintained/diff_volume_servers/diff_volume_servers.go
-
8weed/Makefile
-
1weed/command/command.go
-
14weed/command/download.go
-
20weed/command/export.go
-
2weed/command/filer.go
-
2weed/command/filer_copy.go
-
337weed/command/filer_sync.go
-
4weed/command/mount.go
-
9weed/command/mount_std.go
-
16weed/command/scaffold.go
-
2weed/command/server.go
-
66weed/command/volume.go
-
7weed/command/watch.go
-
22weed/filer/abstract_sql/abstract_sql_store.go
-
8weed/filer/cassandra/cassandra_store.go
-
51weed/filer/cassandra/cassandra_store_kv.go
-
338weed/filer/elastic/v7/elastic_store.go
-
65weed/filer/elastic/v7/elastic_store_kv.go
-
12weed/filer/etcd/etcd_store.go
-
2weed/filer/etcd/etcd_store_kv.go
-
5weed/filer/filechunks.go
-
32weed/filer/filer.go
-
8weed/filer/filer_delete_entry.go
-
11weed/filer/filer_notify.go
-
5weed/filer/filerstore.go
-
8weed/filer/leveldb/leveldb_store.go
-
43weed/filer/leveldb2/leveldb2_local_store.go
-
9weed/filer/leveldb2/leveldb2_store.go
-
132weed/filer/meta_aggregator.go
-
10weed/filer/mongodb/mongodb_store.go
-
4weed/filer/mongodb/mongodb_store_kv.go
-
4weed/filer/mysql/mysql_store.go
-
4weed/filer/postgres/postgres_store.go
-
6weed/filer/redis/universal_redis_store.go
-
18weed/filer/redis2/universal_redis_store.go
-
14weed/filesys/dir.go
-
4weed/filesys/dir_link.go
-
4weed/filesys/dir_rename.go
-
3weed/filesys/file.go
-
3weed/filesys/filehandle.go
-
101weed/filesys/meta_cache/id_mapper.go
-
12weed/filesys/meta_cache/meta_cache.go
-
8weed/filesys/meta_cache/meta_cache_subscribe.go
-
11weed/filesys/wfs.go
-
2weed/messaging/broker/broker_grpc_server.go
-
22weed/messaging/broker/broker_grpc_server_subscribe.go
-
4weed/messaging/broker/broker_server.go
-
4weed/operation/tail_volume.go
-
8weed/operation/upload_content.go
-
23weed/pb/filer.proto
-
706weed/pb/filer_pb/filer.pb.go
-
8weed/pb/filer_pb/filer_client.go
-
9weed/pb/filer_pb/filer_pb_helper.go
-
1weed/pb/master.proto
-
300weed/pb/master_pb/master.pb.go
-
7weed/pb/volume_server.proto
-
1068weed/pb/volume_server_pb/volume_server.pb.go
-
26weed/replication/replicator.go
-
6weed/replication/sink/azuresink/azure_sink.go
-
6weed/replication/sink/b2sink/b2_sink.go
-
9weed/replication/sink/filersink/fetch_write.go
-
42weed/replication/sink/filersink/filer_sink.go
-
6weed/replication/sink/gcssink/gcs_sink.go
-
6weed/replication/sink/replication_sink.go
-
6weed/replication/sink/s3sink/s3_sink.go
-
10weed/replication/source/filer_source.go
-
75weed/s3api/filer_multipart.go
-
23weed/s3api/filer_multipart_test.go
-
7weed/s3api/filer_util.go
-
2weed/s3api/s3api_bucket_handlers.go
-
6weed/s3api/s3api_object_copy_handlers.go
-
4weed/s3api/s3api_objects_list_handlers.go
-
3weed/server/filer_grpc_server.go
-
42weed/server/filer_grpc_server_kv.go
-
74weed/server/filer_grpc_server_sub_meta.go
-
6weed/server/filer_server.go
-
56weed/server/filer_server_handlers_write_autochunk.go
@ -0,0 +1,35 @@ |
|||||
|
FROM frolvlad/alpine-glibc as builder |
||||
|
RUN apk add git go g++ |
||||
|
RUN mkdir -p /go/src/github.com/chrislusf/ |
||||
|
RUN git clone https://github.com/chrislusf/seaweedfs /go/src/github.com/chrislusf/seaweedfs |
||||
|
RUN cd /go/src/github.com/chrislusf/seaweedfs/weed && go install -tags 5BytesOffset |
||||
|
|
||||
|
FROM alpine AS final |
||||
|
LABEL author="Chris Lu" |
||||
|
COPY --from=builder /root/go/bin/weed /usr/bin/ |
||||
|
RUN mkdir -p /etc/seaweedfs |
||||
|
COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/filer.toml /etc/seaweedfs/filer.toml |
||||
|
COPY --from=builder /go/src/github.com/chrislusf/seaweedfs/docker/entrypoint.sh /entrypoint.sh |
||||
|
|
||||
|
# volume server gprc port |
||||
|
EXPOSE 18080 |
||||
|
# volume server http port |
||||
|
EXPOSE 8080 |
||||
|
# filer server gprc port |
||||
|
EXPOSE 18888 |
||||
|
# filer server http port |
||||
|
EXPOSE 8888 |
||||
|
# master server shared gprc port |
||||
|
EXPOSE 19333 |
||||
|
# master server shared http port |
||||
|
EXPOSE 9333 |
||||
|
# s3 server http port |
||||
|
EXPOSE 8333 |
||||
|
|
||||
|
RUN mkdir -p /data/filerldb2 |
||||
|
|
||||
|
VOLUME /data |
||||
|
|
||||
|
RUN chmod +x /entrypoint.sh |
||||
|
|
||||
|
ENTRYPOINT ["/entrypoint.sh"] |
@ -1,4 +1,4 @@ |
|||||
apiVersion: v1 |
apiVersion: v1 |
||||
description: SeaweedFS |
description: SeaweedFS |
||||
name: seaweedfs |
name: seaweedfs |
||||
version: 1.92 |
|
||||
|
version: 1.99 |
@ -0,0 +1,175 @@ |
|||||
|
package main |
||||
|
|
||||
|
// copied from https://github.com/apoorvam/aws-s3-multipart-upload
|
||||
|
|
||||
|
import ( |
||||
|
"bytes" |
||||
|
"flag" |
||||
|
"fmt" |
||||
|
"net/http" |
||||
|
"os" |
||||
|
|
||||
|
"github.com/aws/aws-sdk-go/aws" |
||||
|
"github.com/aws/aws-sdk-go/aws/awserr" |
||||
|
"github.com/aws/aws-sdk-go/aws/credentials" |
||||
|
"github.com/aws/aws-sdk-go/aws/session" |
||||
|
"github.com/aws/aws-sdk-go/service/s3" |
||||
|
) |
||||
|
|
||||
|
const ( |
||||
|
maxPartSize = int64(5 * 1024 * 1024) |
||||
|
maxRetries = 3 |
||||
|
awsAccessKeyID = "Your access key" |
||||
|
awsSecretAccessKey = "Your secret key" |
||||
|
awsBucketRegion = "S3 bucket region" |
||||
|
awsBucketName = "newBucket" |
||||
|
) |
||||
|
|
||||
|
var ( |
||||
|
filename = flag.String("f", "", "the file name") |
||||
|
) |
||||
|
|
||||
|
func main() { |
||||
|
flag.Parse() |
||||
|
|
||||
|
creds := credentials.NewStaticCredentials(awsAccessKeyID, awsSecretAccessKey, "") |
||||
|
_, err := creds.Get() |
||||
|
if err != nil { |
||||
|
fmt.Printf("bad credentials: %s", err) |
||||
|
} |
||||
|
cfg := aws.NewConfig().WithRegion(awsBucketRegion).WithCredentials(creds).WithDisableSSL(true).WithEndpoint("localhost:8333") |
||||
|
svc := s3.New(session.New(), cfg) |
||||
|
|
||||
|
file, err := os.Open(*filename) |
||||
|
if err != nil { |
||||
|
fmt.Printf("err opening file: %s", err) |
||||
|
return |
||||
|
} |
||||
|
defer file.Close() |
||||
|
fileInfo, _ := file.Stat() |
||||
|
size := fileInfo.Size() |
||||
|
buffer := make([]byte, size) |
||||
|
fileType := http.DetectContentType(buffer) |
||||
|
file.Read(buffer) |
||||
|
|
||||
|
path := "/media/" + file.Name() |
||||
|
input := &s3.CreateMultipartUploadInput{ |
||||
|
Bucket: aws.String(awsBucketName), |
||||
|
Key: aws.String(path), |
||||
|
ContentType: aws.String(fileType), |
||||
|
} |
||||
|
|
||||
|
resp, err := svc.CreateMultipartUpload(input) |
||||
|
if err != nil { |
||||
|
fmt.Println(err.Error()) |
||||
|
return |
||||
|
} |
||||
|
fmt.Println("Created multipart upload request") |
||||
|
|
||||
|
var curr, partLength int64 |
||||
|
var remaining = size |
||||
|
var completedParts []*s3.CompletedPart |
||||
|
partNumber := 1 |
||||
|
for curr = 0; remaining != 0; curr += partLength { |
||||
|
if remaining < maxPartSize { |
||||
|
partLength = remaining |
||||
|
} else { |
||||
|
partLength = maxPartSize |
||||
|
} |
||||
|
completedPart, err := uploadPart(svc, resp, buffer[curr:curr+partLength], partNumber) |
||||
|
if err != nil { |
||||
|
fmt.Println(err.Error()) |
||||
|
err := abortMultipartUpload(svc, resp) |
||||
|
if err != nil { |
||||
|
fmt.Println(err.Error()) |
||||
|
} |
||||
|
return |
||||
|
} |
||||
|
remaining -= partLength |
||||
|
partNumber++ |
||||
|
completedParts = append(completedParts, completedPart) |
||||
|
} |
||||
|
|
||||
|
// list parts
|
||||
|
parts, err := svc.ListParts(&s3.ListPartsInput{ |
||||
|
Bucket: input.Bucket, |
||||
|
Key: input.Key, |
||||
|
MaxParts: nil, |
||||
|
PartNumberMarker: nil, |
||||
|
RequestPayer: nil, |
||||
|
UploadId: resp.UploadId, |
||||
|
}) |
||||
|
if err != nil { |
||||
|
fmt.Println(err.Error()) |
||||
|
return |
||||
|
} |
||||
|
fmt.Printf("list parts: %d\n", len(parts.Parts)) |
||||
|
for i, part := range parts.Parts { |
||||
|
fmt.Printf("part %d: %v\n", i, part) |
||||
|
} |
||||
|
|
||||
|
|
||||
|
completeResponse, err := completeMultipartUpload(svc, resp, completedParts) |
||||
|
if err != nil { |
||||
|
fmt.Println(err.Error()) |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
fmt.Printf("Successfully uploaded file: %s\n", completeResponse.String()) |
||||
|
} |
||||
|
|
||||
|
func completeMultipartUpload(svc *s3.S3, resp *s3.CreateMultipartUploadOutput, completedParts []*s3.CompletedPart) (*s3.CompleteMultipartUploadOutput, error) { |
||||
|
completeInput := &s3.CompleteMultipartUploadInput{ |
||||
|
Bucket: resp.Bucket, |
||||
|
Key: resp.Key, |
||||
|
UploadId: resp.UploadId, |
||||
|
MultipartUpload: &s3.CompletedMultipartUpload{ |
||||
|
Parts: completedParts, |
||||
|
}, |
||||
|
} |
||||
|
return svc.CompleteMultipartUpload(completeInput) |
||||
|
} |
||||
|
|
||||
|
func uploadPart(svc *s3.S3, resp *s3.CreateMultipartUploadOutput, fileBytes []byte, partNumber int) (*s3.CompletedPart, error) { |
||||
|
tryNum := 1 |
||||
|
partInput := &s3.UploadPartInput{ |
||||
|
Body: bytes.NewReader(fileBytes), |
||||
|
Bucket: resp.Bucket, |
||||
|
Key: resp.Key, |
||||
|
PartNumber: aws.Int64(int64(partNumber)), |
||||
|
UploadId: resp.UploadId, |
||||
|
ContentLength: aws.Int64(int64(len(fileBytes))), |
||||
|
} |
||||
|
|
||||
|
for tryNum <= maxRetries { |
||||
|
uploadResult, err := svc.UploadPart(partInput) |
||||
|
if err != nil { |
||||
|
if tryNum == maxRetries { |
||||
|
if aerr, ok := err.(awserr.Error); ok { |
||||
|
return nil, aerr |
||||
|
} |
||||
|
return nil, err |
||||
|
} |
||||
|
fmt.Printf("Retrying to upload part #%v\n", partNumber) |
||||
|
tryNum++ |
||||
|
} else { |
||||
|
fmt.Printf("Uploaded part #%v\n", partNumber) |
||||
|
return &s3.CompletedPart{ |
||||
|
ETag: uploadResult.ETag, |
||||
|
PartNumber: aws.Int64(int64(partNumber)), |
||||
|
}, nil |
||||
|
} |
||||
|
} |
||||
|
return nil, nil |
||||
|
} |
||||
|
|
||||
|
func abortMultipartUpload(svc *s3.S3, resp *s3.CreateMultipartUploadOutput) error { |
||||
|
fmt.Println("Aborting multipart upload for UploadId#" + *resp.UploadId) |
||||
|
abortInput := &s3.AbortMultipartUploadInput{ |
||||
|
Bucket: resp.Bucket, |
||||
|
Key: resp.Key, |
||||
|
UploadId: resp.UploadId, |
||||
|
} |
||||
|
_, err := svc.AbortMultipartUpload(abortInput) |
||||
|
return err |
||||
|
} |
@ -0,0 +1,337 @@ |
|||||
|
package command |
||||
|
|
||||
|
import ( |
||||
|
"context" |
||||
|
"errors" |
||||
|
"fmt" |
||||
|
"github.com/chrislusf/seaweedfs/weed/glog" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
||||
|
"github.com/chrislusf/seaweedfs/weed/replication" |
||||
|
"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" |
||||
|
"github.com/chrislusf/seaweedfs/weed/replication/source" |
||||
|
"github.com/chrislusf/seaweedfs/weed/security" |
||||
|
"github.com/chrislusf/seaweedfs/weed/util" |
||||
|
"github.com/chrislusf/seaweedfs/weed/util/grace" |
||||
|
"google.golang.org/grpc" |
||||
|
"io" |
||||
|
"strings" |
||||
|
"time" |
||||
|
) |
||||
|
|
||||
|
type SyncOptions struct { |
||||
|
isActivePassive *bool |
||||
|
filerA *string |
||||
|
filerB *string |
||||
|
aPath *string |
||||
|
bPath *string |
||||
|
aReplication *string |
||||
|
bReplication *string |
||||
|
aCollection *string |
||||
|
bCollection *string |
||||
|
aTtlSec *int |
||||
|
bTtlSec *int |
||||
|
aDebug *bool |
||||
|
bDebug *bool |
||||
|
} |
||||
|
|
||||
|
var ( |
||||
|
syncOptions SyncOptions |
||||
|
syncCpuProfile *string |
||||
|
syncMemProfile *string |
||||
|
) |
||||
|
|
||||
|
func init() { |
||||
|
cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
|
||||
|
syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow if true") |
||||
|
syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster") |
||||
|
syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster") |
||||
|
syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A") |
||||
|
syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B") |
||||
|
syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A") |
||||
|
syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B") |
||||
|
syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A") |
||||
|
syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B") |
||||
|
syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A") |
||||
|
syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B") |
||||
|
syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files") |
||||
|
syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files") |
||||
|
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file") |
||||
|
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file") |
||||
|
} |
||||
|
|
||||
|
var cmdFilerSynchronize = &Command{ |
||||
|
UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>", |
||||
|
Short: "continuously synchronize between two active-active or active-passive SeaweedFS clusters", |
||||
|
Long: `continuously synchronize file changes between two active-active or active-passive filers |
||||
|
|
||||
|
filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content, |
||||
|
and write to the other destination. Different from filer.replicate: |
||||
|
|
||||
|
* filer.sync only works between two filers. |
||||
|
* filer.sync does not need any special message queue setup. |
||||
|
* filer.sync supports both active-active and active-passive modes. |
||||
|
|
||||
|
If restarted, the synchronization will resume from the previous checkpoints, persisted every minute. |
||||
|
A fresh sync will start from the earliest metadata logs. |
||||
|
|
||||
|
`, |
||||
|
} |
||||
|
|
||||
|
func runFilerSynchronize(cmd *Command, args []string) bool { |
||||
|
|
||||
|
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") |
||||
|
|
||||
|
grace.SetupProfiling(*syncCpuProfile, *syncMemProfile) |
||||
|
|
||||
|
go func() { |
||||
|
for { |
||||
|
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.filerB, |
||||
|
*syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bDebug) |
||||
|
if err != nil { |
||||
|
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) |
||||
|
time.Sleep(1747 * time.Millisecond) |
||||
|
} |
||||
|
} |
||||
|
}() |
||||
|
|
||||
|
if !*syncOptions.isActivePassive { |
||||
|
go func() { |
||||
|
for { |
||||
|
err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.filerA, |
||||
|
*syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aDebug) |
||||
|
if err != nil { |
||||
|
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) |
||||
|
time.Sleep(2147 * time.Millisecond) |
||||
|
} |
||||
|
} |
||||
|
}() |
||||
|
} |
||||
|
|
||||
|
select {} |
||||
|
|
||||
|
return true |
||||
|
} |
||||
|
|
||||
|
func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath, targetFiler, targetPath string, |
||||
|
replicationStr, collection string, ttlSec int, debug bool) error { |
||||
|
|
||||
|
// read source filer signature
|
||||
|
sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler) |
||||
|
if sourceErr != nil { |
||||
|
return sourceErr |
||||
|
} |
||||
|
// read target filer signature
|
||||
|
targetFilerSignature, targetErr := replication.ReadFilerSignature(grpcDialOption, targetFiler) |
||||
|
if targetErr != nil { |
||||
|
return targetErr |
||||
|
} |
||||
|
|
||||
|
// if first time, start from now
|
||||
|
// if has previously synced, resume from that point of time
|
||||
|
sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs) |
||||
|
|
||||
|
// create filer sink
|
||||
|
filerSource := &source.FilerSource{} |
||||
|
filerSource.DoInitialize(pb.ServerToGrpcAddress(sourceFiler), sourcePath) |
||||
|
filerSink := &filersink.FilerSink{} |
||||
|
filerSink.DoInitialize(pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption) |
||||
|
filerSink.SetSourceFiler(filerSource) |
||||
|
|
||||
|
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { |
||||
|
message := resp.EventNotification |
||||
|
|
||||
|
var sourceOldKey, sourceNewKey util.FullPath |
||||
|
if message.OldEntry != nil { |
||||
|
sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name) |
||||
|
} |
||||
|
if message.NewEntry != nil { |
||||
|
sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name) |
||||
|
} |
||||
|
|
||||
|
for _, sig := range message.Signatures { |
||||
|
if sig == targetFilerSignature && targetFilerSignature != 0 { |
||||
|
fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message) |
||||
|
return nil |
||||
|
} |
||||
|
} |
||||
|
if debug { |
||||
|
fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature) |
||||
|
} |
||||
|
|
||||
|
if !strings.HasPrefix(resp.Directory, sourcePath) { |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
// handle deletions
|
||||
|
if message.OldEntry != nil && message.NewEntry == nil { |
||||
|
if !strings.HasPrefix(string(sourceOldKey), sourcePath) { |
||||
|
return nil |
||||
|
} |
||||
|
key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) |
||||
|
return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) |
||||
|
} |
||||
|
|
||||
|
// handle new entries
|
||||
|
if message.OldEntry == nil && message.NewEntry != nil { |
||||
|
if !strings.HasPrefix(string(sourceNewKey), sourcePath) { |
||||
|
return nil |
||||
|
} |
||||
|
key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):]) |
||||
|
return filerSink.CreateEntry(key, message.NewEntry, message.Signatures) |
||||
|
} |
||||
|
|
||||
|
// this is something special?
|
||||
|
if message.OldEntry == nil && message.NewEntry == nil { |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
// handle updates
|
||||
|
if strings.HasPrefix(string(sourceOldKey), sourcePath) { |
||||
|
// old key is in the watched directory
|
||||
|
if strings.HasPrefix(string(sourceNewKey), sourcePath) { |
||||
|
// new key is also in the watched directory
|
||||
|
oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) |
||||
|
message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):]) |
||||
|
foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures) |
||||
|
if foundExisting { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
// not able to find old entry
|
||||
|
if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil { |
||||
|
return fmt.Errorf("delete old entry %v: %v", oldKey, err) |
||||
|
} |
||||
|
|
||||
|
// create the new entry
|
||||
|
newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):]) |
||||
|
return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures) |
||||
|
|
||||
|
} else { |
||||
|
// new key is outside of the watched directory
|
||||
|
key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) |
||||
|
return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) |
||||
|
} |
||||
|
} else { |
||||
|
// old key is outside of the watched directory
|
||||
|
if strings.HasPrefix(string(sourceNewKey), sourcePath) { |
||||
|
// new key is in the watched directory
|
||||
|
key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):]) |
||||
|
return filerSink.CreateEntry(key, message.NewEntry, message.Signatures) |
||||
|
} else { |
||||
|
// new key is also outside of the watched directory
|
||||
|
// skip
|
||||
|
} |
||||
|
} |
||||
|
|
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
||||
|
|
||||
|
ctx, cancel := context.WithCancel(context.Background()) |
||||
|
defer cancel() |
||||
|
|
||||
|
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ |
||||
|
ClientName: "syncTo_" + targetFiler, |
||||
|
PathPrefix: sourcePath, |
||||
|
SinceNs: sourceFilerOffsetTsNs, |
||||
|
Signature: targetFilerSignature, |
||||
|
}) |
||||
|
if err != nil { |
||||
|
return fmt.Errorf("listen: %v", err) |
||||
|
} |
||||
|
|
||||
|
var counter int64 |
||||
|
var lastWriteTime time.Time |
||||
|
for { |
||||
|
resp, listenErr := stream.Recv() |
||||
|
if listenErr == io.EOF { |
||||
|
return nil |
||||
|
} |
||||
|
if listenErr != nil { |
||||
|
return listenErr |
||||
|
} |
||||
|
|
||||
|
if err := processEventFn(resp); err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
counter++ |
||||
|
if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { |
||||
|
glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) |
||||
|
counter = 0 |
||||
|
lastWriteTime = time.Now() |
||||
|
if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil { |
||||
|
return err |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} |
||||
|
|
||||
|
}) |
||||
|
|
||||
|
} |
||||
|
|
||||
|
const ( |
||||
|
SyncKeyPrefix = "sync." |
||||
|
) |
||||
|
|
||||
|
func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) { |
||||
|
|
||||
|
readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
||||
|
syncKey := []byte(SyncKeyPrefix + "____") |
||||
|
util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature)) |
||||
|
|
||||
|
resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey}) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
if len(resp.Error) != 0 { |
||||
|
return errors.New(resp.Error) |
||||
|
} |
||||
|
if len(resp.Value) < 8 { |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
lastOffsetTsNs = int64(util.BytesToUint64(resp.Value)) |
||||
|
|
||||
|
return nil |
||||
|
}) |
||||
|
|
||||
|
return |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error { |
||||
|
return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
||||
|
|
||||
|
syncKey := []byte(SyncKeyPrefix + "____") |
||||
|
util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature)) |
||||
|
|
||||
|
valueBuf := make([]byte, 8) |
||||
|
util.Uint64toBytes(valueBuf, uint64(offsetTsNs)) |
||||
|
|
||||
|
resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{ |
||||
|
Key: syncKey, |
||||
|
Value: valueBuf, |
||||
|
}) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
if len(resp.Error) != 0 { |
||||
|
return errors.New(resp.Error) |
||||
|
} |
||||
|
|
||||
|
return nil |
||||
|
|
||||
|
}) |
||||
|
|
||||
|
} |
@ -0,0 +1,338 @@ |
|||||
|
package elastic |
||||
|
|
||||
|
import ( |
||||
|
"context" |
||||
|
"fmt" |
||||
|
"math" |
||||
|
"strings" |
||||
|
|
||||
|
"github.com/chrislusf/seaweedfs/weed/filer" |
||||
|
"github.com/chrislusf/seaweedfs/weed/glog" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
||||
|
weed_util "github.com/chrislusf/seaweedfs/weed/util" |
||||
|
jsoniter "github.com/json-iterator/go" |
||||
|
elastic "github.com/olivere/elastic/v7" |
||||
|
) |
||||
|
|
||||
|
var ( |
||||
|
indexType = "_doc" |
||||
|
indexPrefix = ".seaweedfs_" |
||||
|
indexKV = ".seaweedfs_kv_entries" |
||||
|
kvMappings = ` { |
||||
|
"mappings": { |
||||
|
"enabled": false, |
||||
|
"properties": { |
||||
|
"Value":{ |
||||
|
"type": "binary" |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
}` |
||||
|
) |
||||
|
|
||||
|
type ESEntry struct { |
||||
|
ParentId string `json:"ParentId"` |
||||
|
Entry *filer.Entry |
||||
|
} |
||||
|
|
||||
|
type ESKVEntry struct { |
||||
|
Value []byte `json:"Value"` |
||||
|
} |
||||
|
|
||||
|
func init() { |
||||
|
filer.Stores = append(filer.Stores, &ElasticStore{}) |
||||
|
} |
||||
|
|
||||
|
type ElasticStore struct { |
||||
|
client *elastic.Client |
||||
|
maxPageSize int |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) GetName() string { |
||||
|
return "elastic7" |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { |
||||
|
options := []elastic.ClientOptionFunc{} |
||||
|
servers := configuration.GetStringSlice(prefix + "servers") |
||||
|
options = append(options, elastic.SetURL(servers...)) |
||||
|
username := configuration.GetString(prefix + "username") |
||||
|
password := configuration.GetString(prefix + "password") |
||||
|
if username != "" && password != "" { |
||||
|
options = append(options, elastic.SetBasicAuth(username, password)) |
||||
|
} |
||||
|
options = append(options, elastic.SetSniff(configuration.GetBool(prefix+"sniff_enabled"))) |
||||
|
options = append(options, elastic.SetHealthcheck(configuration.GetBool(prefix+"healthcheck_enabled"))) |
||||
|
store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window") |
||||
|
if store.maxPageSize <= 0 { |
||||
|
store.maxPageSize = 10000 |
||||
|
} |
||||
|
glog.Infof("filer store elastic endpoints: %v.", servers) |
||||
|
return store.initialize(options) |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) initialize(options []elastic.ClientOptionFunc) (err error) { |
||||
|
ctx := context.Background() |
||||
|
store.client, err = elastic.NewClient(options...) |
||||
|
if err != nil { |
||||
|
return fmt.Errorf("init elastic %v.", err) |
||||
|
} |
||||
|
if ok, err := store.client.IndexExists(indexKV).Do(ctx); err == nil && !ok { |
||||
|
_, err = store.client.CreateIndex(indexKV).Body(kvMappings).Do(ctx) |
||||
|
if err != nil { |
||||
|
return fmt.Errorf("create index(%s) %v.", indexKV, err) |
||||
|
} |
||||
|
} |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) { |
||||
|
return ctx, nil |
||||
|
} |
||||
|
func (store *ElasticStore) CommitTransaction(ctx context.Context) error { |
||||
|
return nil |
||||
|
} |
||||
|
func (store *ElasticStore) RollbackTransaction(ctx context.Context) error { |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { |
||||
|
return nil, filer.ErrUnsupportedListDirectoryPrefixed |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { |
||||
|
index := getIndex(entry.FullPath) |
||||
|
dir, _ := entry.FullPath.DirAndName() |
||||
|
id := weed_util.Md5String([]byte(entry.FullPath)) |
||||
|
esEntry := &ESEntry{ |
||||
|
ParentId: weed_util.Md5String([]byte(dir)), |
||||
|
Entry: entry, |
||||
|
} |
||||
|
value, err := jsoniter.Marshal(esEntry) |
||||
|
if err != nil { |
||||
|
glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err) |
||||
|
return fmt.Errorf("insert entry %v.", err) |
||||
|
} |
||||
|
_, err = store.client.Index(). |
||||
|
Index(index). |
||||
|
Type(indexType). |
||||
|
Id(id). |
||||
|
BodyJson(string(value)). |
||||
|
Do(ctx) |
||||
|
if err != nil { |
||||
|
glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err) |
||||
|
return fmt.Errorf("insert entry %v.", err) |
||||
|
} |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { |
||||
|
return store.InsertEntry(ctx, entry) |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { |
||||
|
index := getIndex(fullpath) |
||||
|
id := weed_util.Md5String([]byte(fullpath)) |
||||
|
searchResult, err := store.client.Get(). |
||||
|
Index(index). |
||||
|
Type(indexType). |
||||
|
Id(id). |
||||
|
Do(ctx) |
||||
|
if elastic.IsNotFound(err) { |
||||
|
return nil, filer_pb.ErrNotFound |
||||
|
} |
||||
|
if searchResult != nil && searchResult.Found { |
||||
|
esEntry := &ESEntry{ |
||||
|
ParentId: "", |
||||
|
Entry: &filer.Entry{}, |
||||
|
} |
||||
|
err := jsoniter.Unmarshal(searchResult.Source, esEntry) |
||||
|
return esEntry.Entry, err |
||||
|
} |
||||
|
glog.Errorf("find entry(%s),%v.", string(fullpath), err) |
||||
|
return nil, filer_pb.ErrNotFound |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { |
||||
|
index := getIndex(fullpath) |
||||
|
id := weed_util.Md5String([]byte(fullpath)) |
||||
|
if strings.Count(string(fullpath), "/") == 1 { |
||||
|
return store.deleteIndex(ctx, index) |
||||
|
} |
||||
|
return store.deleteEntry(ctx, index, id) |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) { |
||||
|
deleteResult, err := store.client.DeleteIndex(index).Do(ctx) |
||||
|
if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) { |
||||
|
return nil |
||||
|
} |
||||
|
glog.Errorf("delete index(%s) %v.", index, err) |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) { |
||||
|
deleteResult, err := store.client.Delete(). |
||||
|
Index(index). |
||||
|
Type(indexType). |
||||
|
Id(id). |
||||
|
Do(ctx) |
||||
|
if err == nil { |
||||
|
if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" { |
||||
|
return nil |
||||
|
} |
||||
|
} |
||||
|
glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err) |
||||
|
return fmt.Errorf("delete entry %v.", err) |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { |
||||
|
if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil { |
||||
|
for _, entry := range entries { |
||||
|
store.DeleteEntry(ctx, entry.FullPath) |
||||
|
} |
||||
|
} |
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) ListDirectoryEntries( |
||||
|
ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, |
||||
|
) (entries []*filer.Entry, err error) { |
||||
|
if string(fullpath) == "/" { |
||||
|
return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit) |
||||
|
} |
||||
|
return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit) |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) { |
||||
|
indexResult, err := store.client.CatIndices().Do(ctx) |
||||
|
if err != nil { |
||||
|
glog.Errorf("list indices %v.", err) |
||||
|
return entries, err |
||||
|
} |
||||
|
for _, index := range indexResult { |
||||
|
if index.Index == indexKV { |
||||
|
continue |
||||
|
} |
||||
|
if strings.HasPrefix(index.Index, indexPrefix) { |
||||
|
if entry, err := store.FindEntry(ctx, |
||||
|
weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil { |
||||
|
fileName := getFileName(entry.FullPath) |
||||
|
if fileName == startFileName && !inclusive { |
||||
|
continue |
||||
|
} |
||||
|
limit-- |
||||
|
if limit < 0 { |
||||
|
break |
||||
|
} |
||||
|
entries = append(entries, entry) |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
return entries, nil |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) listDirectoryEntries( |
||||
|
ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, |
||||
|
) (entries []*filer.Entry, err error) { |
||||
|
first := true |
||||
|
index := getIndex(fullpath) |
||||
|
nextStart := "" |
||||
|
parentId := weed_util.Md5String([]byte(fullpath)) |
||||
|
if _, err := store.client.Refresh(index).Do(ctx); err != nil { |
||||
|
if elastic.IsNotFound(err) { |
||||
|
store.client.CreateIndex(index).Do(ctx) |
||||
|
return entries, nil |
||||
|
} |
||||
|
} |
||||
|
for { |
||||
|
result := &elastic.SearchResult{} |
||||
|
if (startFileName == "" && first) || inclusive { |
||||
|
if result, err = store.search(ctx, index, parentId); err != nil { |
||||
|
glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) |
||||
|
return entries, err |
||||
|
} |
||||
|
} else { |
||||
|
fullPath := string(fullpath) + "/" + startFileName |
||||
|
if !first { |
||||
|
fullPath = nextStart |
||||
|
} |
||||
|
after := weed_util.Md5String([]byte(fullPath)) |
||||
|
if result, err = store.searchAfter(ctx, index, parentId, after); err != nil { |
||||
|
glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) |
||||
|
return entries, err |
||||
|
} |
||||
|
} |
||||
|
first = false |
||||
|
for _, hit := range result.Hits.Hits { |
||||
|
esEntry := &ESEntry{ |
||||
|
ParentId: "", |
||||
|
Entry: &filer.Entry{}, |
||||
|
} |
||||
|
if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil { |
||||
|
limit-- |
||||
|
if limit < 0 { |
||||
|
return entries, nil |
||||
|
} |
||||
|
nextStart = string(esEntry.Entry.FullPath) |
||||
|
fileName := getFileName(esEntry.Entry.FullPath) |
||||
|
if fileName == startFileName && !inclusive { |
||||
|
continue |
||||
|
} |
||||
|
entries = append(entries, esEntry.Entry) |
||||
|
} |
||||
|
} |
||||
|
if len(result.Hits.Hits) < store.maxPageSize { |
||||
|
break |
||||
|
} |
||||
|
} |
||||
|
return entries, nil |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) { |
||||
|
if count, err := store.client.Count(index).Do(ctx); err == nil && count == 0 { |
||||
|
return &elastic.SearchResult{ |
||||
|
Hits: &elastic.SearchHits{ |
||||
|
Hits: make([]*elastic.SearchHit, 0)}, |
||||
|
}, nil |
||||
|
} |
||||
|
queryResult, err := store.client.Search(). |
||||
|
Index(index). |
||||
|
Query(elastic.NewMatchQuery("ParentId", parentId)). |
||||
|
Size(store.maxPageSize). |
||||
|
Sort("_id", false). |
||||
|
Do(ctx) |
||||
|
return queryResult, err |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) searchAfter(ctx context.Context, index, parentId, after string) (result *elastic.SearchResult, err error) { |
||||
|
queryResult, err := store.client.Search(). |
||||
|
Index(index). |
||||
|
Query(elastic.NewMatchQuery("ParentId", parentId)). |
||||
|
SearchAfter(after). |
||||
|
Size(store.maxPageSize). |
||||
|
Sort("_id", false). |
||||
|
Do(ctx) |
||||
|
return queryResult, err |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) Shutdown() { |
||||
|
store.client.Stop() |
||||
|
} |
||||
|
|
||||
|
func getIndex(fullpath weed_util.FullPath) string { |
||||
|
path := strings.Split(string(fullpath), "/") |
||||
|
if len(path) > 1 { |
||||
|
return indexPrefix + path[1] |
||||
|
} |
||||
|
return "" |
||||
|
} |
||||
|
|
||||
|
func getFileName(fullpath weed_util.FullPath) string { |
||||
|
path := strings.Split(string(fullpath), "/") |
||||
|
if len(path) > 1 { |
||||
|
return path[len(path)-1] |
||||
|
} |
||||
|
return "" |
||||
|
} |
@ -0,0 +1,65 @@ |
|||||
|
package elastic |
||||
|
|
||||
|
import ( |
||||
|
"context" |
||||
|
"fmt" |
||||
|
|
||||
|
"github.com/chrislusf/seaweedfs/weed/filer" |
||||
|
|
||||
|
"github.com/chrislusf/seaweedfs/weed/glog" |
||||
|
jsoniter "github.com/json-iterator/go" |
||||
|
elastic "github.com/olivere/elastic/v7" |
||||
|
) |
||||
|
|
||||
|
func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) { |
||||
|
deleteResult, err := store.client.Delete(). |
||||
|
Index(indexKV). |
||||
|
Type(indexType). |
||||
|
Id(string(key)). |
||||
|
Do(ctx) |
||||
|
if err == nil { |
||||
|
if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" { |
||||
|
return nil |
||||
|
} |
||||
|
} |
||||
|
glog.Errorf("delete key(id:%s) %v.", string(key), err) |
||||
|
return fmt.Errorf("delete key %v.", err) |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { |
||||
|
searchResult, err := store.client.Get(). |
||||
|
Index(indexKV). |
||||
|
Type(indexType). |
||||
|
Id(string(key)). |
||||
|
Do(ctx) |
||||
|
if elastic.IsNotFound(err) { |
||||
|
return value, filer.ErrKvNotFound |
||||
|
} |
||||
|
if searchResult != nil && searchResult.Found { |
||||
|
esEntry := &ESKVEntry{} |
||||
|
if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil { |
||||
|
return esEntry.Value, nil |
||||
|
} |
||||
|
} |
||||
|
glog.Errorf("find key(%s),%v.", string(key), err) |
||||
|
return value, filer.ErrKvNotFound |
||||
|
} |
||||
|
|
||||
|
func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { |
||||
|
esEntry := &ESKVEntry{value} |
||||
|
val, err := jsoniter.Marshal(esEntry) |
||||
|
if err != nil { |
||||
|
glog.Errorf("insert key(%s) %v.", string(key), err) |
||||
|
return fmt.Errorf("insert key %v.", err) |
||||
|
} |
||||
|
_, err = store.client.Index(). |
||||
|
Index(indexKV). |
||||
|
Type(indexType). |
||||
|
Id(string(key)). |
||||
|
BodyJson(string(val)). |
||||
|
Do(ctx) |
||||
|
if err != nil { |
||||
|
return fmt.Errorf("kv put: %v", err) |
||||
|
} |
||||
|
return nil |
||||
|
} |
@ -1,43 +0,0 @@ |
|||||
package leveldb |
|
||||
|
|
||||
import ( |
|
||||
"fmt" |
|
||||
|
|
||||
"github.com/chrislusf/seaweedfs/weed/filer" |
|
||||
"github.com/chrislusf/seaweedfs/weed/util" |
|
||||
) |
|
||||
|
|
||||
var ( |
|
||||
_ = filer.FilerLocalStore(&LevelDB2Store{}) |
|
||||
) |
|
||||
|
|
||||
func (store *LevelDB2Store) UpdateOffset(filer string, lastTsNs int64) error { |
|
||||
|
|
||||
value := make([]byte, 8) |
|
||||
util.Uint64toBytes(value, uint64(lastTsNs)) |
|
||||
|
|
||||
err := store.dbs[0].Put([]byte("meta"+filer), value, nil) |
|
||||
|
|
||||
if err != nil { |
|
||||
return fmt.Errorf("UpdateOffset %s : %v", filer, err) |
|
||||
} |
|
||||
|
|
||||
println("UpdateOffset", filer, "lastTsNs", lastTsNs) |
|
||||
|
|
||||
return nil |
|
||||
} |
|
||||
|
|
||||
func (store *LevelDB2Store) ReadOffset(filer string) (lastTsNs int64, err error) { |
|
||||
|
|
||||
value, err := store.dbs[0].Get([]byte("meta"+filer), nil) |
|
||||
|
|
||||
if err != nil { |
|
||||
return 0, fmt.Errorf("ReadOffset %s : %v", filer, err) |
|
||||
} |
|
||||
|
|
||||
lastTsNs = int64(util.BytesToUint64(value)) |
|
||||
|
|
||||
println("ReadOffset", filer, "lastTsNs", lastTsNs) |
|
||||
|
|
||||
return |
|
||||
} |
|
@ -0,0 +1,101 @@ |
|||||
|
package meta_cache |
||||
|
|
||||
|
import ( |
||||
|
"fmt" |
||||
|
"strconv" |
||||
|
"strings" |
||||
|
) |
||||
|
|
||||
|
type UidGidMapper struct { |
||||
|
uidMapper *IdMapper |
||||
|
gidMapper *IdMapper |
||||
|
} |
||||
|
|
||||
|
type IdMapper struct { |
||||
|
localToFiler map[uint32]uint32 |
||||
|
filerToLocal map[uint32]uint32 |
||||
|
} |
||||
|
|
||||
|
// UidGidMapper translates local uid/gid to filer uid/gid
|
||||
|
// The local storage always persists the same as the filer.
|
||||
|
// The local->filer translation happens when updating the filer first and later saving to meta_cache.
|
||||
|
// And filer->local happens when reading from the meta_cache.
|
||||
|
func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) { |
||||
|
uidMapper, err := newIdMapper(uidPairsStr) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
gidMapper, err := newIdMapper(gidPairStr) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
|
||||
|
return &UidGidMapper{ |
||||
|
uidMapper: uidMapper, |
||||
|
gidMapper: gidMapper, |
||||
|
}, nil |
||||
|
} |
||||
|
|
||||
|
func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) { |
||||
|
return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid) |
||||
|
} |
||||
|
func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) { |
||||
|
return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid) |
||||
|
} |
||||
|
|
||||
|
func (m *IdMapper) LocalToFiler(id uint32) uint32 { |
||||
|
value, found := m.localToFiler[id] |
||||
|
if found { |
||||
|
return value |
||||
|
} |
||||
|
return id |
||||
|
} |
||||
|
func (m *IdMapper) FilerToLocal(id uint32) uint32 { |
||||
|
value, found := m.filerToLocal[id] |
||||
|
if found { |
||||
|
return value |
||||
|
} |
||||
|
return id |
||||
|
} |
||||
|
|
||||
|
func newIdMapper(pairsStr string) (*IdMapper, error) { |
||||
|
|
||||
|
localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr) |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
|
||||
|
return &IdMapper{ |
||||
|
localToFiler: localToFiler, |
||||
|
filerToLocal: filerToLocal, |
||||
|
}, nil |
||||
|
|
||||
|
} |
||||
|
|
||||
|
func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) { |
||||
|
|
||||
|
if pairsStr == "" { |
||||
|
return |
||||
|
} |
||||
|
|
||||
|
localToFiler = make(map[uint32]uint32) |
||||
|
filerToLocal = make(map[uint32]uint32) |
||||
|
for _, pairStr := range strings.Split(pairsStr, ",") { |
||||
|
pair := strings.Split(pairStr, ":") |
||||
|
localUidStr, filerUidStr := pair[0], pair[1] |
||||
|
localUid, localUidErr := strconv.Atoi(localUidStr) |
||||
|
if localUidErr != nil { |
||||
|
err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr) |
||||
|
return |
||||
|
} |
||||
|
filerUid, filerUidErr := strconv.Atoi(filerUidStr) |
||||
|
if filerUidErr != nil { |
||||
|
err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr) |
||||
|
return |
||||
|
} |
||||
|
localToFiler[uint32(localUid)] = uint32(filerUid) |
||||
|
filerToLocal[uint32(filerUid)] = uint32(localUid) |
||||
|
} |
||||
|
|
||||
|
return |
||||
|
} |
1068
weed/pb/volume_server_pb/volume_server.pb.go
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -0,0 +1,42 @@ |
|||||
|
package weed_server |
||||
|
|
||||
|
import ( |
||||
|
"context" |
||||
|
"github.com/chrislusf/seaweedfs/weed/filer" |
||||
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
||||
|
) |
||||
|
|
||||
|
func (fs *FilerServer) KvGet(ctx context.Context, req *filer_pb.KvGetRequest) (*filer_pb.KvGetResponse, error) { |
||||
|
|
||||
|
value, err := fs.filer.Store.KvGet(ctx, req.Key) |
||||
|
if err == filer.ErrKvNotFound { |
||||
|
return &filer_pb.KvGetResponse{}, nil |
||||
|
} |
||||
|
|
||||
|
if err != nil { |
||||
|
return &filer_pb.KvGetResponse{Error: err.Error()}, nil |
||||
|
} |
||||
|
|
||||
|
return &filer_pb.KvGetResponse{ |
||||
|
Value: value, |
||||
|
}, nil |
||||
|
|
||||
|
} |
||||
|
|
||||
|
// KvPut sets the key~value. if empty value, delete the kv entry
|
||||
|
func (fs *FilerServer) KvPut(ctx context.Context, req *filer_pb.KvPutRequest) (*filer_pb.KvPutResponse, error) { |
||||
|
|
||||
|
if len(req.Value) == 0 { |
||||
|
if err := fs.filer.Store.KvDelete(ctx, req.Key); err != nil { |
||||
|
return &filer_pb.KvPutResponse{Error: err.Error()}, nil |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
err := fs.filer.Store.KvPut(ctx, req.Key, req.Value) |
||||
|
if err != nil { |
||||
|
return &filer_pb.KvPutResponse{Error: err.Error()}, nil |
||||
|
} |
||||
|
|
||||
|
return &filer_pb.KvPutResponse{}, nil |
||||
|
|
||||
|
} |
Some files were not shown because too many files changed in this diff
Write
Preview
Loading…
Cancel
Save
Reference in new issue